Creating A Weighted Average User Defined Aggregate in StreamInsight 1.1

November 16 2011

I’ve been doing some prototyping StreamInsight lately and hit the following issue. I needed to write a weighted average so that I could roll up result sets.  Let me elaborate on the scenario.  I had a query whose HoppingWindow has a WindowSize of 1 minute.  One of the things the query returns is an average of one of the values returned by the event payload:

AvgLoadTime = win.Avg(e => e.LoadTime)

I then want to take all the results of that query and roll them up into a larger window of 1 hour. So that means writing a new query that uses the results of the first query for its calculation. But, if I were to simply take the average of the average, I’d get inaccurate results. What I need was the weighted mean.

To do so, I changed the code to look like this:

AvgLoadTime = win.Sum(e => e.AvgLoadTime * e.RecordCount)/win.Sum(e => e.RecordCount)

But when I deployed to StreamInsight 1.1 and got no love:

Microsoft.ComplexEventProcessing.Linq.QueryGenerationException: Expression '(e.Sum(e => (e.AvgLoadTime * Convert(e.RecordCount))) / Convert(e.Sum(e => e.RecordCount)))' contains more than one aggregate method call, which is not supported.

Doh. But then I got hip to user-defined aggregates. I found the documentation on this MSDN page essential and it helped me to write my extension, which looks like this:

public class WeightedAverage : CepAggregate<WqAggregate, double> { public override double GenerateOutput(IEnumerable<WqAggregate> win) { var AvgLoadTime = win.Sum(e => e.AvgLoadTime*e.RecordCount)/win.Sum(e => e.RecordCount); return AvgLoadTime; } } public static class ExtensionMethods { [CepUserDefinedAggregate(typeof(WeightedAverage))] public static double WeightedAvg(this CepWindow<WqAggregate> window) { throw CepUtility.DoNotCall(); } }

Where WqAggregate is just a class with the various fields that are returned as a result of my query.

So, I can now write this code:

AvgLoadTime = win.WeightedAvg()

Nice! I just had to deploy the .dll where I wrote the extension to the StreamInsight server and I was good to go. Now, with StreamInsight 1.2, the need to do this all goes away with but for now, this turns about to be a reasonable fix.

7/12/2012 9:34:00 PM #

hollister  uk

This is often great. Another one stare upon this finding gratification and we are amazed. We are most certainly interested in this kind of issues. One particular appreciate their tip, and value your precious time inside this. Please keep editing.  

hollister uk

Add comment




biuquote
Loading



My VS Achievements