输出到MySQL
添加依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.32</version>
</dependency>
启动MySQL, 在test库下建表clicks
CREATE TABLE `clicks` (
    `user` VARCHAR(100) NOT NULL,
    `url`  VARCHAR(100) DEFAULT NULL,
    `ts`   BIGINT DEFAULT NULL
) ENGINE = INNODB DEFAULT CHARSET = utf8;
示例代码
public class Flink04_JdbcSink { public static void main ( String [ ] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; DataStreamSource < Event > ds = Flink06_EventSource . getEventSource ( env) ; SinkFunction < Event > sink = JdbcSink . sink ( "insert into clicks(user, url, ts) values (?,?,?)" , new JdbcStatementBuilder < Event > ( ) { @Override public void accept ( PreparedStatement preparedStatement, Event event) throws SQLException { preparedStatement. setString ( 1 , event. getUser ( ) ) ; preparedStatement. setString ( 2 , event. getUrl ( ) ) ; preparedStatement. setLong ( 3 , event. getTs ( ) ) ; } } , JdbcExecutionOptions . builder ( ) . withBatchSize ( 5 ) . withBatchIntervalMs ( 10000 ) . withMaxRetries ( 3 ) . build ( ) , new JdbcConnectionOptions. JdbcConnectionOptionsBuilder ( ) . withDriverName ( "com.mysql.cj.jdbc.Driver" ) . withUsername ( "root" ) . withPassword ( "000000" ) . withUrl ( "jdbc:mysql://hadoop102:3306/flink" ) . build ( ) ) ; ds. addSink ( sink) ; try { env. execute ( ) ; } catch ( Exception e) { throw new RuntimeException ( e) ; } }
}
MySQL的幂等性处理
MySQL 幂等写入有两种常用做法：
1. 将 insert 关键字替换为 replace：如果主键重复，旧行被整体替换，即除主键外的所有字段都会被新值覆盖。
2. 使用 insert ... on duplicate key update 字段名 = values(字段名) 语法：如果主键重复，只更新指定的字段，其余字段保持不变。
示例代码
public class Flink05_JdbcSinkReplace { public static void main ( String [ ] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; DataStreamSource < Event > ds = Flink06_EventSource . getEventSource ( env) ; SingleOutputStreamOperator < WordCount > countDs = ds. map ( event -> new WordCount ( event. getUrl ( ) , 1 ) ) . keyBy ( WordCount :: getWord ) . sum ( "count" ) ; SinkFunction < WordCount > sink = JdbcSink . sink (
"insert into url_count(url, cnt) values(?,?) on duplicate key update cnt = values(cnt)" , new JdbcStatementBuilder < WordCount > ( ) { @Override public void accept ( PreparedStatement preparedStatement, WordCount wordCount) throws SQLException { preparedStatement. setString ( 1 , wordCount. getWord ( ) ) ; preparedStatement. setInt ( 2 , wordCount. getCount ( ) ) ; } } , JdbcExecutionOptions . builder ( ) . withBatchSize ( 5 ) . withBatchIntervalMs ( 10000 ) . withMaxRetries ( 3 ) . build ( ) , new JdbcConnectionOptions. JdbcConnectionOptionsBuilder ( ) . withDriverName ( "com.mysql.cj.jdbc.Driver" ) . withUsername ( "root" ) . withPassword ( "000000" ) . withUrl ( "jdbc:mysql://hadoop102:3306/flink" ) . build ( ) ) ; countDs. addSink ( sink) ; try { env. execute ( ) ; } catch ( Exception e) { throw new RuntimeException ( e) ; } }
}