添加依赖 < dependency> < groupId> </ groupId> < artifactId> </ artifactId> < version> </ version> </ dependency> < dependency> < groupId> </ groupId> < artifactId> </ artifactId> < version> </ version> </ dependency> 启动MySQL, 在test库下建表clicks CREATE  TABLE  ` clicks` ( ` user` VARCHAR ( 100 )  NOT  NULL , ` url` VARCHAR ( 100 )  DEFAULT  NULL , ` ts` BIGINT  DEFAULT  NULL 
)  ENGINE = INNODB  DEFAULT  CHARSET = utf8
示例代码 public  class  Flink04_JdbcSink  { public  static  void  main ( String [ ]  args)  { StreamExecutionEnvironment  env =  StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; DataStreamSource < Event > =  Flink06_EventSource . getEventSource ( env) ; SinkFunction < Event > =  JdbcSink . sink ( "insert into clicks(user, url, ts) values (?,?,?)" ,  new  JdbcStatementBuilder < Event > ( )  { @Override public  void  accept ( PreparedStatement  preparedStatement,  Event  event)  throws  SQLException  { preparedStatement. setString ( 1 ,  event. getUser ( ) ) ; preparedStatement. setString ( 2 ,  event. getUrl ( ) ) ; preparedStatement. setLong ( 3 ,  event. getTs ( ) ) ; } } , JdbcExecutionOptions . builder ( ) . withBatchSize ( 5 ) . withBatchIntervalMs ( 10000 ) . withMaxRetries ( 3 ) . build ( ) , new  JdbcConnectionOptions. JdbcConnectionOptionsBuilder ( ) . withDriverName ( "com.mysql.cj.jdbc.Driver" ) . withUsername ( "root" ) . withPassword ( "000000" ) . withUrl ( "jdbc:mysql://hadoop102:3306/flink" ) . build ( ) ) ; ds. addSink ( sink) ; try  { env. execute ( ) ; }  catch  ( Exception  e)  { throw  new  RuntimeException ( e) ; } } 
} 
将插入关键字替换为replace,如果主键重复,将除了主键外的所有字段都替换。 使用on duplicate key update 字段名 = values(字段名)语法,如果主键重复,可以选择部分字段进行替换,其余字段保持不变。 示例代码 public  class  Flink05_JdbcSinkReplace  { public  static  void  main ( String [ ]  args)  { StreamExecutionEnvironment  env =  StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; DataStreamSource < Event > =  Flink06_EventSource . getEventSource ( env) ; SingleOutputStreamOperator < WordCount > = ds. map ( event ->  new  WordCount ( event. getUrl ( ) ,  1 ) ) . keyBy ( WordCount :: getWord ) . sum ( "count" ) ; SinkFunction < WordCount > =  JdbcSink . sink ( 
"insert into url_count(url, cnt) values(?,?) on duplicate key update cnt = values(cnt)" , new  JdbcStatementBuilder < WordCount > ( )  { @Override public  void  accept ( PreparedStatement  preparedStatement,  WordCount  wordCount)  throws  SQLException  { preparedStatement. setString ( 1 ,  wordCount. getWord ( ) ) ; preparedStatement. setInt ( 2 ,  wordCount. getCount ( ) ) ; } } , JdbcExecutionOptions . builder ( ) . withBatchSize ( 5 ) . withBatchIntervalMs ( 10000 ) . withMaxRetries ( 3 ) . build ( ) , new  JdbcConnectionOptions. JdbcConnectionOptionsBuilder ( ) . withDriverName ( "com.mysql.cj.jdbc.Driver" ) . withUsername ( "root" ) . withPassword ( "000000" ) . withUrl ( "jdbc:mysql://hadoop102:3306/flink" ) . build ( ) ) ; countDs. addSink ( sink) ; try  { env. execute ( ) ; }  catch  ( Exception  e)  { throw  new  RuntimeException ( e) ; } } 
}