在我先前关于复杂事件处理的博客文章中 ,我演示了使用Esper,开源CEP软件和Twitter4J API处理来自Twitter的推文流的方法。 但是,CEP产品不仅仅只处理一个数据流。 单个数据流可以通过标准的异步消息传递平台轻松处理,并且不会带来非常具有挑战性的可伸缩性或延迟问题。 但是,当涉及消费多个实时数据流并进行实时分析时,并且当数据流之间的相关性很重要时,没有什么比CEP平台更胜一筹了。 源馈送流媒体平台的速度,数量和复杂性可能会有所不同。 真正的企业级CEP应该轻松有效地处理各种实时高速数据,例如股票行情自动收录器和速度较慢但数量众多的脱机批量上传。 除了提供标准接口之外,CEP还应该提供一种更简单的编程语言来查询流数据并通过诸如模式匹配和快照查询之类的功能来生成连续的情报。
Sybase交易平台– RAP版本。 引用网址 |
为了保持简单性和高水平,CEP可以分为三个基本部分。 第一种是获取/使用源数据的机制。 接下来是调查数据,识别事件和模式,然后通过为目标系统提供可操作的项与目标系统进行交互的过程。 可执行事件采用不同的形式和格式,具体取决于您使用CEP的应用程序。 一个行动项目可能是–根据风险监控应用程序中计算的风险出售股票头寸。 通过读取化工厂中的数千个传感器来指示洗钱应用程序中的潜在欺诈事件或监视系统中的灾难性事件。 从字面上看,有成千上万种情况是无法手动和离线检查数据的。 在完成以下部分之后,您可能需要自己尝试Aleri。 此链接http://www.sybase.com/aleriform可以直接将您带到Aleri下载页面。 可从Sybase的官方网站免费获得有效期为90天的评估副本。 大量的文档,出色的教程和网站上的一些示例代码应该可以帮助您快速入门。
如果您是任何CEP产品的现有用户,我建议您将Aleri与该产品进行比较,并与社区共享或在此博客上发表评论。 根据一些过时的估计,Tibco CEP是市场上最大的CEP供应商。 我不确定StreamBase另一个领先产品有多少市场份额。 您还可以在Youtube.com上观看 网络研讨会 ,该研讨会总体上介绍了CEP的好处,以及具体介绍了Streambase的一些关键功能。 对于新手来说,这是CEP和资本市场用例的绝佳介绍。
通过使用Studio(gui)或使用Splash(语言)或通过使用Aleri Modeling语言(ML)创建模型来构建Aleri CEP上的应用程序,这是部署之前的最后阶段。
以下是Splash的主要功能列表。
- 数据类型 –支持标准数据类型和XML。 还为用户定义的数据类型支持'Typedef'。
- 访问控制 –粒度级别的访问控制,允许访问一个或多个流(包含许多流)
- SQL –建立模型的另一种方式。 由于其视觉范式,构建Aleri工作室模型可能需要更长的时间。 精通SQL的人应该可以使用Aleri SQL更快地完成它,这与众所周知的常规SQL非常相似。
- 联接 –支持的联接为内部,左侧,右侧和完全联接
- 过滤器表达式 –包括何处,拥有,分组拥有
- ML – Aleri SQL以Aleri建模语言(ML)生成数据模型–熟练的ML用户可能仅使用ML(代替Aleri Studio和Aleri SQL)来构建模型。
- 模式匹配语言 –包括诸如“内部”以指示间隔(滑动窗口),“从”以指示数据流和有趣的“ fby”以指示序列的结构(其后为)
- 用户定义的函数 – Splash中提供的用户定义的函数接口使您可以用C ++或Java创建函数,并在模型的Splash表达式中使用它们。
高级模式匹配–功能在此处通过示例进行说明。 –以下三个代码段及其说明直接取自Sybase有关Aleri的文档。
第一个示例检查以查看经纪人是否发送与其他的客户之一相同的股票的买单,然后为该客户插入买单,然后出售该股票。 当这些动作按顺序发生时,它将创建一个“ buyahead”事件。
within 5 minutes
from
BuyStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Buy1,
BuyStock[Symbol=sym; Shares=n2; Broker=b; Customer=c1] as Buy2,
SellStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Sell
on Buy1 fby Buy2 fby Sell
{
if ((b = c0) and (b != c1)) {
output [Symbol=sym; Shares=n1; Broker=b];
}
}
本示例使用fby关系检查三个事件,一个接一个。 因为在三个模式中使用了相同的变量sym,所以三个事件中的值必须相同。 但是,不同的变量可能具有相同的值(例如n1和n2。),如果Buy1和Sell事件中的Broker和Customer相同,而Buy2事件中的Customer不同,则它将输出一个事件。
下一个示例显示对事件的布尔运算。 该规则描述了一种可能的盗窃情况,即在架子上有商品读取时(可能通过RFID),然后没有对该商品进行结帐,然后在门附近的扫描仪上读取了该商品。
within 12 hours
from
ShelfReading[TagId=tag; ProductName=pname] as onShelf,
CounterReading[TagId=tag] as checkout,
ExitReading[TagId=tag; AreaId=area] as exit
on onShelf fby not(checkout) fby exit
output [TagId=t; ProductName=pname; AreaId=area];
下一个示例显示了如果用户尝试在5分钟内三次未成功登录帐户,则如何发出警报。
from
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login1,
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login2,
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login3,
LoginAttempt[IpAddress=ip; Account=acct; Result=1] as login4
on (login1 fby login2 fby login3) and not(login4)
output [Account=acct];
希望闯入计算机系统的人们经常扫描多个TCP / IP端口以查找开放的端口,并尝试利用侦听这些端口的程序中的漏洞。 这是一条规则,检查是否单个IP地址尝试在三个端口上进行连接,以及是否使用“ sendmail”程序进行了连接。
within 30 minutes
from
Connect[Source=ip; Port=22] as c1,
Connect[Source=ip; Port=23] as c2,
Connect[Source=ip; Port=25] as c3
SendMail[Source=ip] as send
on (c1 and c2 and c3) fby send
output [Source=ip];
Aleri提供了许多现成的接口,可轻松与源系统和目标系统集成。 通过这些接口/适配器,Aleri平台可以与标准关系数据库,消息传递框架(如IBM MQ),套接字和文件系统文件进行通信。 Aleri可以通过标准化接口轻松使用csv,FIX,路透社市场数据,SOAP,http,SMTP等各种格式的数据。
以下是将Aleri与其他系统集成的可用技术。
- Java,C ++和点网提供了发布/订阅API-一种标准的发布/订阅机制
- 通过ODBC和JDBC连接使用带有SELECT,UPDATE,DELETE和INSERT语句的SQL接口 。
- 内置用于市场数据和FIX的适配器
在本系列的下一部分中,我们将介绍Aleri Studio,它是可以帮助我们轻松构建CEP应用程序的gui。
在我的上一篇文章中,对Sybase的复杂事件处理平台Aleri进行了高级概述。
本周,让我们回顾一下Aleri Studio,Aleri平台的用户界面以及pub / sub api的使用,这是与Aleri平台进行交互的多种方式之一。 该工作室是平台不可或缺的一部分,并随附免费的评估版。 如果您尚未这样做,请从此处下载副本。 Aleri产品的安装过程非常简单,几分钟即可启动并运行。
aleri工作室是用于构建模型的创作平台,该模型定义了各种数据流之间的交互作用和排序。 它还可以合并多个流以形成一个或多个流。 使用这个基于Eclipse的工作室,您可以通过向其提供测试数据来测试所构建的模型,并实时监控流中的活动。 让我们看一下您可以在Aleri中定义的各种流及其功能。
源流 –只有这种类型的流才能处理传入的数据。 传入数据可以执行的操作是插入,更新,删除和向上插入。 Upsert,顾名思义,如果流中已经存在定义行的键,则更新数据。 否则,它将在流中插入一条记录。
聚合流 –此流为由特定属性定义的每个组创建摘要记录。 这提供了与ANSI SQL中的“分组依据”等效的功能。
复制流 –通过复制另一个流但使用不同的保留规则来创建此流。
计算流 –该流允许您在数据的每一行上使用一个函数来为数据流的每一行获取新的计算元素。
扩展流 –该流是通过其他列表达式从另一个流派生的
过滤流 –您可以为此流定义过滤条件。 就像扩展和计算流一样,此流在其他流上应用过滤条件以派生新流。
Flex Stream –通过自定义编码方法,在处理流数据方面具有显着的灵活性。 只有此流允许您编写自己的方法以满足特殊需求。
加入流 –通过在某些条件下加入两个或多个流来创建新流。 内连接和外连接均可用于连接流。
模式流 –模式匹配规则与此流一起应用
联合流 –顾名思义,这将连接具有相同行数据结构的两个或多个流。 与加入流不同,此流包含来自所有参与流的所有数据。
通过使用其中一些流和Aeri的pub api,我将演示将Twitter实时提要隔离到两个不同的流中。 Twitter实时提要由Twitter4j库中的侦听器使用。 如果您只想先尝试使用Twitter4j库,请按照我之前的文章“ 在Twitter上跟踪用户情绪 ”。 通过使用Aleri的发布API,将twitter4j侦听器接收的数据馈送到我们模型中的源流。 在本练习中,我们将尝试根据推文的内容将其分离出来。 基于我之前的帖子中的示例,我们将根据内容将传入流分为两个流。 一个流将获取任何包含'lol'的推文,而另一个流将在文本中显示带有笑脸“ :)”的推文。 首先,让我们列出使它成为一个可行示例所需执行的任务。
- 创建具有三个流的模型
- 验证模型没有错误
- 创建一个静态数据文件
- 启动Aleri服务器,并将静态数据文件手动输入到流中,以确认模型正确工作。
- 编写Java代码以使用Twitter提要。 使用发布API将推文发布到Aleri平台。
- 运行演示并观看实时数据流经各种流的过程。
该图像是Aleri Studio的三个流的快照-左侧的一个名为“ tweets”是源流,右侧的两个名为“ lolFilter”和“ smileyFilter”属于过滤器类型。 源流接受传入的数据,而过滤器流接收已过滤的数据。 这是我定义过滤条件的方式-例如(tweets.text,'%lol%')。 tweets是流的名称,text是我们感兴趣的流中的字段。%lol%表示,选择内容中带有“ lol”字符串的任何tweet。 每个流只有2个字段-id和text。 ID和文本映射到Twitter发送的ID和文本消息。 定义模型后,您可以通过单击顶部功能区中的复选标记来检查是否有任何错误。 如果出现任何错误,则会在图像右下方的面板中显示。 一旦您的模型没有错误,就可以进行测试了。
下图显示了Studio的测试界面。 首先尝试使用静态数据文件运行模型。 顶部的红色小方块表示Aleri服务器当前正在运行。 右下角的控制台窗口显示服务器消息,例如成功启动和停止等。在左窗格中的“运行测试”选项卡上,您可以在其中选择静态数据文件来馈送源流。 右侧窗格显示所有当前正在运行的流以及由流处理的实时数据。
下图显示了用于测试模型的数据文件的格式
tweets ALERI_OPS="i" id="1" text="324test 1234" ;
tweets ALERI_OPS="i" id="2" text="test 12345";
tweets ALERI_OPS="i" id="3" text="test 1234666" ;
tweets ALERI_OPS="i" id="4" text="test 1234888" ;
tweets ALERI_OPS="i" id="5" text="test 1234999" ;
此练习的源代码在底部。
请记住,您需要在构建路径中具有twitter4j库,并在运行程序之前运行Aleri服务器。 因为我没有在执行线程中添加任何计时器,所以停止执行的唯一方法是中止执行。 为了简洁起见,并且为了使代码行简短,我删除了所有异常处理和日志记录。 该代码仅利用Aleri的pub / sub api的发布部分。 我将在我的下一篇博文中演示api的sub side的用法。
package com.sybase.aleri;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;import com.aleri.pubsub.SpGatewayConstants;
import com.aleri.pubsub.SpObserver;
import com.aleri.pubsub.SpPlatform;
import com.aleri.pubsub.SpPlatformParms;
import com.aleri.pubsub.SpPlatformStatus;
import com.aleri.pubsub.SpPublication;
import com.aleri.pubsub.SpStream;
import com.aleri.pubsub.SpStreamDataRecord;
import com.aleri.pubsub.SpStreamDefinition;
import com.aleri.pubsub.SpSubscription;
import com.aleri.pubsub.SpSubscriptionCommon;
import com.aleri.pubsub.impl.SpFactory;
import com.aleri.pubsub.impl.SpUtils;
import com.aleri.pubsub.test.ClientSpObserver;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Vector;
import java.util.TimeZone;public class TwitterTest_2 {//make sure that Aleri server is running prior to running this programstatic {//creates the publishing platformcreatePlatform();}// Important objects from the publish APIstatic SpStream stream;static SpPlatformStatus platformStatus;static SpPublication pub;public static void main(String[] args) throws TwitterException, IOException {TwitterTest_2 tt2 = new TwitterTest_2();ConfigurationBuilder cb = new ConfigurationBuilder();cb.setDebugEnabled(true);//use your twitter id and passcodecb.setUser("Your user name");cb.setPassword("Your Password");// creating the twitter4j listenerConfiguration cfg = cb.build();TwitterStream twitterStream = new TwitterStreamFactory(cfg).getInstance();StatusListener_1 listener;listener = new StatusListener_1();twitterStream.addListener(listener);//runs the sample that comes with twitter4jtwitterStream.sample();}private static int createPlatform() {int rc = 0;//Aleri platform configuration - better alternative is to your properties fileString host = "localhost";int port = 22000;//aleri configured to run with empty userid and pwd stringsString user = "";String password = "";//name of the source stream - the one that gets the data from the twitter4jString streamName = "tweets";String name = "TwitterTest_2";SpPlatformParms parms = SpFactory.createPlatformParms(host, port, user,password, false, false);platformStatus = SpFactory.createPlatformStatus();SpPlatform sp = SpFactory.createPlatform(parms, platformStatus);stream = sp.getStream(streamName);pub = sp.createPublication(name, platformStatus);// Then get the stream definition containing the schema information.SpStreamDefinition sdef = stream.getDefinition();
/*int numFieldsInRecord = sdef.getNumColumns();Vector colTypes = sdef.getColumnTypes();Vector colNames = sdef.getColumnNames();*/return 0;}static SpStream getStream() {return stream;}static SpPlatformStatus getPlatformStatus() {return platformStatus;}static SpPublication getPublication() {return pub;}static int publish(SpStream stream, SpPlatformStatus platformStatus,SpPublication pub, Collection fieldData) {int rc = 0;int i = pub.start();SpStreamDataRecord sdr = SpFactory.createStreamDataRecord(stream,fieldData, SpGatewayConstants.SO_UPSERT,SpGatewayConstants.SF_NULLFLAG, platformStatus);Collection dataSet = new Vector();dataSet.add(sdr);System.out.println("\nAttempting to publish the data set to the Platform for stream <"+ stream.getName() + ">.");rc = pub.publishTransaction(dataSet, SpGatewayConstants.SO_UPSERT,SpGatewayConstants.SF_NULLFLAG, 1);// commit blocks the thread until data is consumed by the platformSystem.out.println("before commit() call to the Platform.");rc = pub.commit();return 0;}}
参考: Aleri –复杂事件处理–第一部分 , 理解Aleri –复杂事件处理–第二部分来自我们JCG合作伙伴 Mahesh Gadgil在“ 简单而实用”的博客上。
翻译自: https://www.javacodegeeks.com/2012/04/aleri-complex-event-processing.html