I’ve been doing some prototyping StreamInsight lately and hit the following issue. I needed to write a weighted average so that I could roll up result sets.  Let me elaborate on the scenario.  I had a query whose HoppingWindow has a WindowSize of 1 minute.  One of the things the query returns is an average of one of the values returned by the event payload:

AvgLoadTime = win.Avg(e => e.LoadTime)

I then want to take all the results of that query and roll them up into a larger window of 1 hour. So that means writing a new query that uses the results of the first query for its calculation. But, if I were to simply take the average of the average, I’d get inaccurate results. What I need was the weighted mean.

To do so, I changed the code to look like this:

AvgLoadTime = win.Sum(e => e.AvgLoadTime * e.RecordCount)/win.Sum(e => e.RecordCount)

But when I deployed to StreamInsight 1.1 and got no love:

Microsoft.ComplexEventProcessing.Linq.QueryGenerationException: Expression '(e.Sum(e => (e.AvgLoadTime * Convert(e.RecordCount))) / Convert(e.Sum(e => e.RecordCount)))' contains more than one aggregate method call, which is not supported.

Doh. But then I got hip to user-defined aggregates. I found the documentation on this MSDN page essential and it helped me to write my extension, which looks like this:

public class WeightedAverage : CepAggregate<WqAggregate, double> { public override double GenerateOutput(IEnumerable<WqAggregate> win) { var AvgLoadTime = win.Sum(e => e.AvgLoadTime*e.RecordCount)/win.Sum(e => e.RecordCount); return AvgLoadTime; } } public static class ExtensionMethods { [CepUserDefinedAggregate(typeof(WeightedAverage))] public static double WeightedAvg(this CepWindow<WqAggregate> window) { throw CepUtility.DoNotCall(); } }

Where WqAggregate is just a class with the various fields that are returned as a result of my query.

So, I can now write this code:

AvgLoadTime = win.WeightedAvg()

Nice! I just had to deploy the .dll where I wrote the extension to the StreamInsight server and I was good to go. Now, with StreamInsight 1.2, the need to do this all goes away with but for now, this turns about to be a reasonable fix.