轮询一个空目录(并发送一个空消息,正文为空):
from('file://temp?sendEmptyMessageWhenIdle=true')
停止路线:
.process(new Processor() {public void process(Exchange exchange) throws Exception {getContext().stopRoute('ROUTE_ID');}
})
访问主体中对象的属性:
承认对象有一个名为'getMydata()'的方法:
new ValueBuilder(simple('${body.mydata}')).isEqualTo(...)
定义一个聚合器:
.aggregate(simple('${header.id}.substring(0,15)'), genericAggregationStrategy)
.completionPredicate(header(Exchange.BATCH_COMPLETE).isEqualTo(Boolean.TRUE))
-
'${header.id}.substring(0,15)'
:标记以区分消息(此处,返回的字符串是所有消息所共有,我们将它们汇总在一起) -
Exchange.BATCH_COMPLETE
:谓词表示轮询结束(例如,解析的所有文件) -
genericAggregationStrategy
:上面是一个聚合器的示例,该聚合器将列表中所有消息的内容分组:
public class GenericAggregationStrategy implements AggregationStrategy {@SuppressWarnings('unchecked')public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {if (oldExchange == null) {ArrayList<Object> list = new ArrayList<Object>();list.add(newExchange.getIn().getBody());newExchange.getIn().setBody(list);return newExchange;} else {Object oldIn = oldExchange.getIn().getBody();ArrayList<Object> list = null;if(oldIn instanceof ArrayList) {list = (ArrayList<Object>) oldIn;} else {list = new ArrayList<Object>();list.add(oldIn);}list.add(newExchange.getIn().getBody());newExchange.getIn().setBody(list);return newExchange;}}
}
手动触发聚合的完成(无论它是什么):
发送标题为Exchange.AGGREGATION_COMPLETE_ALL_GROUPS = true
的消息
可以执行from('bean:...')
,因为知道该bean将被永久轮询(如使用'file')并每次都被实例化。 使用以下命令修改路线上的邮件正文 :
.transform(myExpression)
与myExpression
:
public class MyExpression implements Expression {public <T> T evaluate(Exchange exchange, Class<T> type) {MyBean newData = ...;return exchange.getContext().getTypeConverter().convertTo(type, newData);}
}
使用JaxB:
- 在路线上:
.[un]marshal().jaxb('my.business_classes.package')
- 具有可配置的DataFormat:
.[un]marshal(jaxbDataFormat)
与:
// indicate to Jaxb to not write XML prolog :
JaxbDataFormat jaxbDataFormat = new JaxbDataFormat('my.business_classes.package');
jaxb.setFragment(true);
线程管理的一般概念:
- 一个
from(...)
=一个线程 - 除了
from('direct:...')
wich会创建一个具有唯一标识符的“命名路由”,该标识符只能由另一条路由(与调用方在同一线程中)调用。 - 组件
.resequence().batch()
创建一个新线程以重新抛出消息。
定义关闭策略:
getContext().setShutdownStrategy(new MyShutdownStrategy(getContext()));
与:
public class MyShutdownStrategy extends DefaultShutdownStrategy {protected CamelContext camelContext;private long timeout = 1;private TimeUnit timeUnit = TimeUnit.SECONDS;public SpiralShutdownStrategy(CamelContext camelContext) {this.camelContext = camelContext;}@Overridepublic long getTimeout() {return this.timeout;}@Overridepublic TimeUnit getTimeUnit() {return this.timeUnit;}@Overridepublic CamelContext getCamelContext() {return this.camelContext;}/*** To ensure shutdown**/@Overridepublic void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, false);}/*** To ensure shutdown**/@Overridepublic void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {doShutdown(context, routes, this.timeout, this.timeUnit, false, false, false);}/*** To ensure shutdown**/@Overridepublic boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout)throws Exception {super.shutdown(context, route, this.timeout, this.timeUnit, false);return true;}
}
停止批处理:
.process(new Processor() {public void process(Exchange exchange) throws Exception {context.stop();}
});
从路由调用bean的方法:
- 方法的返回始终会影响到消息的正文。 例如 :
-
public void myMethod(Exchange e)
:
不会修饰身体 -
public boolean myMethod(Exchange e)
:
布尔值(或任何原始类型)将在主体中设置 -
public Object myMethod(Exchange e)
:
该对象将被放置在主体中(即使为null) -
public Message myMethod(Exchange e)
:
邮件将被放置在正文中(最好避免这种情况) -
public List<Object> myMethod(Exchange e)
:
该列表将在正文中设置:与.split()
使用时,每个对象将以新消息发送 -
public List<Message> myMethod(Exchange e)
:
该列表将在正文中设置:.split()
将为每个元素创建一条新消息(最好避免,见上)
-
- 可配置方法的参数:
-
public void myMethod(Exchange e)
:
完整的交换将通过 -
public void myMethod(Object o)
:
骆驼将尝试在所需参数的类中转换主体 -
public void myMethod(@Body File o, @Header('myHeader') String myParamHeader)
:
骆驼将按规定注入每个参数
-
路线上的例外管理:
- 以全局方式(在所有路径之前声明):
onException(MyException.class, RuntimeCamelException.class).to(...)...
- 真正处理异常而不在路径(和日志)中冒泡:
onException(...).handled(true).to(...)...
- 在Exception之后继续在路由中进行处理:
onException(...).continued(true).to(...)...
- 例外是“已处理”或“继续”
- 本地方式(沿路线):
from(...) .onException(...).to('manage_error').log('FAIL !!').end() .to('continue_route')...
要写入文件,只需要标题Exchange.FILE_NAME
。
使用
- 使用表达式从唯一的可比较“键”(数字,字符串或自定义比较器)计算消息的新顺序
- 两种方式:
- .batch():批处理模式。
用标记分裂身体:
.split(body().tokenize('TOKEN'))
知道令牌将从内容中删除。 例如,如果收到包含以下内容的消息:“ data1TOKENdata2TOKENdata3”,则创建的消息将为:“ data1”,“ data2”,“ data3”。 因此,在处理XML数据时应避免这种情况,建议使用“ tokenizeXML()”。
动态访问人体数据:
- 轻量级“脚本”语言: 简单表达语言
- 读取文件数据: 文件表达语言
发送邮件:
from('direct:mail').setHeader('To', constant(mailTo)).setHeader('From', constant(mailFrom)) .setHeader('Subject', constant(mailSubject)) .to('smtp://${user}@${server}:${port}?password=${password}');
带有附件:
.beanRef(MAIL_ATTACHER, 'attachLog');
//with
public class MailAttacher {public void attachLog(Exchange exc) throws Exception {File toAttach = ...; exc.getIn().addAttachment(toAttach.getName(), new DataHandler(new FileDataSource(toAttach)));// if neededexc.setProperty(Exchange.CHARSET_NAME, 'UTF-8');}
}
有用的Exchange属性:
- Exchange.AGGREGATED_ *:聚合管理
- Exchange.BATCH_ *:已处理的邮件管理
- Exchange.FILE_ *:文件消息管理
- Exchange.HTTP_ *:Web请求管理
- Exchange.LOOP_ *:循环管理
- Exchange.REDELIVERY_ *:异常管理
- Exchange.SPLIT_ *:分割内容管理
循环路线:
from('direct:...')
.loop(countExpression)
.to('direct:insideLoop')
.end()
其中“ countExpression”是用于动态计算循环计数(评估进入循环)的表达式。 如果过程很复杂,最好将循环的代码移动到新的路径中。
标头管理:
消息的标题是在其创建时定义的。 当使用'.split()'时,所有后续消息都将具有与原始消息相同的标头(因此,在管理文件时要小心)。 聚合中,必须手动管理自定义标头,以保留在其余路由中。
截取消息
并执行路由并行化(在路由之前声明):
interceptSendToEndpoint('ENDPOINT_TO_INTERSEPT').to(...)...
参考: Developpef博客上来自我们JCG合作伙伴 Paul-Emmanuel的Apache Camel Cheatsheet 。
翻译自: https://www.javacodegeeks.com/2013/01/apache-camel-cheatsheet.html