使用David van Geest非常巧妙地描述的方法,我能够避免在客户端连接时获取任何上游数据here
归结为在Consumer上有一个BroadcastHub:
val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()
并连接静态消费者吃掉所有上游数据
liveSource.to(Sink.ignore).run()
从此,我可以让WebSocket客户端订阅消费者收到的所有数据:
def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}
或者根据KafkaTopic(或其他任何你想要的)过滤
def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
x =>
(Json.parse(x) \ "topic").asOpt[String] match {
case Some(str) => str.equals(kafkaTopic)
case None => false
}
}))
}
这并不能解决在第一次连接时向用户提供x数据量的问题,但我预见到我们会为任何历史数据添加简单的数据库查询,并且让WebSocket连接只关注直播数据 .