librdkafka的简单使用

文章目录

    • 摘要
    • kafka是什么
    • 安装环境
    • librdkafka的简单使用
      • 生产者
      • 消费者

摘要

本文是Getting Started with Apache Kafka and C/C++的中文版, kafka的hello world程序。

本文完整代码见仓库,这里只列出producer/consumer的代码


kafka是什么

本节来源:Kafka - 维基百科,自由的百科全书、Kafka入门简介 - 知乎

首先我们得知道什么是Kafka。

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

在这里插入图片描述

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。
  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消费者可以指定偏移量来指定要消费的消息。

安装环境

上一节,kafka的概念看着比较简单,发布-订阅/生产-消费的模型。

为了可以调用Kafka的C/C++ API, 需要先安装环境。

# almlinux8
# dnf search kafka
# dnf install librdkafka-devel# dnf search glib
# dnf install glib2-devel# ubuntu22
# 开发库apt install librdkafka-dev  libglib2.0-dev# 安装docker环境apt install docker.io docker-compose# 本地安装 Kafka
## ref: https://docs.confluent.io/confluent-cli/current/install.html#cpwget -qO - https://packages.confluent.io/confluent-cli/deb/archive.key | sudo apt-key add
➜ add-apt-repository "deb https://packages.confluent.io/confluent-cli/deb stable main"apt install confluent-cli## 启动kafka
## usage: https://docs.confluent.io/confluent-cli/current/command-reference/local/kafka/confluent_local_kafka_start.html
## error: https://stackoverflow.com/questions/63776518/error-2-matches-found-based-on-name-network-nameofservice-default-is-ambiguo
## error:https://stackoverflow.com/questions/77985757/kafka-in-docker-using-confluent-cli-doesnt-workwhereis confluent
confluent: /usr/bin/confluent➜ export CONFLUENT_HOME=/usr/bin/confluent# 我执行下面命令后,没有看到Plaintext Ports信息
➜ confluent local kafka start# 停止,然后重新启动,管用了
➜ confluent local kafka stop
➜ confluent local kafka startThe local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.htmlPulling from confluentinc/confluent-local
Digest: sha256:ad62269bf4766820c298f7581cf872a49f46a11dbaebcccb4fd2e71049288c5b
Status: Image is up to date for confluentinc/confluent-local:7.6.0
+-----------------+-------+
| Kafka REST Port | 8082  |
| Plaintext Ports | 43465 |
+-----------------+-------+
Started Confluent Local containers "8d72d911a4".
To continue your Confluent Local experience, run `confluent local kafka topic create <topic>` and `confluent local kafka topic produce <topic>`.# Create a new topic, purchases, which you will use to produce and consume events.
➜ confluent local kafka topic create purchases
Created topic "purchases".

librdkafka的简单使用

confluenceinc/librdkafka是Apache Kafka协议的 C 库实现 ,提供生产者、消费者和管理客户端。

下面运行的程序来自:Apache Kafka and C/C++ - Getting Started Tutorial

代码中kafka的API可以查询:librdkafka: librdkafka documentation

代码中使用了glib库,日常开发我不会使用这个库,因为感觉比较冷,它的API可查询:GLib – 2.0: Automatic Cleanup


生产者

总体逻辑:

  • 从配置文件中加载配置
  • 创建生产者
  • 生产者发送消息
