kafka初识

kafka中文文档 本文环境:ubuntu:18.04

kafka安装、配置与基本使用(单节点)

安装kafka

下载 0.10.0.1版本并解压缩
在这里插入图片描述

> tar -xzf kafka_2.11-0.10.0.1.tgz
> cd kafka_2.11-0.10.0.1.tgz

kafka简单配置

> vi config/server.properties主要注意三个地方:broker.id:			标识本机log.dirs:			是kafka接收消息存放路径zookeeper.connect:	指定连接的zookeeper集群地址

在这里插入图片描述

启动服务器

启动ZooKeeper服务器:
如果没有ZooKeeper服务器,可以通过与kafka打包在一起的便捷脚本来创建一个单节点ZooKeeper实例:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> #后台启动: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[2021-02-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动Kafka服务器:

> bin/kafka-server-start.sh config/server.properties
> #后台启动: bin/kafka-server-start.sh -daemon config/server.properties
[2021-02-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2021-02-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

创建一个 topic

创建一个名为“test”的topic,它有一个分区和一个副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看所有 topic列表:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

删除一个 topic (标记):

> bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

发送一些消息

Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

启动一个 consumer

Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

查看消费组

查看所有消费组

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

查看单个消费组的消费详情

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test(消费组name) --new-consumer

发布-订阅模式

Topic:

  • Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据

  • 对于每一个topic, Kafka集群都会维持一个分区日志,每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commitlog文件

  • 分区中的每一个记录都会分配一个id号来表示顺序,我们称之为 offset,offset用来唯一的标识分区中每一条记录

  • Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制.

  • 每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.

偏移量由消费者所控制:通常读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录.

生产者

生产者可以将数据发布到所选择的topic(主题)中

消费者

  • 消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给 订阅消费组 中的一个消费者实例.

  • 如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例

  • 如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程

消费者消费是以消费组为单位,消费组之间数据互不影响,组内数据均分

Go实现(Go 1.15)

kafka_p.go

package kafka_pimport ("github.com/Shopify/sarama""log""time"
)// 定义Kafka生产者对象
type KafkaProducer struct {Config  *sarama.ConfigAddress []stringTopic   string
}// 定义生产者接口
type Producer interface {MsgContent() []byte
}//实例化sarama: Config
func (kP *KafkaProducer) ProducerConfigInit() {config := sarama.NewConfig()//是否开启消息发送成功后通知 successes channelconfig.Producer.Return.Successes = true//重试次数config.Producer.Retry.Max = 3//失败后再次尝试的间隔时间config.Producer.Retry.Backoff = 1 * time.Second//指定kafka版本,不指定,使用最小版本,高版本的新功能可能无法正常使用.config.Version = sarama.V0_10_0_1kP.Config = config
}//同步消息模式
func (kP *KafkaProducer) SyncProducer(p Producer) error {//初始化客户端producer, err := sarama.NewSyncProducer(kP.Address, kP.Config)if err != nil {log.Printf("sarama.NewSyncProducer err, message=%s \n", err)return err}defer producer.Close()//发送消息msg := &sarama.ProducerMessage{Topic: kP.Topic,Value: sarama.ByteEncoder(p.MsgContent()),}part, offset, err := producer.SendMessage(msg)if err != nil {log.Printf("send message(%s) err=%s \n", p.MsgContent(), err)return err} else {log.Printf("发送成功,partition=%d, offset=%d \n", part, offset)return nil}
}// 启动Kafka生产者
func (kP *KafkaProducer) StartP(p Producer) error {kP.ProducerConfigInit()return kP.SyncProducer(p)
}

kafka_c.go

package kafka_cimport ("github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster""log""time"
)// 定义Kafka消费者对象
type KafkaConsumer struct {Config  *cluster.ConfigAddress []stringTopics  []stringGroupId string
}// 定义消费者接口
type Consumer interface {Consumer([]byte) error
}//实例化sarama: Config
func (kC *KafkaConsumer) ConsumerConfigInit() {config := cluster.NewConfig()//接收失败通知config.Consumer.Return.Errors = true//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置config.Version = sarama.V0_10_0_1//初始从最新的offset开始config.Consumer.Offsets.Initial = sarama.OffsetNewestkC.Config = config
}//cluster消费组 消费
func (kC *KafkaConsumer) ClusterConsumer(c Consumer) error {//初始化客户端consumer, err := cluster.NewConsumer(kC.Address, kC.GroupId, kC.Topics, kC.Config)if err != nil {log.Printf("Failed open consumer: %s \n", err)return err}defer consumer.Close()go func() {for err := range consumer.Errors() {log.Printf("consumer err: %s \n", err)}}()go func() {for note := range consumer.Notifications() {log.Println("Rebalanced: ", note)}}()//接收消息for msg := range consumer.Messages() {if err = c.Consumer(msg.Value); err != nil {return nil}else {// MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offsetconsumer.MarkOffset(msg, "")time.Sleep(1 * time.Second)}}return nil
}// 启动Kafka消费者
func (kC *KafkaConsumer) StartC(c Consumer) error {kC.ConsumerConfigInit()return kC.ClusterConsumer(c)
}

配置kafka_config.go

package kafka_configimport "fmt"// Address
var Address = []string{"IP:9092"}var Topic = "test"            // 生产者topic
var Topics = []string{"test"} // 消费者topics
var GroupId = "test-group-1"  // 消费组id//发送者
type TestP struct {MsgData []byte
}// 实现发送者
func (t *TestP) MsgContent() []byte {fmt.Println(string(t.MsgData))return t.MsgData
}//接收者
type TestC struct {MsgData string
}// 实现接收者
func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}type KafkaMessage struct {FileDir     string `json:"dir"`FileName    string `json:"file"`OperateType string `json:"operation"`OldData     string `json:"old_data"`NewData     string `json:"new_data"`
}type KafkaMessageP struct {UserName string         `json:"user_name"`MsgId    string         `json:"msg_id"`Messages []KafkaMessage `json:"msg"`
}

