- Java的Apache Camel入门
- 使用CamelRunner改善路线的启动
- 使用Camel构建基于消息的应用程序
但是,由于我准备了包含所有这些材料的camel-demo-1.0.0-SNAPSHOT-project.zip ,因此我认为将它们组合并整体呈现会更容易。
Java的Apache Camel入门
用很少的Groovy行尝试Camel是一回事,但是用Java进行全面的项目则是另一回事。 今天,我将向您展示如何通过基于Maven的项目在Apache Camel上开始工作。 您也可以使用提供的camel-demo
作为项目模板来启动您自己的Apache Camel项目。 您只需要重命名Java包,并重命名pom的组和工件ID即可满足您的需要。
准备具有Camel依赖关系的基于Maven的项目
解压缩camel-demo
项目源代码,您将看到基本的目录布局。
camel-demo+- bin+- config+- data+- src+- pom.xml+- README.txt
使此演示成为基于Camel的项目的原因只是pom.xml
的声明。 让我们看一下文件及其依赖项。
<?xml version='1.0' encoding='UTF-8'?>
<project xmlns='http://maven.apache.org/POM/4.0.0' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd'><modelVersion>4.0.0</modelVersion><groupId>deng.cameldemo</groupId><artifactId>camel-demo</artifactId><version>1.0.0-SNAPSHOT</version><packaging>jar</packaging><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><slf4j.version>1.6.6</slf4j.version><camel.version>2.10.1</camel.version></properties><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.6</source><target>1.6</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><version>2.3</version><configuration><descriptorRefs><descriptorRef>project</descriptorRef><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><dependencies><!-- Unit testing lib --><dependency><groupId>junit</groupId><artifactId>junit-dep</artifactId><version>4.10</version><scope>test</scope></dependency><dependency><groupId>org.hamcrest</groupId><artifactId>hamcrest-library</artifactId><version>1.2.1</version><scope>test</scope></dependency><!-- Logging lib --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>runtime</scope><optional>true</optional></dependency><!-- Apache Commons lib --><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.0.1</version></dependency><!-- Apache Camel --><dependency><groupId>org.apache.camel</groupId><artifactId>camel-core</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-spring</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-groovy</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-jackson</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-mina</artifactId><version>${camel.version}</version></dependency></dependencies></project>
此pom.xml
对基于Java的应用程序进行贴花处理,它将生成jar
。 它需要最少的JDK 6或更高版本。 除了用于单元测试的典型junit
和hamcrest
之外,我还添加了slf4j
进行日志记录。 我也将Apache的commons-lang/io
夫妇添加到了项目中。 我认为这些是任何基于Java的应用程序都应使用的基本设置。
我声明的maven-assembly-plugin
仅用于此演示打包目的,您可以更改或删除以适合您自己的项目需求。
对于骆驼依赖性,您将需要最少的camel-core
来构建路线。 然后,您可以添加计划在项目中使用的任何其他组件。 我添加了以下内容来构建基于消息的典型应用程序开发:
-
camel-spring
–我们希望可以选择在xml文件中声明骆驼路线作为配置。 有关camel-demo/config
请参见camel-demo/config
目录。 -
camel-jackson
–我们希望将应用程序中的消息传递数据处理为JSON格式。 -
camel-mina
-我们想通过TCP套接字在整个网络上发送消息传递数据。 -
camel-groovy
– [可选]我们希望即使在xml配置内部也可以添加动态脚本来路由。 这对于调试和POC非常有用。
请注意,由于我们使用了多个骆驼组件依赖关系,因此我选择设置Maven属性${camel.version}
以便在升级Camel时,将pom.xml
文件维护在一个位置更容易。
您应该能够进入项目目录并运行mvn compile
来验证项目。 它应该编译没有错误。
使用CamelRunner改善路线的启动
准备好项目pom.xml
文件后,就可以开始创建骆驼路线来处理自己的业务逻辑了。 在我们太兴奋之前,让我们尝试一个简单的HelloRoute
,看看它如何工作以及如何首先运行它。 这是src/main/java/deng/cameldemo/HelloRoute.java
的路由定义代码。
package deng.cameldemo;import org.apache.camel.builder.RouteBuilder;public class HelloRoute extends RouteBuilder {@Overridepublic void configure() throws Exception {from('timer://helloTimer?period=3000').to('log:' + getClass().getName());}
}
体验骆驼之旅
要查看上面的内容,我们需要将其添加到CamelContext
并启动上下文。 对于Java独立程序,我们将在Main
类中编写此安装代码。 Camel实际上带有org.apache.camel.main.MainSupport
抽象类,您可以用来扩展自己的Main
。 但是,我认为如果Camel提供一个可以像这样运行的CamelRunner
会更好。
$ java CamelRunner deng.cameldemo.HelloRoute
这样的CamelRunner
将非常具有用户友好性,并且可重复使用,所以我就是这样做的。 我这样写:
package deng.cameldemo;import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;/** * A main program to start Camel and run as a server using RouteBuilder class names or * Spring config files.* * <p>Usage:* * java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute* * or* * java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml* * @author Zemian Deng*/
public class CamelRunner {public static void main(String[] args) throws Exception {CamelRunner runner = new CamelRunner();runner.run(args);}private static Logger logger = LoggerFactory.getLogger(CamelRunner.class);public void run(String[] args) throws Exception {if (Boolean.parseBoolean(System.getProperty('spring', 'false')))runWithSpringConfig(args);elserunWithCamelRoutes(args);// Wait for user to hit CRTL+C to stop the servicesynchronized(this) {this.wait();}}private void runWithSpringConfig(String[] args) {final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);// Register proper shutdown.Runtime.getRuntime().addShutdownHook(new Thread() { @Overridepublic void run() {try {springContext.close();logger.info('Spring stopped.');} catch (Exception e) {logger.error('Failed to stop Spring.', e);}}});// Start springlogger.info('Spring started.');}private void runWithCamelRoutes(String[] args) throws Exception {final CamelContext camelContext = new DefaultCamelContext(); // Register proper shutdown.Runtime.getRuntime().addShutdownHook(new Thread() { @Overridepublic void run() {try {camelContext.stop();logger.info('Camel stopped for {}', camelContext);} catch (Exception e) {logger.error('Failed to stop Camel.', e);}}});// Added RouteBuilder from argsfor (String className : args) {Class<?> cls = Class.forName(className);if (RouteBuilder.class.isAssignableFrom(cls)) {Object obj = cls.newInstance();RouteBuilder routeBuilder = (RouteBuilder)obj;camelContext.addRoutes(routeBuilder);} else {throw new RuntimeException('Unable to add Camel RouteBuilder ' + className);}}// Start camelcamelContext.start();logger.info('Camel started for {}', camelContext);}
}
为了帮助您运行主类,我在项目的bin
目录下提供了一个run-java包装程序脚本,以便您无需设置类路径即可快速对其进行测试。
$ mvn package
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
您将看到该程序将在DefaultCamelContext
加载HelloRoute
并将其作为服务器启动。 HelloRoute
本身将生成3秒钟的计时器消息,并将其发送到记录器,该记录器应打印在控制台屏幕上。 这将一直持续下去,直到您按CTRL+C
结束它为止。
注意:您只需要调用一次mvn package
命令,这样它将打包所有依赖项jar,以便run-java
自动检测到它们。 如果您在package
阶段不打算使用maven-assembly-plugin
,那么显式使用mvn dependency:copy-dependencies
命令也可以正常工作。
进行Camel测试,第2部分:使用Spring xml配置运行Camel
上面的HelloRoute
示例将仅提供通过使用组件URI形成的路由定义。 如果我们可以以声明的方式配置路由,以便我们可以更改路由而无需重新编译类文件,那将是很好的。 这将非常方便,特别是如果您不熟悉每个组件的选项并且想探索并尝试的话。 好吧,这就是camel-spring
用途。 除了为您提供在xml配置文件中加载路由的选项之外,它还提供了一种非常灵活的方式来在Spring IoC容器中注册自定义服务/处理器Bean。
如果您是一位敏锐的读者,您会在上面的CamelRunner
代码中注意到它还有一个额外的runWithSpringConfig
部分。 因此, CamelRunner
实际上可以引导任何Spring xml文件并作为服务器启动上下文。 您可以这样使用它:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xml
config/hellocamel-spring.xml
等效于我们的HelloRoute
代码,但形式为Spring xml:
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'><camelContext id='helloCamel' xmlns='http://camel.apache.org/schema/spring'><route><from uri='timer://jdkTimer?period=3000'/><to uri='log://deng.cameldemo.HelloCamel'/></route></camelContext></beans>
这样就无需编译/重新编译HelloRoute
来定义要运行的Camel路由。
使用Camel构建基于消息的应用程序
为了向您展示更实际的演示,我将进一步向您展示如何设置Camel来处理基于消息的应用程序。 在许多IT商店中,通常都有一台服务器将消息数据作为输入并进行处理。 一个实际的用例是获取任何JSON格式的消息并将其转换为对象并进行处理。 要在Camel中做到这一点,您想要构建的是一条路由,该路由将从TCP端口获取输入消息,然后使用可能具有的任何业务逻辑在管道流中对其进行处理。 您将把路由作为服务器运行,然后客户端可以使用任何方式将消息提交到TCP端口。 客户端甚至可能是另一个瘦的Camel客户端应用程序,也可以提交数据。 让我告诉您如何开始。
用骆驼路线写服务器端代码
服务器端将需要一个路由来侦听TCP端口,而这是由camel-mina
组件提供的。 第一步是您需要一条路线。
package deng.cameldemo;import org.apache.camel.builder.RouteBuilder;public class TcpMsgRoute extends RouteBuilder {@Overridepublic void configure() throws Exception {String port = System.getProperty('port', '12345');from('mina:tcp://localhost:' + port + '?sync=false').to('log:' + getClass().getName());}
}
然后,下一步就完成了! 没办法,您的意思是服务器就这些了吗? 难以置信? 好吧,让我们尝试一下
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345
15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting
15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true]
15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started.
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds
15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1)
瞧! 服务器已启动,正在等待用户通过端口12345
发送消息。 几行代码还不错。
用Camel ProducerTemplate编写客户端代码
由于我们的服务器公开了一个TCP端口并接收任何文本内容消息,因此您可以创建任何能够写入TCP套接字的客户端。 在这里,我将向您展示如何使用Camel编写瘦客户机。
package deng.cameldemo.client;import java.io.FileReader;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TcpMsgSender {public static void main(String[] args) throws Exception {TcpMsgSender runner = new TcpMsgSender();runner.run(args);}private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);public void run(String[] args) throws Exception {String fileName = args.length > 0 ? args[0] : 'data/msg.txt';String[] hostPort = (args.length > 1 ? args[1] : 'localhost:12345').split(':');String host = hostPort[0];String port = hostPort.length > 1 ? hostPort[1] : '12345';logger.info('Sending tcp message {} to host={}, port={}', new Object[]{ fileName, host, port});String text = IOUtils.toString(new FileReader(fileName));logger.debug('File size={}', text.length());CamelContext camelContext = new DefaultCamelContext();ProducerTemplate producer = camelContext.createProducerTemplate();producer.sendBody('mina:tcp://' + host + ':' + port + '?sync=false', text);logger.info('Message sent.');}
}
该TcpMsgSender
可以将任何文本文件发送到您的服务器端点。 在服务器运行时尝试以下操作:
$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345
15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47
15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.
您应该能够从服务器控制台输出中验证它是否收到了消息。 我发送的味精在data/test-msg.json
,其中包含以下简单文本:
{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }
请注意,我们的服务器仅接收纯文本并将其记录。 接下来,我们将讨论如何处理消息。
使用Camel和Spring xml配置以JSON格式处理消息数据
您认为服务器代码从上面很容易,请再猜一次。 实际上,您可以仅用一些简单的xml行替换TcpMsgRoute
!
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'><camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'><route><from uri='mina:tcp://localhost:12345?sync=false'/><to uri='log://deng.cameldemo.TcpMsgServer'/></route></camelContext></beans>
将其另存为config/tcpmsgserver-spring.xml
。 然后重新运行服务器,您应该获得与上面相同的结果。
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml
现在让我们改进上面的xml,以进一步处理JSON消息数据。 我们希望将纯文本转换为Java对象,然后由自定义bean处理。 为此,我们首先需要在路线中添加解组组件。 这就是camel-jackson
发挥作用的地方。 在我们的演示中,解组步骤会将JSON文本转换为java.util.Map
,然后将其传递给名为myMsgProcessor
的处理器bean。 让我们创建一个名为config/tcpmsgserver-json-spring.xml
的新xml文件,如下所示。
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'><camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'><route><from uri='mina:tcp://localhost:12345?sync=false'/><to uri='log://deng.cameldemo.TcpMsgServer'/><unmarshal><json library='Jackson'/></unmarshal><to uri='bean:myMsgProcessor?method=process'/></route></camelContext><bean id='myMsgProcessor' class='deng.cameldemo.MyMsgProcessor'></bean></beans>
myMsgProcessor
是一个Spring bean,我们提供了自定义逻辑代码来处理数据。 至此,我们有一个完整的Java对象要操作。 处理器的内容可以是具有URI中指定的方法名称的任何POJO。 这是一个示例:
package deng.cameldemo;import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;public class MyMsgProcessor {private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);public void process(Map<String, String> data) {logger.info('We should slice and dice the data: ' + data);}
}
尝试使用上面的新xml文件重新运行服务器,您应该能够重新调用相同的客户端命令进行测试。 这是服务器的示例输出:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml
17:05:25 main INFO org.springframework.context.support.FileSystemXmlApplicationContext:456 | Refreshing org.springframework.context.support.FileSystemXmlApplicationContext@4200309: startup date [Sat Sep 15 17:05:25 EDT 2012]; root of context hierarchy
17:05:25 main INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader:315 | Loading XML bean definitions from file [/Users/zemian/projects/sandbox/camel-demo/config/tcpmsgserver-json-spring.xml]
17:05:27 main INFO org.springframework.beans.factory.support.DefaultListableBeanFactory:557 | Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@27b75165: defining beans [template,consumerTemplate,tcpMsgServer:beanPostProcessor,tcpMsgServer,myMsgProcessor]; root of factory hierarchy
17:05:27 main INFO org.apache.camel.spring.SpringCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: tcpMsgServer) is starting
17:05:27 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
17:05:27 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
17:05:28 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@5a3cae4a
17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=false]
17:05:28 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:1426 | Total 1 routes, of which 1 is started.
17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: tcpMsgServer) started in 0.695 seconds
17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started.
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }]
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}
请注意,骆驼会自动转换您的路线中的数据格式! 我们的客户端仅以JSON格式发送纯文本,但是当服务器收到纯文本时,它将使用Jackson库将其解组,然后将其转换为Java Map对象。 然后,它将map对象传递到我们的处理器bean中。 另外,在此演示中,我选择使用通用的java.util.Map
作为处理器方法参数(这是JSON unmarshal的输出),但是您可以轻松定义自己的业务数据类型,例如MyCustomerData
。 这显示了Camel的强大功能,因为您无需在流程中推送消息,而只需担心将“处理器”编写为POJO。 骆驼将组件“粘合”在一起以形成一条路线,并通过管道流携带消息数据。
同样,当您在一个或多个处理器中编写业务逻辑时,最好将POJO逻辑限制为尽可能小的单位。 当您这样做时,则可以最大程度地提高处理器的可重用性。 您制作的POJO较大,并且混合了许多业务逻辑,因此也很难进行测试。 因此,我建议您在开发这些处理器bean时,尝试将它们视为乐高积木-小POJO。 您想让骆驼定义路线并将LEGO块粘合在一起。 一旦习惯了这种thicking的习惯,便可以更有效地使用Camel来解决许多域问题。
好了,今天的人们就这些了。 我希望您喜欢骑骆驼。
祝您编程愉快,别忘了分享!
参考: A程序员杂志博客上的JCG合作伙伴 Zemian Deng 使用Camel构建基于消息的应用程序 。
翻译自: https://www.javacodegeeks.com/2012/09/camel-build-message-based-application.html