Confluent介绍及其使用

1 confluent介绍

Confluent是用来管理和组织不同数据源的流媒体平台,可以实时地把不同源和位置的数据集成到一个中心的事件流平台。并且很可靠、性能很高。

Confluent目前提供了社区版(免费)和商业版(收费)两个版本,社区版提供了Connectors、REST Proxy、KSQL、Schema-Registry等基础服务。商业版为企业提供了控制面板、负载均衡,跨中心数据备份、安全防护等高级特性。

1.2 服务功能介绍

1.2.1 Zookeeper

Zookeeper是一个开放源码的分布式应用程序协调服务,主要功能包扩:维护配置信息、命名、提供分布式同步、组管理等集中式服务 。Kafka使用ZooKeeper对集群元数据进行持久化存储,如果ZooKeeper丢失了Kafka数据,集群的副本映射关系以及topic等配置信息都会丢失,最终导致Kafka集群不再正常工作,造成数据丢失的后果。

1.2.2 Kafka

Kafka是一个分布式流处理平台,基于zookeeper协调并支持分区和多副本的分布式消息系统,是一种高吞吐量的分布式发布订阅消息系统,消息队列中间件,主要功能是负责消息传输,Confluent就是依赖Kafka来进行消息传输。Kafka最大的特性就是可以实时的处理大量数据以满足各种需求场景。

1.2.3 Control Center

control center可以很容易地管理kafka的连接,创建,编辑,和管理与其他系统的连接。我们可以从producer到consumer监控data streams,保证我们的每一条消息都被传递,还能测量出消息的传输耗时多久。使用confluent control center能让开发人员不写一句代码,也能构建基于kafka的数据生产管道。

1.2.4 Kafka-rest

Kafka-rest是Kafka RESTful接口服务组件,可以通过Restful接口而不是本机Kafka协议或客户端的情况下,生成和使用消息,而且还可以查看集群状态以及执行管理操作。

1.2.5 Schema-Registry

Schema-Registry是为元数据管理提供的服务,同样提供了RESTful接口用来存储和获取schemas,它能够保存数据格式变化的所有版本,并可以做到向下兼容。Schema-Registry还为Kafka提供了Avro格式的序列化插件来传输消息。Confluent主要用Schema-Registry来对数据schema进行管理和序列化操作。

1.2.6 Connect

Kafka Connect是 Kafka的一个开源组件,是用来将Kafka与数据库、key-value存储系统、搜索系统、文件系统等外部系统连接起来的基础框架。通过使用Kafka Connect框架以及现有的连接器可以实现从源数据读入消息到Kafka,再从Kafka读出消息到目的地的功能。

1.2.7 ksql-server

KSQL是使用SQL语句对Apache Kafka执行流处理任务的流式SQL引擎,Confluent 使用KSQL对Kafka的数据提供查询服务.

2 confluent下载

使用的开源的confluent的5.2.4版本

下载链接:http://packages.confluent.io/archive/5.2/confluent-5.2.4-2.11.tar.gz

3 环境准备

分布式搭建建议至少3个节点,但是由于用于测试及节点紧张这里使用2个节点

节点zookeeperkafkacontrol-centerkafka-resetschema-registryconnectorksql-server
10.0.165.8
10.0.165.9
2181909290218082808180838088

4 安装

4.1 解压

将下载的文件上传至linux,然后解压至相应的目录下

tar -zxvf /opt/package/confluent-5.2.4-2.11.tar.gz -C /home/kafka/.local/

修改文件名并进入到相应的目录下

mv /home/kafka/.local/confluent-5.2.4 /home/kafka/.local/confluent
cd /home/kafka/.local/confluent

4.2 修改配置

修改10.0.165.8节点的相应配置

4.2.1 zookeeper配置

(1)vim /home/kafka/.local/confluent/etc/kafka/zookeeper.properties

##数据存放目录,默认为/tmp/zookeepe存在删除风险
dataDir=/data/confluent/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2##多个zookeeper server,server的编号1、2等要与myid中的一致
server.1=10.0.165.8:2888:3888
server.2=10.0.165.9:2888:3888

(2)生成myid

echo 1 > /home/kafka/.local/confluent/etc/kafka/myid

(3)修改confluent服务启动脚本,将myid发布到confluent运行目录下。

bin/confluent start会启动confluent的各服务,且会将etc下的各配置,复制到confluent运行目录下。

vim /home/kafka/.local/confluent/bin/confluent

在config_zookeeper()方法块最后一行,添加

cp ${confluent_conf}/kafka/myid $confluent_current/zookeeper/data/

目的是将etc/kafka/myid拷贝到confluent运行目录下,否则会报myid is no found,zookeeper启动失败。

4.2.2 Kafka配置