消费者main.go

package mainimport ("golong/kafka_c""golong/kafka_config"
)func main() {//消费者c := &kafka_config.TestC{}kC := &kafka_c.KafkaConsumer{Address: kafka_config.Address,Topics:  kafka_config.Topics,GroupId: kafka_config.GroupId,}_ = kC.StartC(c)
}

生产者main.go

package mainimport ("encoding/json""fmt""golong/kafka_config""golong/kafka_p"
)func main() {//生产者msgKp := &kafka_config.KafkaMessageP{}msgKp.UserName = "fxm"msgKpBytes, _ := json.Marshal(msgKp)p := &kafka_config.TestP{MsgData: msgKpBytes,}kP := &kafka_p.KafkaProducer{Address: kafka_config.Address,Topic:   kafka_config.Topic,}err := kP.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}

panic: non-positive interval for NewTicker 问题处理

//处理1: 找到这个consumer.go源码位置,上面的第二个报错有标注位置github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc000212000, 0xc0002ba1e0)D:/work/mygo/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/consumer.go:452 +0x61// 修改452行,//	ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)ticker := time.NewTicker(c.client.config.Consumer.Offsets.AutoCommit.Interval)// 保存重新build即可//处理2:把 sarama 版本改成 从 v1.26.1 --> v1.24.1 就可以用啦 github.com/Shopify/sarama v1.24.1gomod 的配置改下版本号就可以github.com/Shopify/sarama v1.24.1github.com/bsm/sarama-cluster v2.1.15+incompatible

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/439951.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.4)深度学习笔记------深层神经网络

目录 1)Deep L-layer neural network 2)Forward Propagation in a Deep Network(重点) 3)Getting your matrix dimensions right 4)Building blocks of deep neural networks 5)Forward and Backward Propagation…

Struts1工作原理

Struts1工作原理图 1、初始化:struts框架的总控制器ActionServlet是一个Servlet,它在web.xml中配置成自动启动的Servlet,在启动时总控制器会读取配置文件(struts-config.xml)的配置信息,为struts中不同的模块初始化相应的对象。(面…

【洛谷 - P1772 】[ZJOI2006]物流运输(dp)

题干: 题目描述 物流公司要把一批货物从码头A运到码头B。由于货物量比较大,需要n天才能运完。货物运输过程中一般要转停好几个码头。物流公司通常会设计一条固定的运输路线,以便对整个运输过程实施严格的管理和跟踪。由于各种因素的存在&am…

RabbitMQ初识

官方介绍 - 中文 本文环境:ubuntu:20.04 RabbitMQ安装、配置与基本使用 安装RabbitMQ # 简易脚本安装 curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash sudo apt-get install rabbitmq-server -y --f…

Apollo进阶课程 ⑦ | 高精地图的采集与生产

目录 1.高精地图采集过程中需要用到的传感器 1.1)GPS 1.2)IMU 1.3)轮速计 2.高精地图采集过程中的制图方案 2.1)方案一 激光雷达 2.2)Camera融合激光雷达 原文链接:Apollo进阶课程 ⑦ | 高精地图的采…

【BZOJ 3831】【Poi2014】Little Bird(单调队列优化dp)

题干: Description In the Byteotian Line Forest there are trees in a row. On top of the first one, there is a little bird who would like to fly over to the top of the last tree. Being in fact very little, the bird might lack the strength to f…

你看不懂的spring原理是因为不知道这几个概念

