Sink
将计算好的结果输出到外部系统, 调用 addSink()
传入指定的SinkFunction()
- 将结果输出到 Kafka 中
- 将结果输出到 Redis 中
- 将结果输出到 ES 中
- 将结果输出到 MySQL 中: 事先创建好表结构
pom.xml 事先导入对应的 connector:
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!--依赖的一些组件需要 Scala 环境--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency><!--kafka依赖--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version></dependency><!--rabbit依赖--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-rabbitmq --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_2.12</artifactId><version>1.10.1</version></dependency><!--flink 数据存入 redis--><!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch-base --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.1</version></dependency><!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version></dependency></dependencies>
实操代码如下:
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;/*** @author regotto*/
public class SinkTest {private static void saveToRedis(DataStream<SensorReading> dataStream) {FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();builder.setHost("localhost");// 顶级接口 SinkFunction, 核心方法 invokedataStream.addSink(new RedisSink<>(builder.build(), new RedisMapper<SensorReading>() {/*** 将温度数据保存为 id-temperature hash 形式到 redis* @return*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "sensor");}@Overridepublic String getKeyFromData(SensorReading sensorReading) {return sensorReading.getId();}@Overridepublic String getValueFromData(SensorReading sensorReading) {return sensorReading.getTemperature().toString();}}));}private static void saveToKafka(DataStream<SensorReading> dataStream) {// 将数据输出到 Kafka 中dataStream.map((MapFunction<SensorReading, String>) value -> value.toString()).addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));}private static void saveToEs(DataStream<SensorReading> dataStream) {// 将数据输出到 ElasticSearchArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("localhost", 9200));//真正的 SinkFunction 是 ElasticsearchSink(使用构建者构建), ElasticsearchSinkFunction 只是负责处理以哪种方式存入dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction<SensorReading>) (sensorReading, runtimeContext, requestIndexer) -> {HashMap<String, String> source = new HashMap<>();source.put("id", sensorReading.getId());source.put("temp", sensorReading.getTemperature().toString());source.put("time", sensorReading.getTimestamp().toString());IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingData").source(source);requestIndexer.add(indexRequest);}).build());}private static void saveToMysql(DataStream<SensorReading> dataStream) {/*由于性能问题, 官方未提供 mysqlSink, 将数据存入 mysql, 自定义 sinkjdbc 要连接处理, 使用 RichSinkFunction, 利用 open, close 方法*/dataStream.addSink(new 
RichSinkFunction<SensorReading>() {Connection connection = null;PreparedStatement insertStatement = null;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.jdbc.Driver");connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");insertStatement = connection.prepareStatement("insert into sensorreading (id, timestamp, temperature)values(?,?,?)");}@Overridepublic void invoke(SensorReading value, Context context) throws Exception {insertStatement.setString(1, value.getId());insertStatement.setLong(2, value.getTimestamp());insertStatement.setDouble(3, value.getTemperature());insertStatement.execute();}@Overridepublic void close() throws Exception {insertStatement.close();connection.close();}});}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> input = env.readTextFile("sensor.txt");DataStream<SensorReading> dataStream = input.map((MapFunction<String, SensorReading>) value -> {String[] split = value.split(",");return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));});saveToMysql(dataStream);env.execute();}
}
总结
进行数据存储时, 指定对应 SinkFunction 即可.