vim /home/kafka/.local/confluent/etc/kafka/server.properties

broker.id=0#listeners与advertised.listeners可以只配一个,与当前机器网卡有关系,请注意。advertised.listeners可能通用性更强,值为当前机器的ip与端口,其他机器ip无需配置
advertised.listeners=PLAINTEXT://10.0.165.8:9092##根据实际情况调整
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100#log.dirs是最重要的配置,kafka数据所在
log.dirs=/data/confluent/kafka-logs
num.partitions=12num.recovery.threads.per.data.dir=1message.max.bytes=10000000
replica.fetch.max.bytes= 10485760
auto.create.topics.enable=true
auto.leader.rebalance.enable = true##备份因子数<=kafka节点数,若大于会报错
default.replication.factor=2
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1log.flush.interval.messages=20000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=2000
log.retention.check.interval.ms=300000
log.cleaner.enable=true##log失效时间,单位小时
log.retention.hours=48
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000confluent.metrics.reporter.bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
confluent.metrics.reporter.topic.replicas=2confluent.support.metrics.enable=true
confluent.support.customer.id=anonymousdelete.topic.enable=true
group.initial.rebalance.delay.ms=0

4.2.3 kafka-rest

vim /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server-001
schema.registry.url=http://10.0.165.8:8081
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
bootstrap.servers=PLAINTEXT://10.0.165.8:9092
port=8082
consumer.threads=8access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
access.control.allow.origin=*

4.2.4 ksql

confluent-4没有这个

vim /home/kafka/.local/confluent/etc/ksql/ksql-server.properties

ksql.service.id=default_
bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
listeners=http://0.0.0.0:8088
ksql.schema.registry.url=http://10.0.165.8:8081,http://10.0.165.9:8081
ksql.sink.partitions=4

4.2.5 confluent-control-center

vim /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties

bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
confluent.controlcenter.rest.listeners=http://0.0.0.0:9021#每个id要唯一,不然只能启动一个
confluent.controlcenter.id=1
confluent.controlcenter.data.dir=/data/confluent/control-center
confluent.controlcenter.connect.cluster=http://10.0.165.8:8083,http://10.0.165.9:8083##每台都配置各自的ip
confluent.controlcenter.ksql.url=http://10.0.165.8:8088
confluent.controlcenter.schema.registry.url=http:/10.0.165.8:8081,http://10.0.165.9:8081confluent.controlcenter.internal.topics.replication=2
confluent.controlcenter.internal.topics.partitions=2
confluent.controlcenter.command.topic.replication=2
confluent.monitoring.interceptor.topic.partitions=2
confluent.monitoring.interceptor.topic.replication=2
confluent.metrics.topic.replication=2confluent.controlcenter.streams.num.stream.threads=30

4.2.6 schema-registry

vim /home/kafka/.local/confluent/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.0.165.8:9092,10.0.165.9:9092
kafkastore.topic=_schemas
debug=false

4.2.7 connect

vim /home/kafka/.local/confluent/etc/schema-registry/connect-avro-distributed.properties

bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
group.id=connect-clusterkey.converter=org.apache.kafka.connect.storage.StringConverter 
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statusesconfig.storage.replication.factor=2
offset.storage.replication.factor=2
status.storage.replication.factor=2internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=falserest.port=8083
rest.advertised.port=8083plugin.path=/home/kafka/.local/confluent/share/java

4.2.8 将confluent发送到其他节点

scp -r confluent/ kafka@10.0.165.9:/home/kafka/.local/

然后修改其他节点的配置

vi myid

2

vi /home/kafka/.local/confluent/etc/kafka/server.properties

broker.id=1
advertised.listeners=PLAINTEXT://10.0.165.9:9092

vi /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server-002
schema.registry.url=http://10.0.165.9:8081
bootstrap.servers=PLAINTEXT://10.0.165.9:9092

vi /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties

confluent.controlcenter.id=2
confluent.controlcenter.ksql.url=http://10.0.165.9:8088

然后在两个节点的/data目录下新建confluent并修改权限

sudo mkdir /data/confluent
sudo chown kafka:kafka /data/confluent

4.3 服务启动与停止

4.3.1 全部服务启动

启动:bin/confluent start

查看状态:bin/confluent status

停止:bin/confluent stop

4.3.2 单独启动服务

服务单独启动

启动kafka-rest

bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties

上面的这种方式是前台启动,也可以以后台方式启动。

nohup bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties &

启动zookeeper

bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties 

启动kafka broker

bin/kafka-server-start -daemon  etc/kafka/server.properties

启动schema registry

bin/schema-registry-start -daemon  etc/schema-registry/schema-registry.properties

5 安装过程常见报错

5.1 KafkaServer启动失败

