The previous post showed how to find trends with query patterns; this post shows how to detect anomalies (spikes) in StreamInsight.
Preparing the test data
To make the query easy to test, we first prepare a static test data source:
// 09/12/2011 8:57:00 PM; the constructor avoids depending on the machine's date format
var now = new DateTime(2011, 9, 12, 20, 57, 0);
var input = new[]
{
    new { Time = now + TimeSpan.FromSeconds(1), Value = 20 },
    new { Time = now + TimeSpan.FromSeconds(2), Value = 30 },
    new { Time = now + TimeSpan.FromSeconds(3), Value = 120 },
    new { Time = now + TimeSpan.FromSeconds(4), Value = 200 },
    new { Time = now + TimeSpan.FromSeconds(5), Value = 20 },
    new { Time = now + TimeSpan.FromSeconds(6), Value = 110 },
    new { Time = now + TimeSpan.FromSeconds(7), Value = 110 },
    new { Time = now + TimeSpan.FromSeconds(8), Value = 210 },
    new { Time = now + TimeSpan.FromSeconds(9), Value = 120 },
    new { Time = now + TimeSpan.FromSeconds(10), Value = 130 },
    new { Time = now + TimeSpan.FromSeconds(11), Value = 20 },
    new { Time = now + TimeSpan.FromSeconds(12), Value = 30 },
};
Next, convert this data source into a stream of point events:
var inputStream = input.ToPointStream(
    Application, // the StreamInsight Application object that hosts the query
    t => PointEvent.CreateInsert(t.Time.ToLocalTime(), new { Value = t.Value }),
    AdvanceTimeSettings.IncreasingStartTime);
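Detecting spikes
Problem: once per second, how do we detect the anomaly in which, over the past 5 seconds, events whose Value exceeds the threshold 100 make up at least 80% of all events?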
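To make the windowing semantics of this question concrete, here is a minimal plain C# sketch that uses LINQ to Objects instead of StreamInsight. It assumes that windows of length windowSize start at whole multiples of hopSize, cover the half-open interval [start, start + windowSize), and that empty windows produce no output. SpikeSketch and FindSpikeWindows are hypothetical names used only for this illustration; this is not how the StreamInsight engine evaluates the query.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical plain-C# sketch of the spike-detection semantics (no StreamInsight involved).
public static class SpikeSketch
{
    // Returns (window start, spike ratio) for every hopping window [start, start + windowSize)
    // in which the share of values above `threshold` is at least `minRatio`.
    // Assumption: window starts are aligned to whole multiples of hopSize (in ticks).
    public static List<(DateTime WindowStart, double Ratio)> FindSpikeWindows(
        IList<(DateTime Time, int Value)> events,
        int threshold,
        double minRatio,
        TimeSpan windowSize,
        TimeSpan hopSize)
    {
        var result = new List<(DateTime WindowStart, double Ratio)>();
        if (events.Count == 0) return result;

        long hop = hopSize.Ticks;
        long first = events.Min(e => e.Time.Ticks);
        long last = events.Max(e => e.Time.Ticks);

        // Earliest aligned window start whose window still contains the first event.
        long start = (first - windowSize.Ticks) / hop * hop + hop;

        for (long s = start; s <= last; s += hop)
        {
            var windowStart = new DateTime(s);
            var windowEnd = windowStart + windowSize;
            var inWindow = events.Where(e => e.Time >= windowStart && e.Time < windowEnd).ToList();
            if (inWindow.Count == 0) continue; // empty windows produce no output

            int spikeCount = inWindow.Count(e => e.Value > threshold);
            double ratio = (double)spikeCount / inWindow.Count;
            if (ratio >= minRatio) result.Add((windowStart, ratio));
        }
        return result;
    }

    public static void Main()
    {
        // Same values as the static test data, one event per second after 8:57:00 PM.
        var now = new DateTime(2011, 9, 12, 20, 57, 0);
        int[] values = { 20, 30, 120, 200, 20, 110, 110, 210, 120, 130, 20, 30 };
        var events = values.Select((v, i) => (Time: now.AddSeconds(i + 1), Value: v)).ToList();

        var spikes = FindSpikeWindows(events, 100, 0.8, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1));
        foreach (var (windowStart, ratio) in spikes)
        {
            Console.WriteLine(windowStart.ToString("HH:mm:ss", CultureInfo.InvariantCulture)
                + " ratio=" + ratio.ToString("F2", CultureInfo.InvariantCulture));
        }
    }
}
```

Under these assumptions the sketch reports five windows, starting at 20:57:03 through 20:57:07, with ratios 0.80, 0.80, 0.80, 1.00 and 0.80. Note the comparison is `>=`, so a window in which exactly 4 of 5 events exceed 100 counts as a spike.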
First, define SpikeEvent, the payload type of the result stream:
struct SpikeEvent
{
    public double Ratio { get; set; }
}
Ultimately, we want to invoke the query like this:
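int threshold = 100;
double ratio = 0.8;
var resultStream = DetectSpikes(
    inputStream,
    threshold,               // events whose value exceeds this threshold count as "spike" events
    ratio,                   // minimum fraction of spike events among all events in the window
    TimeSpan.FromSeconds(5), // window size
    TimeSpan.FromSeconds(1), // hop size
    e => e.Value);           // field compared against the threshold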
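The key part, therefore, is implementing DetectSpikes. Readers who have read "StreamInsight Query Series (Part 15), Query Patterns: Window Ratios" will find this kind of query familiar.
Here, without much further explanation, is the implementation of DetectSpikes: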
/// <summary>
/// Detects spikes in the input stream.
/// </summary>
/// <typeparam name="TInput">Event type of the input stream</typeparam>
/// <param name="inputStream">Input stream</param>
/// <param name="threshold">Threshold above which an event counts as a spike event</param>
/// <param name="ratio">Minimum fraction of spike events among all events in a window</param>
/// <param name="windowSize">Size of the window over which events are counted</param>
/// <param name="hopSize">Hop size</param>
/// <param name="fieldSelector">Selects the field of the input event that is compared against the threshold</param>
/// <returns>query that detects the spikes</returns>
private static CepStream<SpikeEvent> DetectSpikes<TInput>(
    CepStream<TInput> inputStream,
    int threshold,
    double ratio,
    TimeSpan windowSize,
    TimeSpan hopSize,
    Expression<Func<TInput, int>> fieldSelector)
{
    // Count all events in each hopping window
    var totalValues = from w in inputStream.HoppingWindow(
                          windowSize,
                          hopSize,
                          HoppingWindowOutputPolicy.ClipToWindowEnd)
                      select new { Count = w.Count() };

    // Build the filter predicate "field > threshold" as an expression tree
    var parameter = fieldSelector.Parameters.First();
    var field = fieldSelector.Body;
    Expression<Func<TInput, bool>> filterExpression =
        Expression.Lambda<Func<TInput, bool>>(
            Expression.GreaterThan(field, Expression.Constant(threshold)),
            parameter);

    // Count the spike events in each hopping window
    var bigValues = from w in inputStream.Where(filterExpression).HoppingWindow(
                        windowSize,
                        hopSize,
                        HoppingWindowOutputPolicy.ClipToWindowEnd)
                    select new { Count = w.Count() };

    // Both counts become point events at their window's start time, so the temporal
    // join pairs the total and spike counts of the same window. Emit an event for every
    // window in which spike events make up at least the given fraction of all events.
    var output = from total in totalValues.ToPointEventStream()
                 join big in bigValues.ToPointEventStream()
                 on true equals true
                 where big.Count * 1.0 / total.Count >= ratio
                 select new SpikeEvent { Ratio = big.Count * 1.0 / total.Count };
    return output;
}
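The least obvious step in DetectSpikes is turning the field selector `e => e.Value` into the predicate `e => e.Value > threshold`. The sketch below isolates that expression-tree technique in plain C#: instead of handing the expression to CepStream.Where, it compiles it to a delegate and runs it over an in-memory array. Reading, FilterSketch and BuildGreaterThanFilter are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical payload type used only for this illustration.
public sealed class Reading
{
    public int Value { get; set; }
}

public static class FilterSketch
{
    // Rewrites a selector such as r => r.Value into the predicate r => r.Value > threshold,
    // reusing the selector's own parameter so the new lambda body stays bound to it.
    public static Expression<Func<T, bool>> BuildGreaterThanFilter<T>(
        Expression<Func<T, int>> fieldSelector, int threshold)
    {
        ParameterExpression parameter = fieldSelector.Parameters.Single();
        BinaryExpression body = Expression.GreaterThan(
            fieldSelector.Body, Expression.Constant(threshold));
        return Expression.Lambda<Func<T, bool>>(body, parameter);
    }

    public static void Main()
    {
        int[] values = { 20, 30, 120, 200, 20, 110, 110, 210, 120, 130, 20, 30 };
        Reading[] readings = values.Select(v => new Reading { Value = v }).ToArray();

        Expression<Func<Reading, bool>> filter = BuildGreaterThanFilter<Reading>(r => r.Value, 100);

        // CepStream.Where receives the expression tree itself; here it is simply
        // compiled to a delegate and applied to an array.
        Func<Reading, bool> predicate = filter.Compile();
        Console.WriteLine(filter);
        Console.WriteLine(readings.Count(predicate)); // 7
    }
}
```

Reusing `fieldSelector.Parameters` instead of creating a fresh ParameterExpression matters: `fieldSelector.Body` refers to that exact parameter object, so a new parameter with the same name would leave the body unbound, and compiling the resulting lambda would throw.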
Running the query over the test data produces the following output:
The next post will show how to detect gaps between events with StreamInsight query patterns.