#include <glib.h>
#include <librdkafka/rdkafka.h>#include "common.c"#define ARR_SIZE(arr) ( sizeof((arr)) / sizeof((arr[0])) )/* Optional per-message delivery callback (triggered by poll() or flush())* when a message has been successfully delivered or permanently* failed delivery (after retries).*/
static void dr_msg_cb (rd_kafka_t *kafka_handle,const rd_kafka_message_t *rkmessage,void *opaque) {if (rkmessage->err) {g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));}
}int main (int argc, char **argv) {rd_kafka_t *producer;rd_kafka_conf_t *conf;char errstr[512];// Parse the command line.if (argc != 2) {g_error("Usage: %s <config.ini>", argv[0]);return 1;}// Parse the configuration.// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdconst char *config_file = argv[1];g_autoptr(GError) error = NULL;g_autoptr(GKeyFile) key_file = g_key_file_new();if (!g_key_file_load_from_file (key_file, config_file, G_KEY_FILE_NONE, &error)) {g_error ("Error loading config file: %s", error->message);return 1;}// Load the relevant configuration sections.conf = rd_kafka_conf_new();load_config_group(conf, key_file, "default");// Install a delivery-error callback.rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);// Create the Producer instance.producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!producer) {g_error("Failed to create new producer: %s", errstr);return 1;}// Configuration object is now owned, and freed, by the rd_kafka_t instance.conf = NULL;// Produce data by selecting random values from these lists.int message_count = 10;const char *topic = "purchases";const char *user_ids[6] = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};const char *products[5] = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};for (int i = 0; i < message_count; i++) {const char *key =  user_ids[random() % ARR_SIZE(user_ids)];const char *value =  products[random() % ARR_SIZE(products)];size_t key_len = strlen(key);size_t value_len = strlen(value);rd_kafka_resp_err_t err;err = rd_kafka_producev(producer,RD_KAFKA_V_TOPIC(topic),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_KEY((void*)key, key_len),RD_KAFKA_V_VALUE((void*)value, value_len),RD_KAFKA_V_OPAQUE(NULL),RD_KAFKA_V_END);if (err) {g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));return 1;} else {g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);}rd_kafka_poll(producer, 0);}// Block until the messages are all sent.g_message("Flushing final messages..");rd_kafka_flush(producer, 10 * 1000);if (rd_kafka_outq_len(producer) > 0) {g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));}g_message("%d events were produced to topic %s.", message_count, topic);rd_kafka_destroy(producer);return 0;
}

消费者

总体逻辑:

  • 从配置文件中加载配置
  • 创建消费者
  • 订阅topic
  • 轮询消费者的消息
