使用 Flink 消费 Kafka 时,只需要简单的配置就能正常运行:
// Minimal Flink job: read the "mytest" Kafka topic as plain strings and print each value.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Kafka client settings: broker address and consumer group id.
val props = new Properties()
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

// SimpleStringSchema yields only the message value as a String.
val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
val stream1 = env.addSource(consumer1)
stream1.print()

// Nothing runs until execute() is called with the job name.
env.execute("KafkaSourceStreaming")
但是,这里使用的是最简单的 SimpleStringSchema,因此接收到的数据只是一条消息的值(value),消息所附带的时间戳、offset、topic、partition 等元信息都无法获取。当需要这部分信息时,可以通过实现 KafkaDeserializationSchema 接口来自定义反序列化逻辑。
/**
 * Example Flink job that reads the `mytest` Kafka topic through two sources:
 * one that yields only the message value as a `String`, and one that yields
 * the whole `ConsumerRecord` (topic, partition, offset, timestamp, headers, ...).
 */
object KafkaSourceStreaming {

  /**
   * Builds both Kafka sources, prints their output and runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka client settings shared by both sources.
    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

    // Source 1: message values only.
    val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    val stream1 = env.addSource(consumer1)
    stream1.print()

    // Source 2: full records, including Kafka metadata.
    val consumer = new FlinkKafkaConsumer("mytest", new CustomKafkaDeserializationSchema(), props)
    val stream = env.addSource(consumer)
    stream.print()

    env.execute("KafkaSourceStreaming")
  }

  /**
   * Retrieves Kafka metadata information: converts the raw byte-array record
   * into a `ConsumerRecord[String, String]` that keeps every metadata field
   * of the incoming record.
   */
  class CustomKafkaDeserializationSchema
      extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

    import java.nio.charset.StandardCharsets

    /**
     * Decodes key and value as UTF-8 text and copies all other fields unchanged.
     *
     * @param record raw record as delivered by the Kafka consumer
     * @return a record with the same metadata and `String` key/value; a null
     *         key or a null value (e.g. a tombstone) stays null
     */
    override def deserialize(
        record: ConsumerRecord[Array[Byte], Array[Byte]]
    ): ConsumerRecord[String, String] = {
      val key = decode(record.key())
      // The value may be null as well (tombstone records), so it gets the same guard as the key.
      val value = decode(record.value())

      // NOTE(review): checksum() and this constructor overload are deprecated in newer
      // kafka-clients releases — confirm against the client version in use.
      new ConsumerRecord[String, String](
        record.topic(),
        record.partition(),
        record.offset(),
        record.timestamp(),
        record.timestampType(),
        record.checksum(),
        record.serializedKeySize(),
        record.serializedValueSize(),
        key,
        value,
        record.headers(),
        record.leaderEpoch()
      )
    }

    /** Always false: no element marks the end of the stream. */
    override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

    /** Type information for the produced records, derived from a `TypeHint`. */
    override def getProducedType: TypeInformation[ConsumerRecord[String, String]] =
      TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})

    /**
     * Null-safe UTF-8 decoding. An explicit charset is used so the result does not
     * depend on the JVM's platform default encoding.
     */
    private def decode(bytes: Array[Byte]): String =
      Option(bytes).map(new String(_, StandardCharsets.UTF_8)).orNull
  }
}