今天,我将把第二部分带到我以前关于Java EE 7批处理和《魔兽世界–第1部分》的帖子中。 在本文中,我们将了解如何从第1部分中获得的数据中汇总和提取指标。
概括
批处理目的是下载魔兽世界拍卖行的数据,处理拍卖并提取指标。 这些指标将建立拍卖项目价格随时间变化的历史记录。 在第1部分中 ,我们已经下载了数据并将其插入数据库。
应用程序
处理作业
在将原始数据添加到数据库之后,我们将添加一个带有Chunk样式处理的步骤。 在块中,我们将读取聚合的数据,然后将其插入数据库中的另一个表中以便于访问。 这是在process-job.xml
:
process-job.xml
<step id="importStatistics"><chunk item-count="100"><reader ref="processedAuctionsReader"/><processor ref="processedAuctionsProcessor"/><writer ref="processedAuctionsWriter"/></chunk>
</step>
块一次读取一个数据,并在事务内创建要写出的块。 从ItemReader
读入一项,交给ItemProcessor
并进行聚合。 一旦读取的项目数等于提交间隔,就通过ItemWriter
写入整个块,然后提交事务。
ProcessedAuctionsReader
在读者中,我们将使用数据库功能选择和汇总指标。
ProcessedAuctionsReader.java
@Named
public class ProcessedAuctionsReader extends AbstractAuctionFileProcess implements ItemReader {@Resource(name = "java:comp/DefaultDataSource")protected DataSource dataSource;private PreparedStatement preparedStatement;private ResultSet resultSet;@Overridepublic void open(Serializable checkpoint) throws Exception {Connection connection = dataSource.getConnection();preparedStatement = connection.prepareStatement("SELECT" +" itemid as itemId," +" sum(quantity)," +" sum(bid)," +" sum(buyout)," +" min(bid / quantity)," +" min(buyout / quantity)," +" max(bid / quantity)," +" max(buyout / quantity)" +" FROM auction" +" WHERE auctionfile_id = " +getContext().getFileToProcess().getId() +" GROUP BY itemid" +" ORDER BY 1",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY,ResultSet.HOLD_CURSORS_OVER_COMMIT);// Weird bug here. Check https://java.net/bugzilla/show_bug.cgi?id=5315//preparedStatement.setLong(1, getContext().getFileToProcess().getId());resultSet = preparedStatement.executeQuery();}@Overridepublic void close() throws Exception {DbUtils.closeQuietly(resultSet);DbUtils.closeQuietly(preparedStatement);}@Overridepublic Object readItem() throws Exception {return resultSet.next() ? resultSet : null;}@Overridepublic Serializable checkpointInfo() throws Exception {return null;}
在此示例中,我们通过使用具有简单可滚动结果集的纯JDBC获得最佳性能结果。 这样,仅执行一个查询,并根据需要在readItem
中提取结果。 您可能想探索其他替代方法。
Plain JPA在标准中没有可滚动的结果集,因此您需要对结果进行分页。 这将导致多个查询,这将减慢阅读速度。 另一个选择是使用新的Java 8 Streams API来执行聚合操作。 这些操作很快,但是您需要从数据库中选择整个数据集到流中。 最终,这会削弱您的性能。
我确实尝试了这两种方法,并通过使用数据库聚合功能获得了最佳结果。 我并不是说这始终是最好的选择,但是在这种情况下,这是最好的选择。
在实施过程中,我还发现了Batch中的错误。 您可以在这里检查。 在PreparedStatement中设置参数时会引发异常。 解决方法是将参数直接注入查询SQL中。 丑陋,我知道...
ProcessedAuctionsProcessor
在处理器中,让我们将所有聚合值存储在一个holder对象中,以存储在数据库中。
ProcessedAuctionsProcessor.java
@Named
public class ProcessedAuctionsProcessor extends AbstractAuctionFileProcess implements ItemProcessor {@Override@SuppressWarnings("unchecked")public Object processItem(Object item) throws Exception {ResultSet resultSet = (ResultSet) item;AuctionItemStatistics auctionItemStatistics = new AuctionItemStatistics();auctionItemStatistics.setItemId(resultSet.getInt(1));auctionItemStatistics.setQuantity(resultSet.getLong(2));auctionItemStatistics.setBid(resultSet.getLong(3));auctionItemStatistics.setBuyout(resultSet.getLong(4));auctionItemStatistics.setMinBid(resultSet.getLong(5));auctionItemStatistics.setMinBuyout(resultSet.getLong(6));auctionItemStatistics.setMaxBid(resultSet.getLong(7));auctionItemStatistics.setMaxBuyout(resultSet.getLong(8));auctionItemStatistics.setTimestamp(getContext().getFileToProcess().getLastModified());auctionItemStatistics.setAvgBid((double) (auctionItemStatistics.getBid() / auctionItemStatistics.getQuantity()));auctionItemStatistics.setAvgBuyout((double) (auctionItemStatistics.getBuyout() / auctionItemStatistics.getQuantity()));auctionItemStatistics.setRealm(getContext().getRealm());return auctionItemStatistics;}
}
由于指标会及时记录数据的准确快照,因此计算仅需执行一次。 这就是为什么我们要保存汇总指标。 它们永远不会改变,我们可以轻松地检查历史。
如果您知道源数据是不可变的,并且需要对其进行操作,那么建议您将结果保留在某处。 这样可以节省您的时间。 当然,如果将来要多次访问此数据,则需要平衡。 如果不是这样,也许您就不需要经历持久化数据的麻烦了。
ProcessedAuctionsWriter
最后,我们只需要将数据写到数据库中即可:
ProcessedAuctionsWriter.java
@Named
public class ProcessedAuctionsWriter extends AbstractItemWriter {@PersistenceContextprotected EntityManager em;@Override@SuppressWarnings("unchecked")public void writeItems(List items) throws Exception {List<AuctionItemStatistics> statistis = (List<AuctionItemStatistics>) items;statistis.forEach(em::persist);}
}
指标
现在,为了对数据做一些有用的事情,我们将公开一个REST端点,以对所计算的指标执行查询。 方法如下:
WowBusinessBean.java
@Override @GET@Path("items")public List<AuctionItemStatistics> findAuctionItemStatisticsByRealmAndItem(@QueryParam("realmId") Long realmId,@QueryParam("itemId") Integer itemId) {Realm realm = (Realm) em.createNamedQuery("Realm.findRealmsWithConnectionsById").setParameter("id", realmId).getSingleResult();// Workaround for https://bugs.eclipse.org/bugs/show_bug.cgi?id=433075 if using EclipseLinkList<Realm> connectedRealms = new ArrayList<>();connectedRealms.addAll(realm.getConnectedRealms());List<Long> ids = connectedRealms.stream().map(Realm::getId).collect(Collectors.toList());ids.add(realmId);return em.createNamedQuery("AuctionItemStatistics.findByRealmsAndItem").setParameter("realmIds", ids).setParameter("itemId", itemId).getResultList();}
如果您还记得第1部分中的一些细节,那么魔兽世界服务器称为Realms 。 这些领域可以相互链接并共享同一拍卖行 。 为此,我们还拥有有关领域之间如何相互联系的信息。 这很重要,因为我们可以在所有连接的领域中搜索拍卖品 。 其余的逻辑只是简单的查询以获取数据。
在开发过程中,我还发现了Eclipse Link (如果您在Glassfish中运行)和Java 8的错误。显然, Eclipse Link返回的基础Collection的元素计数设置为0。尝试内联查询调用以及Stream操作。 流将认为它为空,并且不会返回任何结果。 您可以在这里有关此的内容。
接口
我还使用Angular和Google Charts开发了一个小界面来显示指标。 看一看:
在这里,我在寻找一个名为“Aggra(葡萄牙语)”的境界与拍卖项目编号72092对应于鬼铁矿石 。 如您所见,我们可以检查待售数量,出价和买断值以及价格随时间的波动。 整齐? 我可能会写另一篇关于将来构建Web Interface的文章。
资源资源
您可以从我的github存储库中克隆完整的工作副本,然后将其部署到Wildfly或Glassfish中 。 您可以在那里找到部署说明: 魔兽世界拍卖
也请检查Java EE示例项目,其中包含大量完整的批处理示例。
翻译自: https://www.javacodegeeks.com/2015/01/java-ee-7-batch-processing-and-world-of-warcraft-part-2.html