背景 问题从一杯咖啡开始。 今天我去楼下咖啡机买了一杯「粉黛拿铁」。制作过程中显示: 我取了做好的粉黛拿铁,喝了一口,果然就是一杯热巧克力。咦咦咦,说好的拿铁呢?虽然我对「零点吧」的咖啡评价很高,觉…

EasyOcr 安装(linux、docker)、使用(gin、python)

EasyOcr git地址 EasyOCR是一款用python语言编写的OCR第三方库,同时支持GPU和CPU,目前已经支持超过70种语言. 安装(CPU) 注意: 本文是在仅在cpu下使用。如要使用CUDA版本,请在pytorch网站上选择正确的,并关闭此文章。…

Python之Numpy入门实战教程(2):进阶篇之线性代数

Numpy、Pandas、Matplotlib是Python的三个重要科学计算库,今天整理了Numpy的入门实战教程。NumPy是使用Python进行科学计算的基础库。 NumPy以强大的N维数组对象为中心,它还包含有用的线性代数,傅里叶变换和随机数函数。 本文主要介绍Numpy库…

【牛客 - 369F】小D的剑阵(最小割建图,二元关系建图,网络流最小割)

题干: 链接:https://ac.nowcoder.com/acm/contest/369/F 来源:牛客网 题目描述 现在你有 n 把灵剑,其中选择第i把灵剑会得到的 wiw_iwi​ 攻击力。 于此同时,还有q个约束,每个约束形如: …

一步步编写操作系统 1 部署工作环境 1

1.1工欲善其事,必先利其器。 如果您觉得操作系统已属于很底层的东西,我双手赞成。但是如果您像我之前一样,觉得底层的东西无法用上层高级的东西来构建,现在可以睁大眼睛好好看看下面要介绍的东西了。 首先,操作系统是…

多用户操作git“远程仓库“(本地)

设置本地远程仓库 准备远程仓库文件 cd ~/git-repo.git初始化 git init --shared修改git的接收配置 git config receive.denyCurrentBranch ignore初始化git仓库 git config user.email "fxmfxm.com" git config user.name "fxm" git add . git commit -m …

10点43博客文章汇总(2018年度)

今天是春节后上班第一天,将2018年度的文章进行汇总。总共分为三类:翻译、转载、原创。 1.翻译 翻译类目前完结的有Kaggle上的文章和斯坦福CS231n的文章。 Kaggle Learn的Python课程的中文翻译,链接为:Python;Kaggle …

【HDU - 3870】Catch the Theves(平面图转对偶图最短路,网络流最小割)

题干: A group of thieves is approaching a museum in the country of zjsxzy,now they are in city A,and the museum is in city B,where keeps many broken legs of zjsxzy.Luckily,GW learned the conspiracy when he is watching stars and told it to zjsxz…

一步步编写操作系统 2 部署工作环境 2

1.22汇编语言编译器新贵,NASM "新"是相对于旧来说的,老的汇编器MASM和TASM已经过时了,从名称上可以看出字母n是在m之后,其功能必然有所超越才会被大家接受。 请用一句话概括NASM优势在哪里?免费语法简洁使…

Apollo进阶课程 ⑧ | 高精地图的格式规范

目录 高精地图规范格式分类 NDS格式规范 Open DRIVE格式规范 原文链接:Apollo进阶课程 ⑧ | 高精地图的格式规范 上周阿波君为大家详细介绍了「Apollo进阶课程⑦高精地图的采集与生产」。 高精地图采集过程中需要用到的传感器有GPS、IMU和轮速计。 无论是哪种传感…

Casbin初识

Casbin中文文档 环境 go:1.15casbin:v2mysql:5.7 代码 package mycasbinimport ("fmt""github.com/casbin/casbin/v2""github.com/casbin/casbin/v2/model"gormAdapter "github.com/casbin/gorm-adapter/v3""gorm.io/driver/…

Apollo进阶课程 ⑨ | 业界的高精地图产品

目录 高精地图的格式规范-OpenDRIVE HERE HD LIve Map HERE HD LIVE MAP-MAP COLLECTION HERE HD Live Map-Crowdsourced Update HERE HD Live Map-Learning HERE HD Live Map-Product MobileEye MobileEye-Pillars of Autonomous Driving MobileEye-Map as back-up s…

【 HDU - 3062】Party(2-sat)

题干: 有n对夫妻被邀请参加一个聚会,因为场地的问题,每对夫妻中只有1人可以列席。在2n 个人中,某些人之间有着很大的矛盾(当然夫妻之间是没有矛盾的),有矛盾的2个人是不会同时出现在聚会上的。…

微博API接入初识【cxn专用】

微博API官方文档 本文介绍 本文环境成为微博开发者通过鉴权获取单条微博内容 环境 WindowsPython 3.8.10sinaweibopy3-1.3 (pip3 install sinaweibopy3)requests 成为微博开发者 微博官方新手教程 (cxn可以跳过,用博主的即可…