StreamInsight Nuggets

March 22 2012

Here are some choice quotes from a whitepaper on StreamInsight:

Relational database applications typically acquire data and store it to disk before it can be analyzed. We therefore call analysis with traditional relational database systems query-driven. Query-driven analysis is well-suited for historical data. … To reach the necessary performance and scale, [some] applications need to analyze the data in near real time while it is being acquired from the source. We denote these applications as event-driven applications because new event data arriving at the system triggers the necessary analysis.

I think this is a great intro to help wrap yr head around the difference between query-driven and event-driven analysis.

Microsoft StreamInsight is Microsoft’s platform to build high-throughput, low-latency event-driven analytics applications.

That’s a great one-liner.

With StreamInsight, business insight is delivered at the speed at which data is produced, as opposed to the speed at which traditional reports are processed or consumed.

Nicely worded value prop.

StreamInsight’s runtime performs calculations incrementally whenever possible. This means that the processing only involves the data for the current result and the new event. Unlike in traditional databases, updating a report with aggregates or KPIs with StreamInsight does not require to re-iterate through past data once a new event comes in. Instead, StreamInsight answers continuous queries with a single pass over all the data, which is an important capability for long-running, potentially infinite, standing queries. Incremental processing is one key performance benefit of StreamInsight.

That is rad engineering methinks.
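To make the incremental-processing point concrete, here is a toy running average. This is a minimal C# sketch of the idea only, not StreamInsight’s actual runtime, and the class names RescanningAverage and IncrementalAverage are hypothetical. The first class behaves like the "store it, then re-read it" approach. The second keeps only the state needed for the current result (a sum and a count) and folds in each new event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of re-scanning vs. incremental aggregation.
// Not StreamInsight code: it only mirrors the idea described in the quote.
public sealed class RescanningAverage
{
    // Every event is kept, and each new result re-reads all of them.
    private readonly List<double> _history = new List<double>();

    public double Add(double value)
    {
        _history.Add(value);
        return _history.Average(); // work grows with everything seen so far
    }
}

public sealed class IncrementalAverage
{
    // Only the state needed for the current result is kept.
    private double _sum;
    private long _count;

    public double Add(double value)
    {
        _sum += value;
        _count++;
        return _sum / _count; // constant work per event
    }
}
```

Both classes return the same averages. The difference is that RescanningAverage does more work on every event as history accumulates, which is untenable for a long-running or unbounded standing query, while IncrementalAverage does a fixed amount of work per event.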

StreamInsight automatically distributes the processing across the available processor cores on the system as well. Thread management and query parallelization are performed automatically by the system.

How cool is that!

Creating A Weighted Average User Defined Aggregate in StreamInsight 1.1

November 16 2011

I’ve been doing some prototyping with StreamInsight lately and hit the following issue: I needed to write a weighted average so that I could roll up result sets. Let me elaborate on the scenario. I had a query whose HoppingWindow had a WindowSize of 1 minute. One of the things the query returns is an average of one of the values in the event payload:

AvgLoadTime = win.Avg(e => e.LoadTime)

I then wanted to take all the results of that query and roll them up into a larger window of 1 hour, which means writing a new query that uses the results of the first query as its input. But if I simply took the average of the averages, I’d get inaccurate results, because every minute would count equally no matter how many events it actually contained. What I needed was the weighted mean.
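To see why the average of the averages goes wrong, here is a small C# sketch that compares the two roll-ups outside StreamInsight. MinuteResult and Rollup are hypothetical names; MinuteResult only assumes the two fields this post uses, AvgLoadTime and RecordCount, and the sample numbers are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of one row produced by the one-minute query.
public sealed class MinuteResult
{
    public double AvgLoadTime { get; set; }
    public int RecordCount { get; set; }
}

public static class Rollup
{
    // Naive roll-up: every minute counts the same, however many events it saw.
    public static double AverageOfAverages(IEnumerable<MinuteResult> rows) =>
        rows.Average(r => r.AvgLoadTime);

    // Weighted mean: weight each minute's average by its record count,
    // which recovers the mean over all of the underlying events.
    public static double WeightedMean(IEnumerable<MinuteResult> rows)
    {
        var list = rows.ToList();
        int totalCount = list.Sum(r => r.RecordCount);
        if (totalCount == 0)
            throw new InvalidOperationException("No records to average.");
        return list.Sum(r => r.AvgLoadTime * r.RecordCount) / totalCount;
    }
}
```

With one minute that saw a single load time of 100 and another that saw 99 load times averaging 10, AverageOfAverages returns 55, while WeightedMean returns (100 × 1 + 10 × 99) / 100 = 10.9, which is the true mean of all 100 events.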

To compute the weighted mean, I changed the code to look like this:

AvgLoadTime = win.Sum(e => e.AvgLoadTime * e.RecordCount)/win.Sum(e => e.RecordCount)

But when I deployed to StreamInsight 1.1, I got no love:

Microsoft.ComplexEventProcessing.Linq.QueryGenerationException: Expression '(e.Sum(e => (e.AvgLoadTime * Convert(e.RecordCount))) / Convert(e.Sum(e => e.RecordCount)))' contains more than one aggregate method call, which is not supported.

Doh. But then I got hip to user-defined aggregates. I found the documentation on this MSDN page essential; it helped me write my extension, which looks like this:

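using System.Collections.Generic;
using System.Linq;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

// User-defined aggregate: mean of the per-window averages, weighted by RecordCount.
public class WeightedAverage : CepAggregate<WqAggregate, double>
{
    public override double GenerateOutput(IEnumerable<WqAggregate> win)
    {
        return win.Sum(e => e.AvgLoadTime * e.RecordCount) / win.Sum(e => e.RecordCount);
    }
}

public static class ExtensionMethods
{
    // Query-time stub: StreamInsight maps this call to WeightedAverage,
    // so the body only runs if it is invoked outside a query.
    [CepUserDefinedAggregate(typeof(WeightedAverage))]
    public static double WeightedAvg(this CepWindow<WqAggregate> window)
    {
        throw CepUtility.DoNotCall();
    }
}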

Here, WqAggregate is just a class holding the various fields returned by my query.

So, I can now write this code:

AvgLoadTime = win.WeightedAvg()

Nice! I just had to deploy the .dll containing the extension to the StreamInsight server, and I was good to go. With StreamInsight 1.2 the need to do all this goes away, but for now this turns out to be a reasonable fix.