Writing from Confluent Kafka Connect to Elasticsearch and ClickHouse

1 Elasticsearch connection test

1.1 Start Confluent

/home/kafka/.local/confluent/bin/confluent start

This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.swpIapNw
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

1.2 Add the connector configuration

vim /home/kafka/.local/confluent/etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties

name=iot-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=road_traffic
key.ignore=true
connection.url=http://10.0.165.8:9200
type.name=iot-kafka-connect
batch.size=1
flush.timeout.ms=200000
topic.schema.ignore=road_traffic
schema.ignore=true
retry.backoff.ms=3000
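The properties file above is what `confluent load` reads, but the same configuration can also be submitted to the Kafka Connect REST API (POST to `/connectors` on the worker, port 8083 by default) as JSON. A minimal sketch of the conversion:

```python
import json

# Contents of iot-elasticsearch-sink.properties, copied from above
properties = """\
name=iot-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=road_traffic
key.ignore=true
connection.url=http://10.0.165.8:9200
type.name=iot-kafka-connect
batch.size=1
flush.timeout.ms=200000
topic.schema.ignore=road_traffic
schema.ignore=true
retry.backoff.ms=3000
"""

# key=value lines -> dict; "name" moves to the top level of the REST body
config = dict(line.split("=", 1) for line in properties.splitlines() if line)
body = {"name": config.pop("name"), "config": config}
print(json.dumps(body, indent=2))
```

The printed JSON matches the shape `confluent load` echoes back in the transcript below (minus the `tasks` and `type` fields the server adds).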

1.3 Load the connector

bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties

$ bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-elasticsearch-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"road_traffic","key.ignore":"true","connection.url":"http://10.0.165.8:9200","type.name":"iot-kafka-connect","batch.size":"1","flush.timeout.ms":"200000","topic.schema.ignore":"road_traffic","schema.ignore":"true","retry.backoff.ms":"3000","name":"iot-elasticsearch-sink"},"tasks":[],"type":"sink"}

Check the status:

$ bin/confluent status connectors
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
["iot-elasticsearch-sink"]

$ bin/confluent status iot-elasticsearch-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.9:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.165.8:8083"}],"type":"sink"}

1.4 Create the Kafka topic

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --create --replication-factor 2 --partitions 2 --topic road_traffic

Verify that the topic was created:

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --list

1.5 Produce data

(1) Add the following dependencies

    <groupId>org.example</groupId>
    <artifactId>Manufacturing_data</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>1.0.0</kafka.version>
        <avro.version>1.8.0</avro.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.66</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

(2) The Confluent jars cannot be found in the Maven repositories and must be added by hand; otherwise the build fails because io.confluent.kafka.serializers.KafkaAvroSerializer cannot be found.

After unpacking confluent-4.0.0, its share/java/ directory contains the jars of the individual Confluent components. We need common-config-4.0.0.jar, common-utils-4.0.0.jar and every jar whose name starts with jackson from the confluent-common directory, plus kafka-schema-registry-client-4.0.0.jar and kafka-avro-serializer-4.0.0.jar from the kafka-serde-tools directory.
Copy them into a new lib directory under the module, then right-click it and choose Add as Library…

common-config-4.0.0.jar
common-utils-4.0.0.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.1.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.1.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-xc-1.9.13.jar
kafka-avro-serializer-4.0.0.jar
kafka-schema-registry-client-4.0.0.jar

The producer code is as follows:

import java.io.File
import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

case class RoadTraffic(status: String, avgMeasuredTime: Int, avgSpeed: Int,
                       extID: String, medianMeasuredTime: Int, timestamp: Long,
                       vehicleCount: Int, id: Long, perort_id: String,
                       process_time: Long)

object KafkaToTraffic {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "10.0.165.8:9092,10.0.165.9:9092")
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("schema.registry.url", "http://10.0.165.8:8081")

    // Parse the Avro schema
    val schema: Schema = new Schema.Parser().parse(
      new File("E:\\working\\ideaWorking\\iot_road_traffic\\src\\main\\resources\\RoadTraffic.avsc"))
    //val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    val avroRecord: GenericData.Record = new GenericData.Record(schema)

    // Create a Kafka producer
    val producer: KafkaProducer[String, GenericRecord] = new KafkaProducer(props)

    val str = "{\"status\":\"OK\",\"avgMeasuredTime\":\"53\",\"avgSpeed\":\"58\",\"extID\":\"724\",\"medianMeasuredTime\":\"53\",\"TIMESTAMP\":\"2014-04-25T19:35:00\",\"vehicleCount\":\"1\",\"id\":\"8961146\",\"perort_id\":\"179444\",\"process_time\":\"1593386110\"}"
    val roadTraffic = JSON.parseObject(str, classOf[RoadTraffic])
    System.out.println(roadTraffic)

    avroRecord.put("status", roadTraffic.status)
    avroRecord.put("avgMeasuredTime", roadTraffic.avgMeasuredTime)
    avroRecord.put("avgSpeed", roadTraffic.avgSpeed)
    avroRecord.put("extID", roadTraffic.extID)
    avroRecord.put("medianMeasuredTime", roadTraffic.medianMeasuredTime)
    avroRecord.put("timestamp", roadTraffic.timestamp)
    avroRecord.put("vehicleCount", roadTraffic.vehicleCount)
    avroRecord.put("id", roadTraffic.id)
    avroRecord.put("perort_id", roadTraffic.perort_id)
    avroRecord.put("process_time", roadTraffic.process_time)

    try {
      val record = new ProducerRecord[String, GenericRecord]("road_traffic", avroRecord)
      System.out.println(record.toString)
      producer.send(record).get()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    producer.close()
  }
}

RoadTraffic.avsc

{
  "type": "record",
  "name": "traffic",
  "fields": [
    {"name": "status", "type": "string"},
    {"name": "avgMeasuredTime", "type": "int"},
    {"name": "avgSpeed", "type": "int"},
    {"name": "extID", "type": "string"},
    {"name": "medianMeasuredTime", "type": "int"},
    {"name": "timestamp", "type": "long"},
    {"name": "vehicleCount", "type": "int"},
    {"name": "id", "type": "long"},
    {"name": "perort_id", "type": "string"},
    {"name": "process_time", "type": "long"}
  ]
}
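As a quick sanity check, the record the producer fills in can be validated against this schema with the standard library alone (a sketch; the real pipeline relies on the Avro serializer itself):

```python
import json

# RoadTraffic.avsc, copied from above
schema = json.loads("""
{"type": "record", "name": "traffic", "fields": [
  {"name": "status", "type": "string"},
  {"name": "avgMeasuredTime", "type": "int"},
  {"name": "avgSpeed", "type": "int"},
  {"name": "extID", "type": "string"},
  {"name": "medianMeasuredTime", "type": "int"},
  {"name": "timestamp", "type": "long"},
  {"name": "vehicleCount", "type": "int"},
  {"name": "id", "type": "long"},
  {"name": "perort_id", "type": "string"},
  {"name": "process_time", "type": "long"}]}
""")

# Field values as they end up in ES (see section 1.6)
record = {"status": "OK", "avgMeasuredTime": 53, "avgSpeed": 58,
          "extID": "724", "medianMeasuredTime": 53,
          "timestamp": 1398425700000, "vehicleCount": 1, "id": 8961146,
          "perort_id": "179444", "process_time": 1593386110}

# Avro primitive type -> acceptable Python type
py_types = {"string": str, "int": int, "long": int}
for field in schema["fields"]:
    assert isinstance(record[field["name"]], py_types[field["type"]]), field["name"]
print("record matches schema")
```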

1.6 Check the result

After iot-elasticsearch-sink is loaded and the producer is started, an index with the same name as the topic is created in ES automatically. Query ES:

curl -XGET http://10.0.165.8:9200/road_traffic/_search

$ curl -XGET http://10.0.165.8:9200/road_traffic/_search
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"road_traffic","_type":"iot-kafka-connect","_id":"road_traffic+0+0","_score":1.0,"_source":{"status":"OK","avgMeasuredTime":53,"avgSpeed":58,"extID":"724","medianMeasuredTime":53,"timestamp":1398425700000,"vehicleCount":1,"id":8961146,"perort_id":"179444","process_time":1593386110}}]}}
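One detail worth noting in the _source: the producer's JSON carried "TIMESTAMP":"2014-04-25T19:35:00" with no timezone, yet ES stores timestamp=1398425700000. That epoch-millisecond value is exactly that wall-clock time interpreted as UTC+8, which suggests fastjson parsed the string in the producer host's local timezone (an inference; the post does not state the host's timezone):

```python
from datetime import datetime, timedelta, timezone

# "2014-04-25T19:35:00" interpreted as UTC+8 (assumed producer-host timezone)
cst = timezone(timedelta(hours=8))
epoch_ms = int(datetime(2014, 4, 25, 19, 35, tzinfo=cst).timestamp() * 1000)
print(epoch_ms)  # 1398425700000, matching the ES document
```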

2 ClickHouse connection test

The ClickHouse connection goes through JDBC.

2.1 Add the connector configuration

vim /home/kafka/.local/confluent/etc/kafka-connect-jdbc/iot-clickhouse-sink.properties

name=iot-clickhouse-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=road_traffic
connection.url=jdbc:clickhouse://10.0.50.1:8123/iot
connection.user=default
auto.create=false
insert.mode=INSERT
table.name.format=traffic_all
errors.log.enable=true
db.timezone=Asia/Shanghai
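With insert.mode=INSERT and table.name.format=traffic_all, the sink issues a plain INSERT per record, one parameter per field. A rough sketch of the statement shape (illustrative only; the real connector derives the column list from the record schema and binds values through a JDBC PreparedStatement):

```python
# Columns taken from the road_traffic record written in section 1
fields = ["status", "avgMeasuredTime", "avgSpeed", "extID",
          "medianMeasuredTime", "timestamp", "vehicleCount",
          "id", "perort_id", "process_time"]
table = "traffic_all"  # from table.name.format

sql = "INSERT INTO {} ({}) VALUES ({})".format(
    table, ", ".join(fields), ", ".join("?" * len(fields)))
print(sql)
```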

2.2 Add the JDBC driver jar

To connect to ClickHouse over JDBC, the ClickHouse JDBC driver jar (clickhouse-jdbc-0.2.4.jar) must be added under /home/kafka/.local/confluent/share/java/kafka-connect-jdbc:

$ ll
total 10952
-rw-r--r-- 1 kafka kafka   20437 Mar 27 08:37 audience-annotations-0.5.0.jar
-rw-r--r-- 1 root  root   211574 Jun 29 12:30 clickhouse-jdbc-0.2.4.jar
-rw-r--r-- 1 kafka kafka   20903 Mar 27 08:37 common-utils-5.2.4.jar
-rw-r--r-- 1 kafka kafka   87325 Mar 27 08:37 jline-0.9.94.jar
-rw-r--r-- 1 kafka kafka  317816 Mar 27 08:37 jtds-1.3.1.jar
-rw-r--r-- 1 kafka kafka  223878 Mar 27 08:37 kafka-connect-jdbc-5.2.4.jar
-rw-r--r-- 1 kafka kafka 1292696 Mar 27 08:37 netty-3.10.6.Final.jar
-rw-r--r-- 1 kafka kafka  927447 Mar 27 08:37 postgresql-42.2.10.jar
-rw-r--r-- 1 kafka kafka   41203 Mar 27 08:37 slf4j-api-1.7.25.jar
-rw-r--r-- 1 kafka kafka 7064881 Mar 27 08:37 sqlite-jdbc-3.25.2.jar
-rw-r--r-- 1 kafka kafka   74798 Mar 27 08:37 zkclient-0.10.jar
-rw-r--r-- 1 kafka kafka  906708 Mar 27 08:37 zookeeper-3.4.13.jar

Note: Confluent must be restarted afterwards, otherwise you get: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://10.0.50.1:8123/iot

2.3 Create the database and table in ClickHouse
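The post does not include the DDL it used. A plausible sketch, with the column set taken from RoadTraffic.avsc; the local table name traffic and distributed table traffic_all follow sections 2.1 and 2.4, while the engine, ordering key, and cluster name my_cluster are assumptions:

```sql
-- Local table on each shard (engine and ORDER BY are assumptions)
CREATE TABLE iot.traffic ON CLUSTER my_cluster
(
    status String,
    avgMeasuredTime Int32,
    avgSpeed Int32,
    extID String,
    medianMeasuredTime Int32,
    timestamp Int64,
    vehicleCount Int32,
    id Int64,
    perort_id String,
    process_time Int64
)
ENGINE = MergeTree()
ORDER BY id;

-- Distributed table the sink writes to (table.name.format=traffic_all)
CREATE TABLE iot.traffic_all ON CLUSTER my_cluster AS iot.traffic
ENGINE = Distributed(my_cluster, iot, traffic, rand());
```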

2.4 Load the connector

Since the Elasticsearch test above already wrote a record to the road_traffic topic, we can test directly:

bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties

[kafka@fbi-local-08 confluent]$ bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-clickhouse-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"road_traffic","connection.url":"jdbc:clickhouse://10.0.50.1:8123/iot","connection.user":"default","auto.create":"false","insert.mode":"INSERT","table.name.format":"traffic_all","errors.log.enable":"true","db.timezone":"Asia/Shanghai","name":"iot-clickhouse-sink"},"tasks":[],"type":"sink"}
$  bin/confluent status iot-clickhouse-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-clickhouse-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.8:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.0.165.8:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Table \"traffic_all\" is missing and auto-creation is disabled\n\tat io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:88)\n\tat io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\n"}],"type":"sink"}

When the table in iot-clickhouse-sink.properties is changed to the local table traffic, the error no longer occurs.

The fix:
Modify the kafka-connect-jdbc-5.2.4 source to add ClickHouse support, then upload the rebuilt jar to /home/kafka/.local/confluent/share/java/kafka-connect-jdbc and delete the original kafka-connect-jdbc-5.2.4.jar.
