kafka初识

kafka中文文档 本文环境:ubuntu:18.04

kafka安装、配置与基本使用(单节点)

安装kafka

下载 0.10.0.1版本并解压缩
在这里插入图片描述

> tar -xzf kafka_2.11-0.10.0.1.tgz
> cd kafka_2.11-0.10.0.1.tgz

kafka简单配置

> vi config/server.properties主要注意三个地方:broker.id:			标识本机log.dirs:			是kafka接收消息存放路径zookeeper.connect:	指定连接的zookeeper集群地址

在这里插入图片描述

启动服务器

启动ZooKeeper服务器:
如果没有ZooKeeper服务器,可以通过与kafka打包在一起的便捷脚本来创建一个单节点ZooKeeper实例:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> #后台启动: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[2021-02-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动Kafka服务器:

> bin/kafka-server-start.sh config/server.properties
> #后台启动: bin/kafka-server-start.sh -daemon config/server.properties
[2021-02-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2021-02-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

创建一个 topic

创建一个名为“test”的topic,它有一个分区和一个副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看所有 topic列表:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

删除一个 topic (标记):

> bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

发送一些消息

Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

启动一个 consumer

Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

查看消费组

查看所有消费组

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

查看单个消费组的消费详情

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test(消费组name) --new-consumer

发布-订阅模式

Topic:

  • Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据

  • 对于每一个topic, Kafka集群都会维持一个分区日志,每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commitlog文件

  • 分区中的每一个记录都会分配一个id号来表示顺序,我们称之为 offset,offset用来唯一的标识分区中每一条记录

  • Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制.

  • 每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.

偏移量由消费者所控制:通常读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录.

生产者

生产者可以将数据发布到所选择的topic(主题)中

消费者

  • 消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给 订阅消费组 中的一个消费者实例.

  • 如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例

  • 如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程

消费者消费是以消费组为单位,消费组之间数据互不影响,组内数据均分

Go实现(Go 1.15)

kafka_p.go

package kafka_pimport ("github.com/Shopify/sarama""log""time"
)// 定义Kafka生产者对象
type KafkaProducer struct {Config  *sarama.ConfigAddress []stringTopic   string
}// 定义生产者接口
type Producer interface {MsgContent() []byte
}//实例化sarama: Config
func (kP *KafkaProducer) ProducerConfigInit() {config := sarama.NewConfig()//是否开启消息发送成功后通知 successes channelconfig.Producer.Return.Successes = true//重试次数config.Producer.Retry.Max = 3//失败后再次尝试的间隔时间config.Producer.Retry.Backoff = 1 * time.Second//指定kafka版本,不指定,使用最小版本,高版本的新功能可能无法正常使用.config.Version = sarama.V0_10_0_1kP.Config = config
}//同步消息模式
func (kP *KafkaProducer) SyncProducer(p Producer) error {//初始化客户端producer, err := sarama.NewSyncProducer(kP.Address, kP.Config)if err != nil {log.Printf("sarama.NewSyncProducer err, message=%s \n", err)return err}defer producer.Close()//发送消息msg := &sarama.ProducerMessage{Topic: kP.Topic,Value: sarama.ByteEncoder(p.MsgContent()),}part, offset, err := producer.SendMessage(msg)if err != nil {log.Printf("send message(%s) err=%s \n", p.MsgContent(), err)return err} else {log.Printf("发送成功,partition=%d, offset=%d \n", part, offset)return nil}
}// 启动Kafka生产者
func (kP *KafkaProducer) StartP(p Producer) error {kP.ProducerConfigInit()return kP.SyncProducer(p)
}

kafka_c.go

package kafka_cimport ("github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster""log""time"
)// 定义Kafka消费者对象
type KafkaConsumer struct {Config  *cluster.ConfigAddress []stringTopics  []stringGroupId string
}// 定义消费者接口
type Consumer interface {Consumer([]byte) error
}//实例化sarama: Config
func (kC *KafkaConsumer) ConsumerConfigInit() {config := cluster.NewConfig()//接收失败通知config.Consumer.Return.Errors = true//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置config.Version = sarama.V0_10_0_1//初始从最新的offset开始config.Consumer.Offsets.Initial = sarama.OffsetNewestkC.Config = config
}//cluster消费组 消费
func (kC *KafkaConsumer) ClusterConsumer(c Consumer) error {//初始化客户端consumer, err := cluster.NewConsumer(kC.Address, kC.GroupId, kC.Topics, kC.Config)if err != nil {log.Printf("Failed open consumer: %s \n", err)return err}defer consumer.Close()go func() {for err := range consumer.Errors() {log.Printf("consumer err: %s \n", err)}}()go func() {for note := range consumer.Notifications() {log.Println("Rebalanced: ", note)}}()//接收消息for msg := range consumer.Messages() {if err = c.Consumer(msg.Value); err != nil {return nil}else {// MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offsetconsumer.MarkOffset(msg, "")time.Sleep(1 * time.Second)}}return nil
}// 启动Kafka消费者
func (kC *KafkaConsumer) StartC(c Consumer) error {kC.ConsumerConfigInit()return kC.ClusterConsumer(c)
}

配置kafka_config.go

package kafka_configimport "fmt"// Address
var Address = []string{"IP:9092"}var Topic = "test"            // 生产者topic
var Topics = []string{"test"} // 消费者topics
var GroupId = "test-group-1"  // 消费组id//发送者
type TestP struct {MsgData []byte
}// 实现发送者
func (t *TestP) MsgContent() []byte {fmt.Println(string(t.MsgData))return t.MsgData
}//接收者
type TestC struct {MsgData string
}// 实现接收者
func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}type KafkaMessage struct {FileDir     string `json:"dir"`FileName    string `json:"file"`OperateType string `json:"operation"`OldData     string `json:"old_data"`NewData     string `json:"new_data"`
}type KafkaMessageP struct {UserName string         `json:"user_name"`MsgId    string         `json:"msg_id"`Messages []KafkaMessage `json:"msg"`
}