#include <glib.h>
#include <librdkafka/rdkafka.h>#include "common.c"static volatile sig_atomic_t run = 1;/*** @brief Signal termination of program*/
static void stop(int sig) { run = 0; }int main(int argc, char **argv) {rd_kafka_t *consumer;rd_kafka_conf_t *conf;rd_kafka_resp_err_t err;char errstr[512];// Parse the command line.if (argc != 2) {g_error("Usage: %s <config.ini>", argv[0]);return 1;}// Parse the configuration.// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdconst char *config_file = argv[1];g_autoptr(GError) error = NULL;g_autoptr(GKeyFile) key_file = g_key_file_new();if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE,&error)) {g_error("Error loading config file: %s", error->message);return 1;}// Load the relevant configuration sections.conf = rd_kafka_conf_new();load_config_group(conf, key_file, "default");load_config_group(conf, key_file, "consumer");// Create the Consumer instance.consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if (!consumer) {g_error("Failed to create new consumer: %s", errstr);return 1;}rd_kafka_poll_set_consumer(consumer);// Configuration object is now owned, and freed, by the rd_kafka_t instance.conf = NULL;// Convert the list of topics to a format suitable for librdkafka.const char *topic = "purchases";rd_kafka_topic_partition_list_t *subscription =rd_kafka_topic_partition_list_new(1);rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA);// Subscribe to the list of topics.err = rd_kafka_subscribe(consumer, subscription);if (err) {g_error("Failed to subscribe to %d topics: %s", subscription->cnt,rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription);rd_kafka_destroy(consumer);return 1;}rd_kafka_topic_partition_list_destroy(subscription);// Install a signal handler for clean shutdown.signal(SIGINT, stop);// Start polling for messages.while (run) {rd_kafka_message_t *consumer_message;consumer_message = rd_kafka_consumer_poll(consumer, 500);if (!consumer_message) {g_message("Waiting...");continue;}if (consumer_message->err) {if (consumer_message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {/* We can ignore this error - it just means we've read* everything and are waiting for more data.*/} else {g_message("Consumer error: %s",rd_kafka_message_errstr(consumer_message));return 1;}} else {g_message("Consumed event from topic %s: key = %.*s value = %s",rd_kafka_topic_name(consumer_message->rkt),(int)consumer_message->key_len, (char *)consumer_message->key,(char *)consumer_message->payload);}// Free the message when we're done.rd_kafka_message_destroy(consumer_message);}// Close the consumer: commit final offsets and leave the group.g_message("Closing consumer");rd_kafka_consumer_close(consumer);// Destroy the consumer.rd_kafka_destroy(consumer);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/777696.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【VMware Workstation】公司所有主机和虚拟机ip互通,以及虚拟机目录迁移

文章目录 1、场景2、环境3、实战3.1、所有主机和虚拟机ip互通Stage 1 : 【虚拟机】设置为桥接模式Stage 2 : 【虚拟机】设置ipStage 3 : 【路由器】ARP 静态绑定MACStage 3-1 ping 路由器 ipStage 3-2 【静态绑定】虚拟机查看mac地址Stage 3-3 【静态绑定】路由器ARP 静态绑定 …

更高效稳定 | 基于ACM32 MCU的编程直流电源应用方案

随着电子设备的多样化发展&#xff0c;面对不同的应用场景&#xff0c;需要采用特定的供电电源。因此&#xff0c;在电子产品的开发测试过程中&#xff0c;必不可少使用编程直流电源来提供测试电压&#xff0c;协助完成初步的开发测试过程。 编程直流电源概述 编程直流电源结构…

浅谈C语言编译与链接

个人主页&#xff08;找往期文章包括但不限于本期文章中不懂的知识点&#xff09;&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 翻译环境和运行环境 在ANSI C&#xff08;标准 C&#xff09;的任何一种实现中&#xff0c;存在两个不同的环境。 第1种是翻译环境&#xff0c;在这个…

IntelliJ IDEA中遇到的“cannot access java.lang.String“错误及其解决方案(day8)

intelliJ 今天遇到使用intelliJ遇到了一个新错误&#xff0c;有问题就解决问题是一个程序员最基本的修养&#xff0c;如下&#xff1a; 在上面的代码中&#xff0c;我使用了this.这个关键字&#xff0c;发现出现了以上问题&#xff0c;找了一些资料&#xff0c;不是很明白&am…

LDO(低压差线性稳压器)

一般压差较小的降压模块就用LDO 一、CJ78L05 芯片描述&#xff1a;可实现VCC转5v 二、ME6215C33M5G 芯片描述&#xff1a;可实现VCC转3.3V 三、AMS1117-3.3&#xff08;a&#xff09; 芯片描述&#xff1a;一般用来实现5V转3.3V AMS1117-3.3&#xff08;b&#xff09; 芯…

理解JVM:从字节码到程序运行

大家好&#xff0c;我是程序员大猩猩。 今天我们来讲一下JVM&#xff0c;好多面试者在面试的时候&#xff0c;都会被问及JVM相关知识。那么JVM到底是什么&#xff0c;要理解它到底是出于什么原因&#xff1f; JVM俗称Java虚拟机&#xff0c;它是一个抽象的计算机&#xff0c;…

蓝桥杯2017年第十三届省赛真题-承压计算

一、题目 承压计算 X星球的高科技实验室中整齐地堆放着某批珍贵金属原料。 每块金属原料的外形、尺寸完全一致&#xff0c;但重量不同。 金属材料被严格地堆放成金字塔形。 7 5 8 7 8 8 …

腾讯云4核8g服务器多少钱?2024轻量和CVM收费价格表

2024年腾讯云4核8G服务器租用优惠价格&#xff1a;轻量应用服务器4核8G12M带宽646元15个月&#xff0c;CVM云服务器S5实例优惠价格1437.24元买一年送3个月&#xff0c;腾讯云4核8G服务器活动页面 txybk.com/go/txy 活动链接打开如下图&#xff1a; 腾讯云4核8G服务器优惠价格 轻…

5.6 物联网RK3399项目开发实录-Android开发之(wulianjishu666)

物联网入门到项目实干案例下载&#xff1a; https://pan.baidu.com/s/1fHRxXBqRKTPvXKFOQsP80Q?pwdh5ug --------------------------------------------------------------------------------------------------------------------------------- U-Boot 使用 前言 RK U-B…

AMD本月发布的成本优化型Spartan UltraScale+ FPGA系列

随着 FPGA 在更多应用中的使用&#xff0c;AMD 推出了最新的成本、功耗与性能平衡的系列产品。为了扩展其可编程逻辑产品组合&#xff0c;AMD最近推出了最新的成本优化型 Spartan FPGA 系列。随着 FPGA 应用于越来越多的产品和设备&#xff0c;设计人员可能经常发现自己正在寻找…

Git,GitHub,Gitee,GitLab 四者有什么区别?

目录 1. Git 2. GitHub 3. Gitee 4. GitLab 5. 总结概括 1. Git Git 是一个版本管理工具&#xff0c;常应用于本地代码的管理&#xff0c;下载完毕之后&#xff0c;我们可以使用此工具对本地的资料&#xff0c;代码进行版本管理。 下载链接&#xff1a; Git - Downlo…

Eclipse+Java+Swing实现斗地主游戏

一. 视频演示效果 java斗地主源码演示 ​ 二.项目结构 代码十分简洁&#xff0c;只有简单的7个类&#xff0c;实现了人机对战 素材为若干的gif图片 三.项目实现 启动类为Main类&#xff0c;继承之JFrame&#xff0c;JFrame 是 Java Swing 库中的一个类&#xff0c;用于创建窗…

【计算机图形学】3D Implicit Transporter for Temporally Consistent Keypoint Discovery

对3D Implicit Transporter for Temporally Consistent Keypoint Discovery的简单理解 文章目录 1. 现有方法限制和文章改进2. 方法2.1 寻找时间上一致的3D特征点2.1.1 3D特征Transporter2.1.2 几何隐式解码器2.1.3 损失函数 2.2 使用一致特征点的操纵 1. 现有方法限制和文章改…

阿里云CentOS7安装Hadoop3伪分布式

ECS准备 开通阿里云ECS 略 控制台设置密码 连接ECS 远程连接工具连接阿里云ECS实例&#xff0c;这里远程连接工具使用xshell 根据提示接受密钥 根据提示写用户名和密码 用户名&#xff1a;root 密码&#xff1a;在控制台设置的密码 修改主机名 将主机名从localhost改为需要…

HarmonyOS 应用开发之Want的定义与用途

Want 是一种对象&#xff0c;用于在应用组件之间传递信息。 其中&#xff0c;一种常见的使用场景是作为 startAbility() 方法的参数。例如&#xff0c;当UIAbilityA需要启动UIAbilityB并向UIAbilityB传递一些数据时&#xff0c;可以使用Want作为一个载体&#xff0c;将数据传递…

如何在Flutter中进行网络请求?

Hello&#xff01;大家好&#xff0c;我是咕噜铁蛋&#xff0c;你们的好朋友&#xff01;今天&#xff0c;我想和大家分享一下在Flutter中如何进行网络请求。Flutter作为一个跨平台的开发框架&#xff0c;网络请求是其实现数据交互的重要一环。下面&#xff0c;我将详细介绍几种…

构建一个基础的大型语言模型(LLM)应用程序

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

docker部署-RabbitMq

1. 参考 RabbitMq官网 docker官网 2. 拉取镜像 这里改为自己需要的版本即可&#xff0c;下面容器也需要同理修改 docker pull rabbitmq:3.12-management3. 运行容器 docker run \ --namemy-rabbitmq-01 \ -p 5672:5672 \ -p 15672:15672 \ -d \ --restart always \ -…

java算法day37 | 贪心算法 part06 ● 738.单调递增的数字 ● 968.监控二叉树

738.单调递增的数字 思路&#xff1a; 从后向前遍历&#xff0c;如果前一个数比后一个数大&#xff0c;则前一个数-1&#xff0c;后面的数都变成9. 思路不难&#xff0c;但实现的代码还是有一点繁琐的。 以下是用List实现的代码。 class Solution {public int monotoneIncrea…

【群晖】部署UptimeKuma监控服务

【群晖】部署UptimeKuma监控服务 点击标题查看原文 本文讲解在群晖系统中使用docker方式部署UptimeKuma服务并通过外网地址正确访问 配置及版本 DSM&#xff1a;7.2&#xff08;7.x以上均可&#xff09; UptimeKuma&#xff1a;louislam/uptime-kuma:latest 安装 docker中下…