【Flink-Kafka-To-Hive】Writing Kafka Data to Hive with Flink

  • 1) Import the required dependencies
  • 2) Code implementation
    • 2.1.resources
      • 2.1.1.appconfig.yml
      • 2.1.2.log4j.properties
      • 2.1.3.log4j2.xml
      • 2.1.4.flink_backup_local.yml
    • 2.2.utils
      • 2.2.1.DBConn
      • 2.2.2.CommonUtils
    • 2.3.conf
      • 2.3.1.ConfigTools
    • 2.4.po
      • 2.4.1.SchemaPo
    • 2.5.kafka2hive
      • 2.5.1.Kafka2Hive-ODS

Requirements:

1. Data is read from Kafka and written into Hive.

2. The job configuration is stored in MySQL and read dynamically at startup.

3. The Kafka cluster in this example uses Kerberos authentication; remove that part yourself if you do not need it.

4. When Flink writes Kafka data to Hive, checkpointing must be enabled, otherwise the files are never committed to HDFS.

5. The target table is created in Hive first, and its schema is then fetched dynamically at runtime (see the DDL sketch after this list).

6. The Kafka messages are JSON; they are flattened with a FlatMap function and packed into Row objects according to the table schema before being written.

7. The stream is registered as a temporary view and the actual write is done with Flink SQL.

8. For local testing, the Hive configuration files must be placed under the resources directory.

9. For local testing, you can edit resources/flink_backup_local.yml and load it with ConfigTools.initConf.
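
As a reference for requirement 5, here is a minimal sketch of a partitioned Hive ODS table such a job could write to. The table name, columns, and partition column below are purely illustrative placeholders and are not the original project's schema:

-- Hypothetical example only: the target table is created in Hive before the job starts.
-- The job later reads this schema (columns + partition keys) via HiveCatalog.getHiveTable.
CREATE TABLE IF NOT EXISTS ods.table_name (
    vin        STRING,
    uploadtime BIGINT,
    msg        STRING
)
PARTITIONED BY (dt STRING)
STORED AS ORC
TBLPROPERTIES (
    -- commit partitions to the metastore once a checkpoint completes,
    -- matching the ALTER TABLE ... TBLPROPERTIES call in the job below
    'sink.partition-commit.policy.kind' = 'metastore'
);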

1) Import the required dependencies

The dependency list here is fairly redundant; remove or keep entries according to your own needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>example.cn.test</groupId>
    <artifactId>kafka2hive</artifactId>
    <version>1.0.0</version>

    <properties>
        <hbase.version>2.3.3</hbase.version>
        <hadoop.version>3.1.1</hadoop.version>
        <spark.version>3.0.2</spark.version>
        <scala.version>2.12.10</scala.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>gaei.cn.x5l.bigdata.common</groupId><artifactId>x5l-bigdata-common</artifactId><version>1.1-SNAPSHOT</version>
            <exclusions>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-dist -->
        <dependency>
            <groupId>org.apache.flink</groupId><artifactId>flink-dist_2.12</artifactId><version>1.14.0-csa1.7.0.0</version><scope>provided</scope>
            <exclusions>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency>
        <dependency>
            <groupId>gaei.cn.x5l</groupId><artifactId>tsp-gb-decode</artifactId><version>1.0.0</version>
            <exclusions>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version><scope>runtime</scope></dependency>
        <dependency>
            <groupId>gaei.cn.x5l.flink.common</groupId><artifactId>x5l-flink-common</artifactId><version>1.2-SNAPSHOT</version><scope>compile</scope>
            <exclusions>
                <exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-1.2-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- Flink dependencies -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.14.0</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3 -->
        <dependency>
            <groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-3</artifactId><version>3.1.1.7.2.8.0-224-9.0</version><scope>provided</scope>
            <exclusions>
                <exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.10</version></dependency>
        <dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.4.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.11.6</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <!-- Core dependencies: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- Core dependencies: end -->
        <!-- Table API: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency>
        <!-- Comment this out when using the Hive SQL dialect; otherwise it can stay -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <!-- Table API: end -->
        <!-- SQL: start -->
        <!-- SQL formats: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- SQL formats: end -->
        <!-- SQL Kafka connector -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <!-- SQL: end -->
        <!-- Checkpointing -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- Local job monitoring: end -->
        <!-- DataStream / logging: start -->
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <!-- HDFS -->
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency>
        <!-- Important: an easily overlooked jar -->
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency>
        <!-- RocksDB state backend -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- Misc -->
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
        <!-- kafka2mongo offline job -->
        <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.6</version></dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                                <!-- Needed for Flink SQL -->
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals><goal>shade</goal></goals>
                                    </pluginExecutionFilter>
                                    <action><ignore/></action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals><goal>testCompile</goal><goal>compile</goal></goals>
                                    </pluginExecutionFilter>
                                    <action><ignore/></action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <repositories>
        <repository>
            <id>cdh.releases.repo</id>
            <url>https://repository.cloudera.com/artifactory/libs-release-local/</url>
            <name>Releases Repository</name>
        </repository>
    </repositories>
</project>

2) Code implementation

2.1.resources

2.1.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"
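
appconfig.yml only holds the MySQL connection that ConfigTools uses to look up the job configuration. The lookup SQL in ConfigTools (section 2.3.1) reads a config_context column from an app_config table keyed by app_name and config_name; below is a hypothetical sketch of that table, with names inferred from the query — the real DDL is not part of the original article:

-- Hypothetical DDL inferred from the query in ConfigTools.getConfig;
-- the actual table definition is not shown in the original article.
CREATE TABLE app_config (
    id             BIGINT AUTO_INCREMENT PRIMARY KEY,
    app_name       VARCHAR(255) NOT NULL,   -- fully qualified main-class name of the job
    config_name    VARCHAR(64)  NOT NULL,   -- environment option, e.g. 'local' or 'prod'
    config_context TEXT         NOT NULL    -- the whole job configuration serialized as JSON
);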

2.1.2.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.1.3.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>
    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>
    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>

2.1.4.flink_backup_local.yml

hdfs:
  checkPointPath: 'hdfs://nameserver/user/flink/rocksdbcheckpoint'
  checkpointTimeout: 360000
  checkpointing: 300000
  maxConcurrentCheckpoints: 1
  minPauseBetweenCheckpoints: 10000
  restartInterval: 60
  restartStrategy: 3
hive:
  defaultDatabase: 'ods'
  hiveConfDir: 'D:/WorkSpace/bigdata-flink-backup/kafka2hive/src/main/resources/'
  sourceTopic: 'topicA,topicB'
  tableName: 'table_name'
kafka-consumer:
  prop:
    auto.offset.reset: 'earliest'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    enable.auto.commit: 'false'
    fetch.max.bytes: '52428700'
    group.id: 'test'
    isKerberized: '1'
    keytab: 'D:/keytab/test.keytab'
    krb5Conf: 'D:/keytab/krb5.conf'
    max.poll.interval.ms: '300000'
    max.poll.records: '1000'
    principal: 'test@PRE.TEST.COM'
    security_protocol: 'SASL_PLAINTEXT'
    serviceName: 'kafka'
    session.timeout.ms: '600000'
    useTicketCache: 'false'
  topics: 'topicA,topicB'
kafka-producer:
  defaultTopic: 'kafka2hive_error'
  prop:
    acks: 'all'
    batch.size: '1048576'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    compression.type: 'lz4'
    key.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
    retries: '3'
    value.serializer: 'org.apache.kafka.common.serialization.StringSerializer'

2.2.utils

2.2.1.DBConn

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";   // MySQL JDBC driver
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    private static final CallableStatement cs = null;

    /**
     * Open a database connection.
     */
    public static Connection conn(String url, String username, String password) {
        Connection conn = null;
        try {
            Class.forName(driver);  // load the JDBC driver
            try {
                conn = DriverManager.getConnection(url, username, password);  // connect to the database
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();  // close the connection
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

2.2.2.CommonUtils

@Slf4j
public class CommonUtils {
    public static StreamExecutionEnvironment setCheckpoint(StreamExecutionEnvironment env) throws IOException {
//        ConfigTools.initConf("local");
        Map hdfsMap = (Map) ConfigTools.mapConf.get("hdfs");
        // note: with checkpointing enabled, offset commits are delayed until the checkpoint completes
        env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());
        env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());
        env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                (Integer) hdfsMap.get("restartStrategy"), // number of restart attempts; don't set it too low, distributed jobs fail occasionally even when healthy, 3-5 is a reasonable range
                Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(), TimeUnit.SECONDS) // delay between attempts
        ));
        // number of tolerable checkpoint failures; the default of 0 means no failure is tolerated
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // state backend
        env.setStateBackend(new RocksDBStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new HashMapStateBackend());
        return env;
    }

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("Subscribed topics: {}", Arrays.toString(topics));
        Properties properties = new Properties();
        Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }
        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }
        properties.put("key.deserializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(
                Arrays.asList(topics),
                new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
                    @Override
                    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                        });
                    }

                    @Override
                    public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
                        return false;
                    }

                    @Override
                    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        return new ConsumerRecord<String, String>(
                                record.topic(),
                                record.partition(),
                                record.offset(),
                                record.timestamp(),
                                record.timestampType(),
                                record.checksum(),
                                record.serializedKeySize(),
                                record.serializedValueSize(),
                                new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                                new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
                    }
                }, properties);
        return consumerRecordFlinkKafkaConsumer;
    }
}

2.3.conf

2.3.1.ConfigTools

@Slf4j
public class ConfigTools {
    public static Map<String, Object> mapConf;

    /**
     * Load the configuration from the local YAML file for the given environment option.
     *
     * @param option environment name, e.g. "local"
     */
    public static void initConf(String option) {
        String confFile = "/flink_backup_" + option + ".yml";
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mapConf = Yaml.loadType(dumpFile, HashMap.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Load the configuration from MySQL for the given environment option and job class.
     *
     * @param option environment name, e.g. "local"
     */
    public static void initMySqlConf(String option, Class clazz) {
        String className = clazz.getName();
        String confFile = "/appconfig.yml";
        Map<String, String> mysqlConf;
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mysqlConf = Yaml.loadType(dumpFile, HashMap.class);
            String username = mysqlConf.get("mysql.username");
            String password = mysqlConf.get("mysql.password");
            String url = mysqlConf.get("mysql.url");
            Connection conn = DBConn.conn(url, username, password);
            Map<String, Object> config = getConfig(conn, className, option);
            if (config == null || config.size() == 0) {
                log.error("Failed to load the configuration");
                return;
            }
            mapConf = config;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Map<String, Object> getConfig(Connection conn, String className, String option) throws SQLException {
        PreparedStatement preparedStatement = null;
        try {
            String sql = "select config_context from app_config where app_name = '%s' and config_name = '%s'";
            preparedStatement = conn.prepareStatement(String.format(sql, className, option));
            ResultSet rs = preparedStatement.executeQuery();
            Map<String, String> map = new LinkedHashMap<>();
            String config_context = "";
            while (rs.next()) {
                config_context = rs.getString("config_context");
            }
            System.out.println("config_context: " + config_context);
//            if (StringUtils.isNotBlank(config_context)) {
//                System.out.println(JSONObject.toJSONString(JSONObject.parseObject(config_context), SerializerFeature.PrettyFormat));
//            }
            Map<String, Object> mysqlConfMap = JSON.parseObject(config_context, Map.class);
            return mysqlConfMap;
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

    public static void main(String[] args) {
//        initMySqlConf("local", TboxPeriodBackoutA3K.class);
        initConf("local");
        String s = JSON.toJSONString(mapConf);
        System.out.println(s);
    }
}
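
For reference, config_context is parsed with JSON.parseObject into the same map structure that flink_backup_local.yml provides, so the stored value looks roughly like the abridged sketch below. This sample is not from the original article; the values are placeholders mirroring the YAML shown earlier:

{
  "hdfs": {
    "checkPointPath": "hdfs://nameserver/user/flink/rocksdbcheckpoint",
    "checkpointing": 300000,
    "checkpointTimeout": 360000,
    "maxConcurrentCheckpoints": 1,
    "minPauseBetweenCheckpoints": 10000,
    "restartStrategy": 3,
    "restartInterval": 60
  },
  "hive": {
    "defaultDatabase": "ods",
    "hiveConfDir": "/path/to/hive/conf",
    "tableName": "table_name"
  },
  "kafka-consumer": {
    "topics": "topicA,topicB",
    "prop": { "bootstrap.servers": "kfk01:9092,kfk02:9092,kfk03:9092", "group.id": "test", "isKerberized": "0" }
  }
}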

2.4.po

2.4.1.SchemaPo

/**
 * Field (column) descriptor: column name and Hive type.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SchemaPo implements Serializable {
    private String signal;
    private String type;
}

2.5.kafka2hive

2.5.1.Kafka2Hive-ODS

The data read from Kafka is written to the Hive ODS layer as-is, without any transformation.

public class Kafka2Hive_ODS {
    public static Logger logger = Logger.getLogger(Kafka2Hive_ODS.class);

    public static void main(String[] args) throws Exception {
        // the configuration is looked up in MySQL by main-class name and environment option (args[0])
        ConfigTools.initMySqlConf(args[0], Kafka2Hive_ODS.class);
        Map<String, Object> mapConf = ConfigTools.mapConf;
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        Map<String, Object> hiveConf = (Map<String, Object>) mapConf.get("hive");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        CommonUtils.setCheckpoint(env);
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        StreamStatementSet statementSet = tableEnv.createStatementSet();
        // use the Hive SQL dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // catalog name (any name will do)
        String name = "myhive";
        // default database inside Hive (not the MySQL database backing the Hive metastore)
        String defaultDatabase = "ods";
        // directory containing hive-site.xml
        String hiveConfDir = (String) hiveConf.get("hiveConfDir");
        // create and register the HiveCatalog, then make it the current catalog
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, "3.1.2");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

//        FlinkKafkaConsumer<String> myConsumer = CommonUtils.getKafkaConsumer();
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = CommonUtils.getKafkaConsumer(kafkaConsumerConf);
        DataStream<ConsumerRecord<String, String>> stream = env.addSource(myConsumer);

        // read the target table's schema (columns + partition keys) from the Hive metastore
        String tableName = (String) hiveConf.get("tableName");
        List<FieldSchema> schemas = hive.getHiveTable(new ObjectPath(defaultDatabase, tableName)).getSd().getCols();
        List<FieldSchema> partitionKeys = hive.getHiveTable(new ObjectPath(defaultDatabase, tableName)).getPartitionKeys();
        schemas.addAll(partitionKeys);
        List<SchemaPo> schemaPos = new ArrayList<>();
        List<String> fieldLists = new ArrayList<>();
        List<TypeInformation> typeList = new ArrayList<>();
        for (FieldSchema schema : schemas) {
            SchemaPo schemaPo = new SchemaPo();
            schemaPo.setSignal(schema.getName());
            schemaPo.setType(schema.getType());
            schemaPos.add(schemaPo);
            fieldLists.add(schema.getName());
            String type = schema.getType();
            if (type.equalsIgnoreCase("bigint")) {
                typeList.add(Types.LONG);
            } else {
                typeList.add(Types.STRING);
            }
        }
        String[] fieldNames = fieldLists.toArray(new String[fieldLists.size()]);
        TypeInformation[] types = typeList.toArray(new TypeInformation[typeList.size()]);

        // flatten the JSON messages into Rows that match the Hive schema
        SingleOutputStreamOperator<Row> originalRow = stream
                .flatMap(new KafkaMsgFormatFunction(schemaPos), new RowTypeInfo(types, fieldNames))
                .uid("ORIGINAL");

        // register the stream as a temporary view and write it with Flink SQL
        tableEnv.createTemporaryView("originalRow", originalRow);
        StringBuilder sql = new StringBuilder();
        tableEnv.executeSql("alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore')");
        sql.append("insert into `table_name` select ");
        sql.append(" * ");
        sql.append(" from `myhive`.`ods`.originalRow");
        tableEnv.executeSql(sql.toString());
//        env.execute();
    }

    static class KafkaMsgFormatFunction extends RichFlatMapFunction<ConsumerRecord<String, String>, Row> {
        private List<SchemaPo> schemaPos;

        public KafkaMsgFormatFunction(List<SchemaPo> schemaPos) {
            this.schemaPos = schemaPos;
        }

        @Override
        public void open(Configuration parameters) {
        }

        @Override
        public void flatMap(ConsumerRecord<String, String> record, Collector<Row> out) {
            try {
                // lower-case all JSON keys so they can be matched against the Hive column names
                Map<String, String> resultMap = new HashMap<>();
                HashMap<String, Object> infoMap = JSON.parseObject((String) record.value(), HashMap.class);
                for (String signalkey : infoMap.keySet()) {
                    resultMap.put(signalkey.toLowerCase(), String.valueOf(infoMap.get(signalkey)));
                }
                // build a Row following the column order of the Hive table
                Row row = new Row(schemaPos.size());
                for (int i = 0; i < schemaPos.size(); i++) {
                    SchemaPo schemaPo = schemaPos.get(i);
                    String v = resultMap.get(schemaPo.getSignal());
                    if (StringUtils.isBlank(v)) {
                        row.setField(i, null);
                        continue;
                    }
                    if ("bigint".equalsIgnoreCase(schemaPo.getType())) {
                        Long svalue = Long.valueOf(resultMap.get(schemaPo.getSignal()));
                        row.setField(i, svalue);
                    } else {
                        String svalue = resultMap.get(schemaPo.getSignal());
                        row.setField(i, svalue);
                    }
                }
                out.collect(row);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
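
When submitting the job, the environment option is passed as the first program argument (args[0]). A typical submission might look like the following; the package name and jar name are illustrative placeholders, assuming the shaded jar built by the pom above:

flink run -c <package>.Kafka2Hive_ODS kafka2hive-1.0.0.jar prod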