消费者main.go

package mainimport ("golong/kafka_c""golong/kafka_config"
)func main() {//消费者c := &kafka_config.TestC{}kC := &kafka_c.KafkaConsumer{Address: kafka_config.Address,Topics:  kafka_config.Topics,GroupId: kafka_config.GroupId,}_ = kC.StartC(c)
}

生产者main.go

package mainimport ("encoding/json""fmt""golong/kafka_config""golong/kafka_p"
)func main() {//生产者msgKp := &kafka_config.KafkaMessageP{}msgKp.UserName = "fxm"msgKpBytes, _ := json.Marshal(msgKp)p := &kafka_config.TestP{MsgData: msgKpBytes,}kP := &kafka_p.KafkaProducer{Address: kafka_config.Address,Topic:   kafka_config.Topic,}err := kP.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}

panic: non-positive interval for NewTicker 问题处理

//处理1: 找到这个consumer.go源码位置,上面的第二个报错有标注位置github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc000212000, 0xc0002ba1e0)D:/work/mygo/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/consumer.go:452 +0x61// 修改452行,//	ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)ticker := time.NewTicker(c.client.config.Consumer.Offsets.AutoCommit.Interval)// 保存重新build即可//处理2:把 sarama 版本改成 从 v1.26.1 --> v1.24.1 就可以用啦 github.com/Shopify/sarama v1.24.1gomod 的配置改下版本号就可以github.com/Shopify/sarama v1.24.1github.com/bsm/sarama-cluster v2.1.15+incompatible

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/439951.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.4)深度学习笔记------深层神经网络

目录 1)Deep L-layer neural network 2)Forward Propagation in a Deep Network(重点) 3)Getting your matrix dimensions right 4)Building blocks of deep neural networks 5)Forward and Backward Propagation…

Struts1工作原理

Struts1工作原理图 1、初始化:struts框架的总控制器ActionServlet是一个Servlet,它在web.xml中配置成自动启动的Servlet,在启动时总控制器会读取配置文件(struts-config.xml)的配置信息,为struts中不同的模块初始化相应的对象。(面…

【洛谷 - P1772 】[ZJOI2006]物流运输(dp)

题干: 题目描述 物流公司要把一批货物从码头A运到码头B。由于货物量比较大,需要n天才能运完。货物运输过程中一般要转停好几个码头。物流公司通常会设计一条固定的运输路线,以便对整个运输过程实施严格的管理和跟踪。由于各种因素的存在&am…

RabbitMQ初识

官方介绍 - 中文 本文环境:ubuntu:20.04 RabbitMQ安装、配置与基本使用 安装RabbitMQ # 简易脚本安装 curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash sudo apt-get install rabbitmq-server -y --f…

Apollo进阶课程 ⑦ | 高精地图的采集与生产

目录 1.高精地图采集过程中需要用到的传感器 1.1)GPS 1.2)IMU 1.3)轮速计 2.高精地图采集过程中的制图方案 2.1)方案一 激光雷达 2.2)Camera融合激光雷达 原文链接:Apollo进阶课程 ⑦ | 高精地图的采…

你看不懂的spring原理是因为不知道这几个概念

背景 问题从一杯咖啡开始。 今天我去楼下咖啡机买了一杯「粉黛拿铁」。制作过程中显示: 我取了做好的粉黛拿铁,喝了一口,果然就是一杯热巧克力。咦咦咦,说好的拿铁呢?虽然我对「零点吧」的咖啡评价很高,觉…

EasyOcr 安装(linux、docker)、使用(gin、python)

EasyOcr git地址 EasyOCR是一款用python语言编写的OCR第三方库,同时支持GPU和CPU,目前已经支持超过70种语言. 安装(CPU) 注意: 本文是在仅在cpu下使用。如要使用CUDA版本,请在pytorch网站上选择正确的,并关闭此文章。…

