这篇文章详细介绍了从数据库到对该数据感兴趣的任何其他组件进行流更新的幼稚实现。 更确切地说,如何更改Spring Data R2DBC存储库以向相关订阅者发出事件。
对R2DBC和Spring的一点背景知识将对这篇文章有所帮助。 我以前的著作“ 使用 Microsoft SQL Server的 Spring Data R2DBC和Spring Data R2DBC进行 异步RDBMS访问”在这方面应该有所帮助。
如前所述,这将是一个幼稚的实现。 因此,代码将不会花哨。
为此,我劫持了SimpleR2dbcRepository
以创建一个存储库实现,该存储库实现在每次保存新记录时都会发出事件。 新事件将添加到DirectProcessor
,并发送到订阅它的任何Publisher
。 看起来像:
class PersonRepository(entity: RelationalEntityInformation<Person, Int>,databaseClient: DatabaseClient,converter: R2dbcConverter,accessStrategy: ReactiveDataAccessStrategy
) : SimpleR2dbcRepository<Person, Int>(entity, databaseClient, converter, accessStrategy) {private val source: DirectProcessor<Person> = DirectProcessor.create<Person>()val events: Flux<Person> = sourceoverride fun <S : Person> save(objectToSave: S): Mono<S> {return super.save(objectToSave).doOnNext(source::onNext)}
}
来自SimpleR2dbcRepository
唯一需要重写的函数是save
( saveAll
委托来save
)。 doOnNext
添加到原始保存调用中,该调用通过调用onNext
将新事件推送到source
( DirectorProcessor
)。
source
被强制转换为Flux
以防止来自存储库外部的类添加新事件。 从技术上讲,他们仍然可以添加事件,但是他们需要自己进行转换。
您可能已经注意到,存储库正在加载参数并将其传递到SimpleR2dbcRepository
。 存储库的一个实例需要手动创建,因为其某些依赖项无法自动注入:
@Configuration
class RepositoryConfiguration {@Beanfun personRepository(databaseClient: DatabaseClient,dataAccessStrategy: ReactiveDataAccessStrategy): PersonRepository {val entity: RelationalPersistentEntity<Person> = dataAccessStrategy.converter.mappingContext.getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity<Person>val relationEntityInformation: MappingRelationalEntityInformation<Person, Int> =MappingRelationalEntityInformation(entity, Int::class.java)return PersonRepository(relationEntityInformation,databaseClient,dataAccessStrategy.converter,dataAccessStrategy)}
}
至此,一切都准备就绪,可以使用了。 以下是其工作的示例:
personRepository.events.doOnComplete { log.info("Events flux has closed") }.subscribe { log.info("From events stream - $it") }
// insert people records over time
MARVEL_CHARACTERS.toFlux().delayElements(Duration.of(1, SECONDS)).concatMap { personRepository.save(it) }.subscribe()
哪个输出:
29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id=481, name=Spiderman, age=18)
29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id=482, name=Ironman, age=48)
29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id=483, name=Thor, age=1000)
29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id=484, name=Hulk, age=49)
29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id=485, name=Antman, age=49)
29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id=486, name=Blackwidow, age=34)
29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id=487, name=Starlord, age=38)
29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id=488, name=Captain America, age=100)
29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id=489, name=Warmachine, age=50)
29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26)
29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101)
29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42)
29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id=493, name=Doctor Strange, age=42)
29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id=494, name=Gamora, age=29)
29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id=495, name=Groot, age=4)
29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id=496, name=Hawkeye, age=47)
29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id=497, name=Pepper Potts, age=44)
29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id=498, name=Captain Marvel, age=59)
29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id=499, name=Rocket Raccoon, age=30)
29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id=500, name=Drax, age=49)
29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id=501, name=Nebula, age=30)
每秒保存一条记录,该记录与从存储库发出的事件相匹配。
请注意, doOnComplete
事件永远不会触发。 源永远不会关闭,因此永远不会向其任何订户发出完成事件。
至少对于此基本实现而言,这就是全部。 我敢肯定还有很多事情可以做,但是我首先需要弄清楚该怎么做……总结一下,通过添加一些内容,您可以将插入数据库的数据流式传输到对记录感兴趣的组件被添加。
翻译自: https://www.javacodegeeks.com/2019/09/streaming-live-updates-reactive-spring-data-repository.html