[2020-06-27 04:28:15,713] FATAL [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Socket server failed to bind to 10.0.165.8:9092: Cannot assign requested address.at kafka.network.Acceptor.openServerSocket(SocketServer.scala:331)at kafka.network.Acceptor.<init>(SocketServer.scala:256)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:97)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)at kafka.network.SocketServer.startup(SocketServer.scala:89)at kafka.server.KafkaServer.startup(KafkaServer.scala:229)at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)
Caused by: java.net.BindException: Cannot assign requested addressat sun.nio.ch.Net.bind0(Native Method)at sun.nio.ch.Net.bind(Net.java:433)at sun.nio.ch.Net.bind(Net.java:425)at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)... 9 more
[2020-06-27 04:28:15,715] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
[2020-06-27 04:28:15,717] INFO [SocketServer brokerId=2] Shutting down (kafka.network.SocketServer)
[2020-06-27 04:28:15,718] INFO [SocketServer brokerId=2] Shutdown completed (kafka.network.SocketServer)
[2020-06-27 04:28:15,721] INFO Shutting down. (kafka.log.LogManager)
[2020-06-27 04:28:15,760] INFO Shutdown complete. (kafka.log.LogManager)
[2020-06-27 04:28:15,761] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-06-27 04:28:15,762] INFO Session: 0x27297ff0225a5a9 closed (org.apache.zookeeper.ZooKeeper)
[2020-06-27 04:28:15,764] INFO EventThread shut down for session: 0x27297ff0225a5a9 (org.apache.zookeeper.ClientCnxn)
[2020-06-27 04:28:15,765] INFO [KafkaServer id=2] shut down completed (kafka.server.KafkaServer)
[2020-06-27 04:28:15,766] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)

自己copy了server.properties文件到各个节点没有修改下面的配置 监听器的配置,应该指向节点本身的主机名和端口,我全部四台机器都指向了10.0.165.8,所以导致了只有节点1是正常的

advertised.listeners=PLAINTEXT://10.0.165.9:9092

5.2 Confluent schema-registry启动失败

[2020-06-27 16:09:39,872] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:242)
[2020-06-27 16:09:50,095] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
java.lang.IllegalArgumentException: Unable to subscribe to the Kafka topic _schemas backing this data store. Topic may not exist.at io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread.<init>(KafkaStoreReaderThread.java:125)at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:130)at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:199)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:64)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:42)at io.confluent.rest.Application.createServer(Application.java:157)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)

因为kafkaserver没有启动

6 常用操作

(1)启动

confluent start

(2)查看日志文件目录

confluent current

(3)列出连接

confluent list connectors

(4)查看加载的连接器

confluent status connectors

[
"file-source"
]

(5)查看具体连接器状态

confluent status file-source

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473603.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用 Pylint 来规范 Python 代码风格

Pylint 是什么 Pylint 是一个 Python 代码分析工具&#xff0c;它分析 Python 代码中的错误&#xff0c;查找不符合代码风格标准&#xff08;Pylint 默认使用的代码风格是 PEP 8&#xff0c;具体信息&#xff0c;请参阅参考资料&#xff09;和有潜在问题的代码。目前 Pylint 的…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受&#xff0c;比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组&#xff0c;例如&#xff1a;&qu…

confluent connect写出到ES及ClickHouse

1 连接Elasticsearch测试 1.1 启动confluent /home/kafka/.local/confluent/bin/confluent start This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.htmlUsing CONFLUENT_CURRENT: /tmp/confluent.swpIapNw Sta…

tomcat内存溢出问题解决思路

1、修改启动时内存参数、并指定JVM时区 &#xff08;在windows server 2008 下时间少了8个小时&#xff09;在Tomcat上运行j2ee项目代码时&#xff0c;经常会出现内存溢出的情况&#xff0c;解决办法是在系统参数中增加系统参数&#xff1a; window下&#xff0c; 在catalina.b…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署&#xff0c;主要包括两个方面&#xff1a; 1.nginx安装与配置&#xff1a; 1、Nginx 安装 系统平台&#xff1a;CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…

天池 在线编程 滑动数独(滑动窗口)

文章目录1. 题目2. 解题1. 题目 描述 给定一个 3xn的矩阵 number&#xff0c;并且该矩阵只含有1到9的正整数。 考虑有一个大小为 3x3 滑动窗口&#xff0c;从左到右遍历该矩阵 number&#xff0c; 那么该滑动窗口在遍历整个矩阵的过程中会有n-2个。 现在你的任务是找出这些滑…

TIGK监控平台介绍

1 概述 众所周知监控平台对大数据平台是非常重要的&#xff0c;监控是故障诊断和分析的重要辅助利器&#xff0c;在发生事故之前就能预警&#xff0c;最大限度降低系统故障率。   监控系统我们可以分为业务层面&#xff0c;应用层面&#xff0c;系统层面 1.1 业务层面 业务系…

