请查看我在Karaf的OSGi中构建普通CXF服务(不使用Camel)的其他文章 。
这是有关如何
- 创建一个CXF REST服务
- 使用骆驼多播(并并行化)传入的请求
- 来自两个不同服务的源数据
- 汇总响应并
- 最后将合并结果作为JSON返回给最终用户。
您可以从github下载整个代码库 。
简单来说,此应用程序可以做什么
此服务的预期结果是一个硬编码的响应,看起来像
从图像中可以看到,响应的顶部来自一个名为NameEmailService
的服务,响应的第二部分来自一个名为AgePhoneService
的服务。 同时充实数据的调用和合并结果实体– ConsolidatedSearchResult
被填充。
项目结构如下所示:
步骤1有两个婴儿步骤。
步骤1.a –创建CXF REST服务
您可能已经猜到了,这一步没有什么复杂的。 只是一个接口和一个实现。
接口
@Path("rest")
public interface RestService {@GET@Path("query/{queryString}")@Produces(MediaType.APPLICATION_JSON)public String sourceResultsFromTwoSources(@PathParam("queryString") String queryString);}
实作
public class RestServiceImpl implements RestService {private static Logger logger= LoggerFactory.getLogger(AgePhoneServiceImpl.class);private NameEmailService nameEmailService;private AgePhoneService agePhoneService;public RestServiceImpl(){}//Do nothing. Camel intercepts and routes the requestspublic String sourceResultsFromTwoSources(String queryString) {return null;}public NameEmailResult getNameEmailResult(String queryString){logger.info("Invoking getNameEmailResult from RestServiceImpl");return nameEmailService.getNameAndEmail(queryString);}public AgePhoneResult getAgePhoneResult(String queryString){logger.info("Invoking getAgePhoneResult from RestServiceImpl");return agePhoneService.getAgePhoneResult(queryString);}public NameEmailService getNameEmailService() {return nameEmailService;}public AgePhoneService getAgePhoneService() {return agePhoneService;}public void setNameEmailService(NameEmailService nameEmailService) {this.nameEmailService = nameEmailService;}public void setAgePhoneService(AgePhoneService agePhoneService) {this.agePhoneService = agePhoneService;}
}
请注意,方法实现sourceResultsFromTwoSources
返回null。 事实是,进行REST调用时甚至不会调用此方法。 骆驼拦截所有对URL的请求并将其路由到各个端点(在我们的例子中,调用两个方法– getNameEmailResult()
和getAgePhoneResult()
)。
步骤1.b –创建服务实现
NameEmailService和AgePhoneService的Kiddish实现如下:
NameEmailServiceImpl
public class NameEmailServiceImpl implements NameEmailService {public NameEmailResult getNameAndEmail(String queryString){return new NameEmailResult("Arun", "arun@arunma.com");}}
AgePhoneServiceImpl
public class AgePhoneServiceImpl implements AgePhoneService {public AgePhoneResult getAgePhoneResult(String queryString){return new AgePhoneResult(32, "111-222-333");}
}
第2、3、4和5步
好吧,当我说2、3、4和5是4个步骤时,我撒了谎。 使用Camel路由及其企业集成模式实现,所有这些操作都只需一步即可完成。
RestToBeanRouter
public class RestToBeanRouter extends RouteBuilder {@Overridepublic void configure() throws Exception {from ("cxfrs://bean://rsServer").multicast().parallelProcessing().aggregationStrategy(new ResultAggregator()).beanRef("restServiceImpl", "getNameEmailResult").beanRef("restServiceImpl", "getAgePhoneResult").end().marshal().json(JsonLibrary.Jackson).to("log://camelLogger?level=DEBUG");}
}
我们的路由说明
简而言之,routerbuilder所做的就是
1) from ("cxfrs://bean://rsServer")
拦截对在rest-blueprint.xml
定义的JAX-RS服务器端点的所有请求,如下所示:
rest-blueprint.xml
<cxf:rsServer id="rsServer" address="/karafcxfcamel"serviceClass="me.rerun.karafcxfcamel.rest.RestServiceImpl"loggingFeatureEnabled="true" />
2) .multicast()
将未更改的原始请求转发到
1. `getNameEmailResult` &
2. `getAgePhoneResult` methods in `RestServiceImpl`
3) .parallelProcessing()
并发调用这些方法。
4) .aggregationStrategy(new ResultAggregator())
指定如何汇总来自各种多播源的结果。
我们的聚合器看起来像:
结果聚合器
public class ResultAggregator implements AggregationStrategy {@Overridepublic Exchange aggregate(Exchange oldExchange, Exchange newExchange) {ConsolidatedSearchResult consolidatedSearchResult=null;if (oldExchange==null){consolidatedSearchResult=new ConsolidatedSearchResult();}else{consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult.class);}NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult.class);AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult.class);if (nameEmailResult!=null){consolidatedSearchResult.setNameEmailResult(nameEmailResult);}if (agePhoneResult!=null){consolidatedSearchResult.setAgePhoneResult(agePhoneResult);}newExchange.getIn().setBody(consolidatedSearchResult);return newExchange;}
}
我们的聚合器说明
我们的ResultAggregator中的aggregate
方法虽然有些粗糙,但是可以完成工作。
- 每当所有多播端点完成时,都将调用
aggregate
方法。 - 因此,第一次,oldExchange将为null。 我们以此为契机来构建我们想要响应用户的最终合并结果实体。
- 我们检查传入的newExchange是调用NameEmailService还是AgePhoneService的结果,并相应地填充合并的实体。
- 最后,我们返回合并的实体–返还完成两项工作。
- 合并实体以oldExchange的形式出现,以供下一次调用
aggregate
方法。 (更像是链接–实体最后返回的对象是作为下一个调用的传入交换传入的对象) - 如果它是
aggregate
的最后一个调用(所有多播端点调用已完成),则将其返回给用户。
- 合并实体以oldExchange的形式出现,以供下一次调用
翻译自: https://www.javacodegeeks.com/2013/10/building-camel-cxf-rest-service-in-osgi-for-karaf-multicasting-and-aggregation.html