这是一个策略:
来自Mapper的
:发出每个记录的三个副本并使用二级排序:
((复合键),值)=
((消息小时 - 一小时,当前消息的精确时间),消息)
((消息小时,消息的准确时间),消息)
((消息小时1小时,消息的准确时间),消息)
现在:您需要标准的二级排序:
setPartitioner只到密钥的前半部分(消息的小时)
setGroupingComparator只到键的前半部分(消息的小时)
setSortingComparator to(消息小时,消息的准确时间)
在reducer中:每个reducer组接收消息精确时间内/ - 60到120分钟内的所有消息 . reducer以排序顺序查看"precise time of message"的所有内容 . 因此,您可以在每个减速器中保留过去60分钟内查看的所有消息的滑动窗口
NOTE 以上假设60分钟消息的数据可以放在单个reducer任务的内存中 . 否则,您将需要求助于将数据写入磁盘作为窗口函数的一部分 .
Update OP要求进一步澄清窗口,所以我们走了 .
从Mapper发出的密钥的角度考虑:每个输入记录有三个密钥 . 现在在Reducer上,这意味着每个输入记录都出现在三个不同的组中 . 原因是我们需要针对每个输入记录考虑前导和滞后记录 . 因此,现在我们让每个组都可以访问所有输入记录,这些记录可能在最早记录的60分钟内以及最新记录的60分钟内 . 由于记录按每小时最早的秒数分组:这意味着-60(分钟)到120(最大)对比属于给定小时组的任何记录 .