目录
- 前言
- 使用
- 1、整体结构
- 1.1、序列化
- 2、 Kafka Streams 常用的 API
- 2.1、 StreamsBuilder
- 2.2、 KStream 和 KTable
- 2.3、 filter和 filterNot
- 2.4、 map 和 mapValues
- 2.5、 flatMap 和 flatMapValues
- 2.6、 groupByKey 和 groupBy
- 2.7、 count、reduce 和 aggregate
- 2.8、 join 和 leftJoin
- 2.9、 to 和 toTable
- 2.10、 foreach
- 3、简单案例练习
- 3.1、通过streams实现数据流处理,把字符串装为大写
- 3.2、map 使用,把内容转为key,把value转为内容长度
- 3.3、filter和 filterNot使用过滤消息
- 3.4、通过selectKey配置key
- 4、复杂对象案例练习
- 4.1、自定义序列化
- 4.2、发送消息
- 4.3、通过KafkaStreams接收复杂对象,并且使用split
前言
上一篇文章已经对kafka streams
进行了大致的介绍以及简单案例使用,这一片文章主要是对kafka streams
常用API的介绍以及使用,如果需要看概念介绍的可以看
Kafka Streams详细介绍与具体使用
下面直接开始进行kafka streams
使用操作
使用
1、整体结构
在我们整体处理
streams
时,总共就分为三部分
第一部分是创建配置,告诉kafka我们的连接信息,通过StreamsConfig传递,然后创建一个StreamsBuilder类用于构建
kafka streams
拓扑的主要类。该类主要用于定义数据流处理的拓扑结构。
还有就是通过Serde
进行一个序列化第二部分主要是构建流处理拓扑,通过
StreamsBuilder
类得到源Processor
处理器也就是KStream
类,然后通过KStream的API方法得到不同的Processor
处理器(重点)第三部分通过配置流处理拓扑,创建流处理实例,然后通过
kafkaStreams.start()
方法启动,结束时再通过kafkaStreams.close()
关闭
我们在使用时,1和3基本上都是相同的,一般我们要处理的就是2,根据不同的业务需求,我们得到不同的Processor
处理器,然后发送到不同的topic
1.1、序列化
在使用Kafka Streams
时,我们需要把从topic接收到的消息进行序列化,然后再把返回topic的消息进行反序列化,
可以和kafka发送一样使用Properties类直接定义全部的,但这样会有很大的局限性,因为我们发送给不同的处理过的topic消息往往都会是不同的类型,所以我们会使用到Serde进行该操作,再每次和topic进行读取或写入消息时通过Serde进行指定key和消息的类型。
基础数据类型都有默认的写好的通过Serdes方法可以获得,如果需要定义复杂的则需要自己定义一个序列化和反序列化的类。
如下:
Serde<String> stringSerde = Serdes.String()
:String类型Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer)
:自定义复杂类型JsonSerializer<Purchase> serializer = new JsonSerializer<>();
:自定义的序列化类JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);
:反序列化类
kafka也有默认的可以直接用的:org.springframework.kafka.support.serializer.JsonSerializer
,但一般建议自定义序列化
2、 Kafka Streams 常用的 API
2.1、 StreamsBuilder
StreamsBuilder
是构建 Kafka Streams 拓扑的主要类。它用于定义数据流处理的拓扑结构。
StreamsBuilder builder = new StreamsBuilder();
2.2、 KStream 和 KTable
KStream
:表示一个无界的数据流,每个记录都是一个键值对。
KStream<String, String> stream = builder.stream("input-topic");
KTable
:表示一个表,每个键对应一个最新的值。
KTable<String, Long> table = builder.table("input-topic");
2.3、 filter和 filterNot
filter
:根据条件过滤记录。
KStream<String, String> filteredStream = stream.filter((key, value) -> value.contains("important"));
filterNot
:过滤掉不符合条件的记录。
KStream<String, String> filteredStream = stream.filterNot((key, value) -> value.contains("unimportant"));
2.4、 map 和 mapValues
map
:对每个记录的键和值进行转换。
KStream<String, Integer> mappedStream = stream.map((key, value) -> new KeyValue<>(key, value.length()));
mapValues
:仅对记录的值进行转换。
KStream<String, Integer> mappedStream = stream.mapValues(value -> value.length());
2.5、 flatMap 和 flatMapValues
flatMap
:将每个输入记录映射为零个或多个输出记录。
KStream<String, String> flatMappedStream = stream.flatMap((key, value) -> {List<KeyValue<String, String>> result = new ArrayList<>();for (String word : value.split(" ")) {result.add(new KeyValue<>(key, word));}return result;});
flatMapValues
:仅将每个输入记录的值映射为零个或多个输出值。
****KStream<String, String> flatMappedStream = stream.flatMapValues(value -> Arrays.asList(value.split(" ")));
2.6、 groupByKey 和 groupBy
groupByKey
:根据记录的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupByKey();
groupBy
:根据新的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupBy((key, value) -> value);
2.7、 count、reduce 和 aggregate
count
:计算每个分组中的记录数。
KTable<String, Long> countTable = groupedStream.count();
reduce
:对每个分组中的记录进行归约操作。
KTable<String, String> reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + newValue);
aggregate
:自定义聚合操作。
KTable<String, Integer> aggregatedTable = groupedStream.aggregate(() -> 0,(key, value, aggregate) -> aggregate + value.length(),Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-store"));
2.8、 join 和 leftJoin
join
:对两个流或表进行内连接操作。
KStream<String, String> joinedStream = stream.join(otherStream,(value1, value2) -> value1 + value2,JoinWindows.of(Duration.ofMinutes(5)));
leftJoin
:对两个流或表进行左连接操作。
KStream<String, String> leftJoinedStream = stream.leftJoin(otherStream,(value1, value2) -> value1 + (value2 == null ? "" : value2),JoinWindows.of(Duration.ofMinutes(5)));
2.9、 to 和 toTable
to
:将流中的记录写入到Kafka主题。
**stream.to("output-topic");**
toTable
:将流转换为表。
KTable<String, String> table = stream.toTable();
2.10、 foreach
对每个记录执行一个操作,但不返回新的流。
stream.foreach((key, value) -> System.out.println(key + ": " + value));
上面这些就是KafkaStreams常用到的一些API,通过组合使用这些API,你可以构建复杂的流处理拓扑,以满足各种数据处理需求。
3、简单案例练习
使用脚本
#先进入容器
docker exec -it kafka-server /bin/bash#创建topic(把ip换为自己的)
/opt/kafka/bin/kafka-topics.sh --create --topic sell.purchase.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1# 进入生产者发消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic#进入消费者监听
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction
3.1、通过streams实现数据流处理,把字符串装为大写
@Slf4j
public class KafkaStreamsYellingApp {
// appidprivate final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "out-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {//1、配置客户端和序列化器
// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();//2、构建流处理拓扑
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStream<String, String> upperStream = inputStream.peek((key, value) -> {log.info("[收集]key:{},value:{}", key, value);}).filter((key, value) -> value.length() > 5).mapValues(time -> time.toUpperCase()).peek((key, value) -> log.info("[过滤结束]key:{},value:{}", key, value));
// 日志打印upperStream处理器的数据upperStream.print(Printed.toSysOut());
// 把upperStream处理器的数据输出到指定的topic中upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));//3、通过配置流处理拓扑,创建流处理实例KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}
}
3.2、map 使用,把内容转为key,把value转为内容长度
@Slf4j
public class KafkaStreamsYellingApp2 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStream<String, Integer> k1 = inputStream.map((noKey, value) -> KeyValue.pair(value, value.length()));k1.print(Printed.<String,Integer>toSysOut().withLabel("map"));// k1.to(OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}}
3.3、filter和 filterNot使用过滤消息
/*** filter和 filterNot使用*/
@Slf4j
public class KafkaStreamsYellingApp3 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//inputStream.filter((key, value) -> (value.contains("kafka")), Named.as("filtering-processor")).print(Printed.<String,String>toSysOut().withLabel("filtering"));inputStream.filterNot((key, value) -> (value.contains("kafka")), Named.as("filtering-not-processor")).print(Printed.<String,String>toSysOut().withLabel("filtering-not"));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}
}
3.4、通过selectKey配置key
@Slf4j
public class KafkaStreamsYellingApp4 {private final static String APPLICATION_ID = "yelling_app_id";private final static String INPUT_TOPIC = "input-topic";private final static String OUTPUT_TOPIC = "output-topic";private final static String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig = new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象Serde<String> stringSerde = Serdes.String();Serde<Integer> integerSerde = Serdes.Integer();
// 构建流处理拓扑(用于输出)StreamsBuilder builder = new StreamsBuilder();
// 数据源处理器:从指定的topic中取出数据KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//inputStream.flatMap( //把一个消息的内容通过转为多条消息以map的方式返回,(key, value) -> Arrays.stream(value.split(" ")) //通过使用java的stream把内容转为map.map(e -> KeyValue.pair(e, e.length())).collect(Collectors.toList())).print(Printed.<String,Integer>toSysOut().withLabel("flatMap"));inputStream.flatMapValues( //不改变key,直接转换valuevalue -> Arrays.stream(value.split(" ")).map(String::toUpperCase).toList()).print(Printed.<String,String>toSysOut().withLabel("flatMapValues"));// 配置keyinputStream.selectKey((key, value) -> value).print(Printed.<String,String>toSysOut().withLabel("selectKey"));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);// jvm关闭时,把流也关闭CountDownLatch downLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();downLatch.countDown();log.info("关闭流处理");}));kafkaStreams.start();log.info("启动执行!");}}
4、复杂对象案例练习
4.1、自定义序列化
对接收topic的消息到拓扑还是发送消息到topic都需要进行序列化和反序列化
- 序列化
public class JsonSerializer<T> implements Serializer<T> {private Gson gson= new Gson();public void configure(Map<String ,?> map, boolean b) {}public byte[] serialize(String topic, T t) {return gson.toJson(t).getBytes();}@Overridepublic void close() {Serializer.super.close();}
}
- 反序列化
/*** 反序列化* @param <T>*/
public class JsonDeserializer<T> implements Deserializer<T> {private Gson gson= new Gson();private Class<T> deserializeClass;public JsonDeserializer(Class<T> deserializeClass){this.deserializeClass=deserializeClass;}public JsonDeserializer(){}@Override@SuppressWarnings("unchecked")public void configure(Map<String,?> map, boolean b){if (deserializeClass == null){deserializeClass = (Class<T>) map.get("serializedClass");}}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null){return null;}return gson.fromJson(new String(data),deserializeClass);}@Overridepublic void close() {}
}
4.2、发送消息
/*** 生产消息到kafka*/
@Slf4j
public class MyProducer {private static final String BOOTSTRAP_SERVERS = "localhost:9092";private final static String TOPIC_NAME = "production-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();
// 设置参数props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
// 设置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 连接客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 5; i++) {Purchase purchase = new Purchase();purchase.setId("12431234253");purchase.setDate(new Date().toString());purchase.setName("苹果");purchase.setPrice(100.0+i);String json = new JSONObject(purchase).toString();ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", json);// 同步send(producer,producerRecord);}}/*** @param producer: 客户端对象* @return void* 同步发送* @date 2024/3/22 17:09*/private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {// 等待发送成功的阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());}}
4.3、通过KafkaStreams接收复杂对象,并且使用split
split使用介绍
BranchedKStream<K, V> split(final Named named)
:split方法一个参数,该参数主要是用来定义分裂后的分支的一个统一前缀。会返回一个BranchedKStream
对象。
BranchedKStream
:主要用来使用分裂校验的
branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched)
:会接收两个参数,前面的是Predicate
用做校验判断作为判断条件的,后面的是一个Branched
对象,用作处理满足该分支条件拆分出来的消息进行处理defaultBranch(Branched<K, V> branched)
:作为结尾方法用作对不满足前面的全部branch
条件的消息,进行一个最后处理noDefaultBranch()
:结尾方法,表示对接下来的消息不做处理
Predicate
:判断验证条件,满足该条件的消息会被拆分出来到当前分支
Branched
:对分裂到当前分支的消息进行处理
as(final String name)
:给该分支添加一个名字,不做处理。withFunction()
:对该分支的消息进行处理,然后会把处理后的消息返回到结果的map中去withConsumer()
:对该分支的消息进行处理,不会把消息返回回去
- 具体代码
在测试使用前需要把几个topic先创建出来
@Slf4j
public class ZMartKafkaStreamsApp {private final static String APPLICATION_ID = "ZMart_app_id";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private final static String TOPIC_NAME = "production-topic";private final static String APPLE_TOPIC_NAME = "apple-topic";private final static String WATERMELON_TOPIC_NAME = "watermelon-topic";private final static String OUT_TOPIC_NAME = "out-topic";public static void main(String[] args) throws InterruptedException {StreamsConfig streamsConfig = new StreamsConfig(getProperties());JsonSerializer<Purchase> serializer = new JsonSerializer<>();JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer);Serde<String> stringSerde = Serdes.String();StreamsBuilder builder = new StreamsBuilder();KStream<String, Purchase> inputStream = builder.stream(TOPIC_NAME, Consumed.with(stringSerde, propertiesSerde));// 谓词条件 判断把流输送到哪个分支上Predicate<String, Purchase> isWatermelon = (key, value) -> {String name = value.getName();return name.equals("西瓜");};// split:分裂流Map<String, KStream<String, Purchase>> stringKStreamMap = inputStream.peek((k,v)-> log.info("[分裂消息===>] k:{},value:{}" ,k,v)).split(Named.as("split-"))//拼接的key前缀.branch(isWatermelon, Branched.withFunction(ks -> { //对满足isWatermelon条件的分支的消息进行处理的处理器
// ks.print(Printed.<String, Purchase>toSysOut().withLabel("西瓜分支"));return ks.mapValues(v -> {String modifiedNumber = v.getId().replaceAll("(?<=\\d)\\d(?=\\d)", "*");v.setId( modifiedNumber);return v;});},"watermelon")) //这个地方需要指定name,用作为返回的map的key 如该分支存放到返回的map的key为:split-watermelon.branch((key, value) -> value.getName().equals("苹果"),Branched.withConsumer( //不满足上面的条件的会继续向下匹配,满足条件直接发送ks -> ks.peek((k, v) -> log.info("[苹果分支] value:{}" ,v)).to(APPLE_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"apple")).defaultBranch(Branched.withConsumer( //把不满足前面所以branch条件的消息发送到默认主题ks -> ks.to(OUT_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"defaultBranch"));// 把上面的消息打印出来stringKStreamMap.forEach((s, kStream) ->kStream.print(Printed.<String, Purchase>toSysOut().withLabel(s)));KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
// jvm关闭时,把流也关闭CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() -> {kafkaStreams.close();latch.countDown();log.info("The Kafka Streams 执行关闭!");}));kafkaStreams.start();log.info("kafka streams 启动成功!>>>>");latch.await();}@NotNullprivate static Properties getProperties() {Properties properties = new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);return properties;}
}