场景:从 Kafka 读取数据,通过 JDBC 写入 MySQL
示例:
# Write test data to the Kafka test topic (ypg_test) using the console producer.
kafka-console-producer.sh --broker-list wh01t:21007 --topic ypg_test --producer.config /client/Kafka/kafka/config/producer.properties
-- 创建mysql测试表
-- dsg.test definition
-- Test table: a string primary key plus a nullable date column.
CREATE TABLE test (
    id     VARCHAR(50) NOT NULL,
    c_date DATE        DEFAULT NULL,
    PRIMARY KEY (id)
);
flink主类:
package com.pinko.testcaseimport com.security.InitKafkaUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011import scala.collection.JavaConverters.seqAsJavaListConverter/* 测试 */
/**
 * Flink streaming job that consumes string records from Kafka, prints them,
 * and hands each one to [[MysqlSink]].
 */
object Test05FromKafkaToMysql {

  /**
   * Job entry point.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    // Build the Kafka client properties, then run the security preparation step
    // (presumably cluster authentication setup -- confirm in InitKafkaUtil).
    val kafkaProps = InitKafkaUtil.initPros()
    InitKafkaUtil.securityPrepare

    // Obtain the execution environment and run with parallelism 1.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // The consumer accepts a list of topics, so more names can be added here.
    val topicNames: List[String] = List("ypg_test")
    val consumer =
      new FlinkKafkaConsumer011[String](topicNames.asJava, new SimpleStringSchema(), kafkaProps)

    // Kafka-free alternative source for local experiments:
    // val records = env.fromElements("ypghello", "ypgworld")
    println("flink环境加载完成,开始处理数据...")

    // Kafka message handling: echo every record and write it through MysqlSink.
    val records = env.addSource(consumer)
    records.print()
    records.addSink(new MysqlSink())

    env.execute("Test05FromKafka")
  }
}
package com.pinko.testcaseimport org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * Flink sink that upserts every incoming string into the MySQL table `test`:
 * the record becomes the `id` column and `c_date` is set to `now()`.
 *
 * The JDBC connection and a single parameterized statement are created once in
 * [[open]], reused for every record in [[invoke]], and released in [[close]].
 */
class MysqlSink extends RichSinkFunction[String] {

  // Upsert with a bind parameter for the id, so record content is never spliced
  // into the SQL text (the previous string interpolation allowed SQL injection).
  private val upsertSql: String =
    "insert into test values (?, now()) on duplicate key update id = values(id),c_date = values(c_date)"

  var conn: Connection = _
  var ps: PreparedStatement = _

  /**
   * Opens the JDBC connection and prepares the upsert statement.
   *
   * @param parameters Flink configuration passed to the function; not read here
   */
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): URL and credentials are hard-coded as "url|user|password";
    // consider moving them to job configuration.
    val connStr = "jdbc:mysql://10.22.33.44:2883/testdb|test|test#123"
    val Array(url, username, password) = connStr.split("\\|")
    conn = DriverManager.getConnection(url, username, password)
    println(conn)
    // Prepare once and reuse: preparing per record leaked every statement but the last.
    ps = conn.prepareStatement(upsertSql)
  }

  /**
   * Upserts one record.
   *
   * @param value   the record; written to the `id` column
   * @param context sink context supplied by Flink; not used
   */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    println(s"$upsertSql [id=$value]")
    ps.setString(1, value)
    val rowsAffected = ps.executeUpdate()
    if (rowsAffected > 0) {
      println("更新成功")
    } else {
      println("没有进行更新操作")
    }
  }

  /**
   * Releases the statement and the connection. Safe to call when [[open]] failed
   * or never ran: either field may still be null. The connection is closed even
   * if closing the statement throws.
   */
  override def close(): Unit = {
    try {
      Option(ps).foreach(_.close())
    } finally {
      Option(conn).foreach(_.close())
    }
  }
}