Table of Contents
- Preface
- I. A custom Flink SourceFunction that periodically reads a database
- II. Java implementation
- Summary
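Preface
A source is where Flink reads its input. You attach one to a program with StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with many pre-implemented source functions, but you can also write a custom non-parallel source by implementing the SourceFunction interface, or extend the RichSourceFunction class when the source also needs lifecycle methods such as open() and close(). The predefined stream sources cover files, sockets, collections and similar inputs, but none of them reads from a database.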
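I. A custom Flink SourceFunction that periodically reads a database
Some scenarios need to read a database whose contents keep changing at a fixed interval and treat the results as streaming data. The implementation in this article goes through JDBC, so it applies to any relational database.
- Constructor: receives the database connection parameters, the polling period and related settings.
- run: reads the database on each period and emits every row to the downstream operator.
- cancel: cancels the source by setting the running flag to false, which ends the emit loop in run.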
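The run/cancel contract described in the list above can be sketched without any Flink types. This is only an illustration under stated assumptions: `PollingLoop`, its `fetch` supplier (standing in for one database read) and the `downstream` consumer (standing in for the source context) are hypothetical names and are not part of the article's code.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java stand-in for the run/cancel contract; hypothetical, no Flink types. */
final class PollingLoop<T> {
    // volatile so that a cancel() issued from another thread is visible to run()
    private volatile boolean running = true;
    private final Supplier<List<T>> fetch;  // assumption: performs one database read
    private final long periodSeconds;       // <= 0 means "read once, then stop"

    PollingLoop(Supplier<List<T>> fetch, long periodSeconds) {
        this.fetch = fetch;
        this.periodSeconds = periodSeconds;
    }

    /** Reads, emits every row downstream, then waits for the next period. */
    void run(Consumer<T> downstream) {
        while (running) {
            for (T row : fetch.get()) {
                downstream.accept(row);
            }
            if (periodSeconds <= 0) {
                running = false;
            } else {
                try {
                    TimeUnit.SECONDS.sleep(periodSeconds);
                } catch (InterruptedException e) {
                    // Treat an interrupt as a cancellation and keep the flag set
                    Thread.currentThread().interrupt();
                    running = false;
                }
            }
        }
    }

    /** Flips the flag; the loop in run() stops at its next check. */
    void cancel() {
        running = false;
    }
}
```

With a period of 0 the loop performs a single read and returns, which mirrors how the implementation in the next section treats a missing or non-positive period. Calling cancel() before run() means nothing is read at all.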
II. Java implementation
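import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// JSONObject, DataBaseType, Key, FuntionUtil, RowSetFieldUtil and PubFunction are
// project-specific classes that this article does not show.

/** Streaming source backed by a relational database. */
public class DbSourceFunction extends RichSourceFunction<Row> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(DbSourceFunction.class);

    private volatile boolean isRunning = true;
    private String driver = null;
    // Execution period, in seconds
    private Long period = null;
    private JSONObject conf;
    private DataBaseType baseType;

    public DbSourceFunction(JSONObject conf, DataBaseType baseType) {
        this.conf = conf;
        this.baseType = baseType;
        this.driver = baseType.getDriverClassName();
        // Execution period
        period = conf.getLong("period");
        // Unit of the period
        String unit = conf.getString("executionWay", "seconds");
        if (period != null && period > 0) {
            // Convert the period to seconds according to its unit
            period = FuntionUtil.getSeconds(unit, period);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (isRunning) {
            // Start time of this round, used to shorten the sleep at the end
            long start = System.currentTimeMillis();
            String querySql = conf.getString(Key.QUERY_SQL);
            List<JSONObject> columnList = conf.getList(Key.COLUMN);
            int len = columnList.size();
            Connection connect = null;
            PreparedStatement ps = null;
            ResultSet rs = null;
            try {
                // Retry until a connection is available or the source is cancelled
                while (connect == null && isRunning) {
                    try {
                        connect = getConnection();
                    } catch (Exception w) {
                        LOG.error("Failed to get a connection, retrying", w);
                        // Short pause so a database outage does not cause a busy loop
                        TimeUnit.SECONDS.sleep(1);
                    }
                }
                if (connect == null) {
                    // Cancelled while waiting for a connection
                    break;
                }
                ps = connect.prepareStatement(querySql);
                try {
                    rs = ps.executeQuery();
                    while (rs.next()) {
                        Row row = new Row(len);
                        for (int i = 0; i < len; i++) {
                            JSONObject column = columnList.get(i);
                            Integer columnType = column.getInt(Key.COLUMN_TYPE);
                            // Convert the ResultSet value into a Flink Row field
                            RowSetFieldUtil.rowSetFieldResultSet(row, rs, i, columnType, baseType);
                        }
                        // Emit the result
                        ctx.collect(row);
                    }
                } catch (Exception e) {
                    // Log and wait for the next round; resources are released in finally
                    LOG.error("Query failed", e);
                }
            } catch (Exception e) {
                LOG.error("Failed to read data", e);
                throw e;
            } finally {
                if (rs != null) {
                    rs.close();
                }
                if (ps != null) {
                    ps.close();
                }
                if (connect != null) {
                    connect.close();
                }
            }
            if (period == null || period <= 0) {
                // No period configured: read once and stop
                isRunning = false;
            } else {
                long end = System.currentTimeMillis();
                // Subtract the time this round took; never sleep a negative amount
                long sleepSeconds = Math.max(0L, period - (end - start) / 1000);
                LOG.info("sleep time: {}", sleepSeconds);
                TimeUnit.SECONDS.sleep(sleepSeconds);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    private Connection getConnection() {
        Connection connection = null;
        try {
            String username = conf.getString(Key.USERNAME);
            String password = conf.getString(Key.PASSWORD);
            password = PubFunction.decryptStr(password);
            String jdbcUrl = conf.getString(String.format("%s[0]", Key.JDBC_URL));
            // Create the connection
            connection = DriverManager.getConnection(jdbcUrl, username, password);
        } catch (Exception e) {
            LOG.error("Failed to get a connection", e);
            throw new RuntimeException("Failed to get a connection", e);
        }
        return connection;
    }
}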
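The period handling in the class above has two small arithmetic steps: converting the configured period to seconds, and subtracting the time a round took from the sleep. The sketch below shows one way to do both with the JDK alone. `PeriodMath` is a hypothetical helper, not the author's FuntionUtil, whose source is not shown. It assumes the unit string matches a java.util.concurrent.TimeUnit constant name such as "seconds", "minutes" or "hours", which the article does not confirm.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating the period arithmetic; not the author's FuntionUtil. */
final class PeriodMath {
    private PeriodMath() {
    }

    /** Converts a period to seconds; assumes unit names follow TimeUnit constants. */
    static long toSeconds(String unit, long value) {
        return TimeUnit.valueOf(unit.toUpperCase(Locale.ROOT)).toSeconds(value);
    }

    /** Seconds left to sleep after a round that ran from startMillis to endMillis. */
    static long remainingSleepSeconds(long periodSeconds, long startMillis, long endMillis) {
        long tookSeconds = (endMillis - startMillis) / 1000; // whole seconds, rounded down
        // Clamp at zero so a round longer than the period starts the next one at once
        return Math.max(0L, periodSeconds - tookSeconds);
    }
}
```

For example, `PeriodMath.toSeconds("minutes", 5)` gives 300, and a 10-second period with a round that took 3500 ms leaves `PeriodMath.remainingSleepSeconds(10, 0, 3500)` equal to 7. An unknown unit string makes `TimeUnit.valueOf` throw IllegalArgumentException, so a real helper may want a default.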
Summary
The complete Java code for this custom Flink SourceFunction that periodically reads a database is available as a download.