flink stream数据 动态写入多个topic
flink1.15之前
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecordobject DynamicKafkaProducer {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 数据流中的元素类型为 (topic: String, message: String)val stream: DataStream[(String, String)] = ...// 定义 Kafka 序列化器val kafkaSerializationSchema = new KafkaSerializationSchema[(String, String)] {override def serialize(element: (String, String), timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {new ProducerRecord(element._1, element._2.getBytes("UTF-8"))}}// 创建 FlinkKafkaProducer 实例val kafkaProducer = new FlinkKafkaProducer[(String, String)]("localh