有意思的网站

谱聚类 http://blog.pluskid.org/?p287 Qt Graphics View 框架 http://yleesun.blog.163.com/blog/static/2941340220096110165817/ 谷歌编码规范 http://google-styleguide.googlecode.com/svn/trunk/cppguide.xml 匈牙利命名法 http://blog.csdn.net/buglu/article/details/…

天池 在线编程 队列检查(排序)

文章目录1. 题目2. 解题1. 题目 描述 班上的学生根据他们的年级照片的身高升序排列&#xff0c;确定当前未站在正确位置的学生人数 数组长度 < 10^5示例 输入: heights [1,1,3,3,4,1]输出: 3解释: 经过排序后 heights变成了[1,1,1,3,3,4]&#xff0c;有三个学生不在应在…

celery异步执行任务在Django中的应用实例

1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp demo2.在celery_demo模块中创建celery.py模块, 文件目录为: celery.py模块内容为: from celery import Celery from django.conf import settings import os#…

Spring自学教程-注解的使用(三)

一、java中的注解定义注解下面是一个定义注解的实例。Target(ElementType.TYPE)Retention(RetentionPolicy.RUNTIME)DocumentedInheritedpublic interface Description { String value();}其中的interface是一个关键字&#xff0c;在设计annotations的时候必须把一个类型定义为…

Django单元测试

一.前言/准备 测Django的东西仅限于在MTV模型。哪些可以测&#xff1f;哪些不可以。 1.html里的东西不能测。①Html里的HTML代码大部分都是写死的②嵌套在html中的Django模板语言也不能测&#xff0c;即使有部分逻辑。 但写测试用例时至少要调用一个类或者方法。模板语言没有出…

Telegraf安装及使用

1 安装 1.1 创建用户 &#xff08;1&#xff09;添加用户 # useradd tigk # passwd tigk Changing password for user tigk. New password: BAD PASSWORD: The password is shorter than 8 characters Retype new password: passwd: all authentication tokens updated suc…

天池 在线编程 中位数

文章目录1. 题目2. 解题1. 题目 描述 给定一个长度为N的整数数组arr 返回一个长度为N的整数答案数组ans ans[i] 表示删除arr数组第i个数后&#xff0c;arr数组的中位数 N为偶数 2 < N < 10^5 示例 输入:[1,2,3,4,5,6] 输出:[4,4,4,3,3,3] 解释:删去1后 剩下的数组为[…

自动化运维Shell课堂笔记

1、课程回顾 2、课程大纲 1、shell编程 开发和运维 shell基础知识 shell变量 shell表达式 shell流程控制语句 2、代码发布 项目周期 代码部署的方式 代码部署流程 服务器环境部署 手工方式部署代码 脚本方式部署代码 3、shell 3.1、开发和运维 3.1.1 开发 开发是什么&…

InfluxDB安装及使用

1 安装 1.1 Tar包安装 &#xff08;1&#xff09;获取tar包 wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.0_linux_amd64.tar.gz&#xff08;2&#xff09;解压tar包   tar xvfz influxdb-1.8.0_linux_amd64.tar.gz $ su - tigk $ tar xvfz /opt/packa…

倒排索引原理和实现

关于倒排索引 搜索引擎通常检索的场景是&#xff1a;给定几个关键词&#xff0c;找出包含关键词的文档。怎么快速找到包含某个关键词的文档就成为搜索的关键。这里我们借助单词——文档矩阵模型&#xff0c;通过这个模型我们可以很方便知道某篇文档包含哪些关键词&#xff0c;某…

天池 在线编程 Character deletion

文章目录1. 题目2. 解题1. 题目 描述 Enter two strings and delete all characters in the second string from the first string 字符串长度&#xff1a;[1, 10^5] Example 1: Input: str”They are students”&#xff0c;sub”aeiou” Output: ”Thy r stdnts”来源&am…

【翻译】在Ext JS中创建特定主题的重写

Ext JS提供了大量的功能来使类的创建和处理变得简单&#xff0c;还提供了一系列的功能来扩展和重新现有的Javascript类。这意味着可以为类添加行为和创建属于自己的类&#xff0c;或者重写某些函数的行为。在本文&#xff0c;将展示如何实现特定主题类的重写。原文&#xff1a;…

Kapacitor安装及使用

1 安装 1.1 Tar包安装 &#xff08;1&#xff09;下载 wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.5_linux_amd64.tar.gz&#xff08;2&#xff09;安装 $ tar xvfz /opt/package/kapacitor-1.5.5-static_linux_amd64.tar.gz -C /home/tigk/.local/ …