上篇文章介绍了查询模式中如何检测异常事件,这篇博文将介绍StreamInsight中如何检测间隙事件。
测试数据准备
为了方便测试查询,我们首先准备一个静态的测试数据源:
// 创建数据源,要注意的是4:16和4:30之间存在的事件间隙 var sourceData = new [] {new { Status = 1, TimeStamp = new DateTime(2009, 10, 23, 4, 12, 0) },new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 13, 0) },new { Status = 1, TimeStamp = new DateTime(2009, 10, 23, 4, 14, 0) },new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 15, 0) },new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 16, 0) },new { Status = 1, TimeStamp = new DateTime(2009, 10, 23, 4, 30, 0) },new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 35, 0) }, };
接下去将上述数据源转变为点类型复杂事件流:
var source = sourceData.ToPointStream(Application, ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),AdvanceTimeSettings.StrictlyIncreasingStartTime);
间隙事件检测
问题:怎样在计算聚合时加入间隙事件?
我们先执行一个基本的计数聚合查询,如“计算过去5分钟内事件的数目?”:
var countQuery = from window in source.TumblingWindow(TimeSpan.FromMinutes(5), HoppingWindowOutputPolicy.ClipToWindowEnd)select new { count = window.Count() };
结果如下:
读者可以发现在计数结果中并没有[4:20:00, 4:30:00)区间的事件,这里我们把这些未出现的事件称为“间隙事件”。
下面要解决的问题是如何在结果中加入并显示这些“间隙事件”。解决该问题的主要思路是采用左反半部联接:
第1步,定义起点在无穷小事件,终点在无穷大时间的参考事件流
var defaultEvent = new[] { new { count = (long)0 } };var defaultStream = defaultEvent.ToIntervalStream(Application, ev =>IntervalEvent.CreateInsert(DateTime.MinValue.ToUniversalTime(), DateTime.MaxValue.ToUniversalTime(), ev),AdvanceTimeSettings.StrictlyIncreasingStartTime);
结果如下:
第2步,使用左反半部联接找出间隙事件:
var gaps = from def in defaultStream where (from right in countQuery where true select right).IsEmpty()select def;
结果如下:
第3步,将countQuery和gaps联接并输出结果。
(from e in countQuery.Union(gaps).ToIntervalEnumerable()where e.EventKind == EventKind.Insertselect new { e.StartTime, e.EndTime, e.Payload.count }).Dump();
结果如下:
下一篇将介绍StreamInsight查询模式中如何使用地理数据。