Python之Numpy入门实战教程(2):进阶篇之线性代数

Numpy、Pandas、Matplotlib是Python的三个重要科学计算库,今天整理了Numpy的入门实战教程。NumPy是使用Python进行科学计算的基础库。 NumPy以强大的N维数组对象为中心,它还包含有用的线性代数,傅里叶变换和随机数函数。 本文主要介绍Numpy库…

【牛客 - 369F】小D的剑阵(最小割建图,二元关系建图,网络流最小割)

题干: 链接:https://ac.nowcoder.com/acm/contest/369/F 来源:牛客网 题目描述 现在你有 n 把灵剑,其中选择第i把灵剑会得到的 wiw_iwi​ 攻击力。 于此同时,还有q个约束,每个约束形如: …

【HDU - 3870】Catch the Theves(平面图转对偶图最短路,网络流最小割)

题干: A group of thieves is approaching a museum in the country of zjsxzy,now they are in city A,and the museum is in city B,where keeps many broken legs of zjsxzy.Luckily,GW learned the conspiracy when he is watching stars and told it to zjsxz…

Apollo进阶课程 ⑧ | 高精地图的格式规范

目录 高精地图规范格式分类 NDS格式规范 Open DRIVE格式规范 原文链接:Apollo进阶课程 ⑧ | 高精地图的格式规范 上周阿波君为大家详细介绍了「Apollo进阶课程⑦高精地图的采集与生产」。 高精地图采集过程中需要用到的传感器有GPS、IMU和轮速计。 无论是哪种传感…

Apollo进阶课程 ⑨ | 业界的高精地图产品

目录 高精地图的格式规范-OpenDRIVE HERE HD LIve Map HERE HD LIVE MAP-MAP COLLECTION HERE HD Live Map-Crowdsourced Update HERE HD Live Map-Learning HERE HD Live Map-Product MobileEye MobileEye-Pillars of Autonomous Driving MobileEye-Map as back-up s…

Apollo进阶课程⑩ | Apollo地图采集方案

目录 TomTom的高精地图和RoadDNA APOLLO地图采集流程 基站搭建 Apollo地图采集硬件方案 地图数据服务平台 原文链接:进阶课程⑩ | Apollo地图采集方案 上周阿波君为大家详细介绍了「Apollo进阶课程⑨业界的高精地图产品」。 出现在课程中的业界制作高精地图的厂…

用Python写Shell

环境 ubuntu: 18.04python: 3.6.9xnosh: 0.11.0 下载 pip3 install xonsh 简单使用 # 开启xonsh xonsh # 下载小工具(也可不下):高亮提示、智能补全 xpip install -U xonsh[full]# 随便下载一个包 pip3 install moneyimport money m1 money.Money(…

Apollo进阶课程⑪ | Apollo地图生产技术

目录 高精地图生产流程 数据采集 数据处理 元素识别 人工验证 全自动数据融合加工 基于深度学习的地图要素识别 人工验证生产 地图成果 原文链接:进阶课程⑪ | Apollo地图生产技术 高精地图是自动驾驶汽车的「千里眼」和「透视镜」。 摄像头、激光雷达、传…

Jenkins初识

Jenkins是啥 官方文档 Jenkins是一款开源 CI&CD 软件,用于自动化各种任务,包括构建、测试和部署软件。 Jenkins 支持各种运行方式,可通过系统包、Docker 或者通过一个独立的 Java 程序。CI(Continuous integration,持续集成…

Apollo进阶课程 ⑫ | Apollo高精地图

目录 Apollo高精地图表征元素 Apollo车道模型 UTM坐标系 84坐标系 Track坐标系 Apollo opDRIVE规范 HDMAP引擎 高精地图在政策方面的挑战 原文链接:进阶课程 ⑫ | Apollo高精地图 高精地图与普通地图不同,高精地图主要服务于自动驾驶车辆&#…

一步步编写操作系统 6 启动bochs

运行bochs 终于安装完成了,虽然这过程中有可能会出现各种各样的问题,但还是值得庆祝的,对Linux不熟的朋友第一次就搞定了这么个硬货,我理解您此时的喜大普奔之情,哈哈,给大家点赞。顺便说一句,…

Apollo技能图谱2.0焕新发布 更新7大能力91个知识点

阿波君 Apollo开发者社区 2月26日 过去的一年里,Apollo发展迅速,向智能交通不断渗透。从2.5到3.5版本,无论控制系统的升级、高清地图的泛用和车路协同技术服务的推进,无不在推动自动驾驶技术从开源向开辟商业化新格局位移。 在开…

一步步编写操作系统 07 开机启动bios

bios是如何苏醒的 bios其实一直睡在某个地方,直到被唤醒……前面热火朝天的说了bios的功能和内存布局,似乎还没说到正题上,bios是如何启动的呢。因为bios是计算机上第一个运行的软件,所以它不可能自己加载自己,由此可…