sql streaminsight

Hopping Windows / SQL StreamInsight / from a table?


Maybe this can be done without StreamInsight, but I'm curious.

I have an application that is populating a table with "messages" (inserts a row in the table).

I want to create a monitoring application that monitors this table for the rate at which messages are "arriving", and how quickly they are "processed" (flag gets updated).

As this is a vendor's application, I don't want to drop in a trigger or anything. But I can query the db, and the table has a PK using an identity column.

How can I get to a hopping window query? I would love to show a line graph for, say, the past 30 minutes showing the rate of messages coming in and the rate at which the messages are processed.


Solution

  • Depending on what information is captured in this table of messages, I think you could probably do this faster by just running a SQL query.

    If you still want to use StreamInsight to do this, here's some code to get you started.

    // Application is supplied by the LINQPad StreamInsight context.
    var app = Application;
    var interval = TimeSpan.FromSeconds(1);
    var windowSize = TimeSpan.FromSeconds(10);
    var hopSize = TimeSpan.FromSeconds(1);
    
    /* Replace the Observable.Interval with your logic to poll the database and
       convert the messages to instances of TPayload. It just needs to be a class
       that implements the IObservable<TPayload> interface. */
    var observable = app.DefineObservable(()=> Observable.Interval(interval));
    
    // Convert the observable to a point streamable.
    var streamable = observable.ToPointStreamable(
                e=> PointEvent.CreateInsert(DateTimeOffset.Now, e),
                AdvanceTimeSettings.IncreasingStartTime);
    
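    /* Using the streamable from the step before, write your actual LINQ queries
       to do the analytics you want. This one counts the events in each window.
       Payload is a user-defined class (not shown) with Timestamp and Value
       properties. */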
    var query = from win in streamable.HoppingWindow(windowSize, hopSize)
            select new Payload{
                Timestamp = DateTime.UtcNow,
                Value = win.Count()
            };
    
    /* Create a sink to output your events (WCF, etc). It just needs to be a
       class that implements the IObserver<TPayload> interface. The
       implementation is highly dependent on your needs; Dump() here is
       LINQPad's output helper. */
    var observer = app.DefineObserver(()=> Observer.Create<Payload>(e => e.Dump()));
    
    query.Bind(observer).Run();
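
    To make concrete what the hopping-window count above computes, here is a minimal sketch in plain C# with no StreamInsight involved. It assumes your monitor records the time at which each poll first saw a new identity value (the question doesn't say whether the table has a timestamp column). HoppingWindows, CountPerWindow and the sample data are made-up names for illustration, not part of any library.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    static class HoppingWindows
    {
        // Counts the events that fall in each window [start, start + windowSize).
        // Successive window starts are hopSize apart, so windows overlap
        // whenever hopSize is smaller than windowSize.
        public static IEnumerable<(DateTime WindowStart, int Count)> CountPerWindow(
            IReadOnlyList<DateTime> eventTimes,
            DateTime from, DateTime to,
            TimeSpan windowSize, TimeSpan hopSize)
        {
            for (var start = from; start + windowSize <= to; start += hopSize)
            {
                var end = start + windowSize;
                yield return (start, eventTimes.Count(t => t >= start && t < end));
            }
        }
    }

    static class Program
    {
        static void Main()
        {
            // Hypothetical arrival times: 0s, 1s, 1s, 4s and 12s after t0.
            var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
            var arrivals = new[] { 0, 1, 1, 4, 12 }
                .Select(s => t0.AddSeconds(s))
                .ToList();

            // 10-second windows that hop every 5 seconds over a 15-second span.
            var windows = HoppingWindows.CountPerWindow(
                arrivals, t0, t0.AddSeconds(15),
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(5));

            foreach (var w in windows)
                Console.WriteLine($"{w.WindowStart:HH:mm:ss} {w.Count}");
        }
    }
    ```

    With the sample data, the window starting at 00:00:00 holds 4 events and the one starting at 00:00:05 holds 1. For the "processed" line on the graph you could feed the same function the times at which each poll first saw a row's flag set. Rescanning the list for every window is fine for 30 minutes of data but would want a smarter approach for large volumes.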