不久前,我为Camel 创建了Redis连接器。 Redis是很棒的键值存储(还有更多),但是随后我需要一个在与Camel相同的JVM中运行的缓存,并注意到Infinispan已切换到ASL v2 。 Camel中已经有其他用于在JVM上进行缓存的连接器,例如Hazelcast和EHCache,但是如果您已经将Camel用作其他Red Hat产品的一部分,或者想了解LIRS驱逐如何胜过LRU,那么Infinispan值得尝试。
简而言之,Infinispan是事务性内存键值存储和数据网格。 在嵌入式模式下使用时,Infinispan与Camel驻留在同一JVM中,并允许Camel使用者接收缓存更改通知:
<route><from uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders&eventTypes=CACHE_ENTRY_CREATED"/><filter><simple>${out.header.CamelInfinispanIsPre} == true</simple><to uri="log:com.mycompany.order?showHeaders=true"/></filter>
</route>
在上面的示例中,当创建缓存条目时,Infinispan将触发两个事件-一个事件在创建缓存条目之前和之后。 也可以同步接收事件,即在处理高速缓存操作的同一线程中接收事件,或在不阻止高速缓存操作的情况下在单独的线程中异步接收事件。
将Infinispan用作本地缓存很简单,它公开了ConcurrentMap接口,并具有通常的到期,收回,钝化,持久存储,查询等功能。 使Infinispan成为数据网格的是节点发现其他节点以及在它们之间复制或分发数据的能力。 复制允许跨集群共享数据,而分发使用一致的哈希算法来实现更好的可伸缩性。
在客户端-服务器模式下,Infinispan作为独立应用程序运行,并且Camel生产者可以使用Infinispan的Hot Rod客户端发送消息。 Hot Rod是一种二进制,语言无关的智能协议,允许以拓扑结构和散列分布感知方式与Infinisnap服务器进行交互。
位于骆驼的Infinispan生产商目前提供GET , PUT , REMOVE和CLEAR操作。 这是生产者将数据放入订单缓存的示例:
<route><from uri="direct:orderCache"/><setHeader headerName="CamelInfinispanKey"><simple>${in.header.orderId}</simple></setHeader><setHeader headerName="CamelInfinispanValue"><simple>${in.header.orderTotal}</simple></setHeader><setHeader headerName="CamelInfinispanOperation"><simple>CamelInfinispanOperationPut</simple></setHeader><to uri="infinispan://localhost?caseName=orders"/>
</route>
让我们创建一个更有趣的示例。 Infinispan也符合JTA规范,可以参与交易。 我们将创建一个用于注册人员的REST API,该API将首先使用Camel sql组件将该人员持久保存在关系数据库中,然后在同一事务中将firstName放入Infinispan缓存中。 我们将使用事务处理的 Camel路由来做到这一点,因此,如果在路由过程中发生错误,在任何阶段,Camel都会确保回滚事务(用于缓存和数据库),以便数据库和缓存始终处于一致的状态。
<route><from uri="restlet:/persons?restletMethod=POST"/><transacted/><!-- PERSIST TO DB --><to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/><!-- DAMN EXCEPTION THROWER--><filter><simple>${in.header.lastName} == "damn"</simple><throwException ref="damn"/></filter><!-- PUT TO CACHE --><to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/><setHeader headerName="personId"><simple>${body[0][ID]}</simple></setHeader><setHeader headerName="CamelInfinispanKey"><simple>${headerAs(personId, String)}</simple></setHeader><setHeader headerName="CamelInfinispanValue"><simple>${in.header.firstName}</simple></setHeader><setHeader headerName="CamelInfinispanOperation"><simple>CamelInfinispanOperationPut</simple></setHeader><to uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders"/>
</route>
如您所见,路由中没有任何魔术或额外的配置,这是一条标准路由。 我们只有一小段代码,当该人的lastName被该死以模拟路线中间的错误时,将引发异常。
该应用程序使用atomikos JTA事务管理器以独立模式运行。 首先,我们创建一个JtaTransactionManager ,以供交易路线使用:
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <constructor-arg ref="userTransaction"/><constructor-arg ref="userTransactionManager"/>
</bean>
然后用它包装我们的数据源 :
public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {EmbeddedXADataSource ds = new EmbeddedXADataSource();ds.setCreateDatabase("create");ds.setDatabaseName("target/testdb");ds.setUser("");ds.setPassword("");AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();xaDataSource.setXaDataSource(ds);xaDataSource.setUniqueResourceName("xaDerby");return xaDataSource;
}
并使用TransactionManagerLookup告诉Infinispan参与同一笔交易:
public BasicCacheContainer basicCacheContainer() throws Throwable {GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();Configuration loc = new ConfigurationBuilder().transaction().transactionMode(TransactionMode.TRANSACTIONAL).transactionManagerLookup(new TransactionManagerLookup() {@Overridepublic TransactionManager getTransactionManager() throws Exception {return jtaTransactionManager.getTransactionManager();}}).build();return new DefaultCacheManager(glob, loc, true);
}
完成所有这些样板代码之后,我们就有了数据源,缓存和Camel路由参与同一事务。 要查看具有两个阶段提交和回滚的完整REST示例,请从github获取源代码并进行使用。
BTW Camel-infinispan组件仍然不是Camel主干的一部分,要运行示例,您也将需要它 。
翻译自: https://www.javacodegeeks.com/2013/08/transactional-caching-for-camel-with-infinispan.html