Kafka使用案例

1、Kafka 生产者(Producer)示例

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;}
};int main() {std::string brokers = "localhost:9092"; // Kafka brokersstd::string errstr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);conf->set("bootstrap.servers", brokers, errstr);ExampleDeliveryReportCb ex_dr_cb;conf->set("dr_cb", &ex_dr_cb, errstr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;delete tconf;delete conf;return 1;}std::string topic_str = "test_topic";RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;delete producer;delete tconf;delete conf;return 1;}std::string key;std::string payload = "Hello, Kafka!";RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,RdKafka::Producer::RK_MSG_COPY /* Copy payload */,const_cast<char *>(payload.c_str()), payload.size(),&key, NULL);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to produce message: " <<RdKafka::err2str(resp) << std::endl;} else {std::cout << "Produced message (" << payload.size() << " bytes)" <<std::endl;producer->poll(0); // Non-blocking poll}producer->flush(10000); // Wait for up to 10 seconds to flush messagesdelete topic;delete producer;delete tconf;delete conf;return 0;
}

2、 Kafka 消费者(Consumer)示例

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:void consume_cb (RdKafka::Message &message, void *opaque) {if (message.err() == RdKafka::ERR_NO_ERROR) {std::cout << "Message received (" << message.len() << " bytes)" << std::endl;if (message.key()) {std::cout << "Key: " << *message.key() << std::endl;}std::cout << "Payload: " << std::string(static_cast<char *>(message.payload()), message.len()) << std::endl;} else {std::cerr << "Error while consuming message: " << message.errstr() << std::endl;}}
};int main() {std::string brokers = "localhost:9092"; // Kafka brokersstd::string errstr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;delete tconf;delete conf;return 1;}std::string topic_str = "test_topic";RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str,tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;delete consumer;delete tconf;delete conf;return 1;}ExampleConsumeCb ex_consume_cb;RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_BEGINNING);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: " <<RdKafka::err2str(resp) << std::endl;delete topic;delete consumer;delete tconf;delete conf;return 1;}while (true) {RdKafka::Message *msg = consumer->consume(topic, 0, 1000);ex_consume_cb.consume_cb(*msg, NULL);delete msg;}consumer->stop(topic, 0);consumer->poll(1000); // Final cleanupdelete topic;delete consumer;delete tconf;delete conf;return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/51230.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos7安装redis数据库步骤

文章目录 前言步骤1、下载redis并解压到指定路径2、make 和 make install3、配置redis.conf4、制作启动脚本5、授权并启动 前言 我安装了很多次redis&#xff0c;包括redis安装、redis安装、或者使用ansible等自动化构建安装&#xff0c;但是直接用安装包安装还是比较少。 今…

软件测试---网络基础、HTTP

一、网络基础 &#xff08;1&#xff09;Web和网络知识 网络基础TCP/IP 使用HTTP协议访问Web WWW万维网的诞生 WWW万维网的构成 &#xff08;2&#xff09;IP协议 &#xff08;3&#xff09;可靠传输的TCP和三次握手策略 &#xff08;4&#xff09;域名解析服务DNS &#xff0…

【51单片机仿真】基于51单片机设计的广告机系统仿真源码原理图设计文档

效果: 摘要 该系统基于51单片机,通过LED点阵显示字符和简单图案,并实现按键控制。系统可以用于广告机,通过两个按键实现暂停/继续显示和显示方向切换功能。系统包含硬件电路设计和软件编程两部分。 目录 第1章 绪论 第2章 系统分析与总体设计 第3章 系统的硬件结构实现 …

Vue3-拉开序幕的setup

Vue3 中的 setup 是一个新的配置项&#xff0c;值是一个函数。 export default {name: App,setup: function () {} } </script> 和 Vue2 中的 data 一样&#xff0c;我也可以将 setup 简写成为 export default {name: App,setup() {} } setup函数的使用 与 Vue2 不一样…

go 语言踏出第一步

1、下载Go语言安装包&#xff1a;在官方网站&#xff08;https://golang.org/dl/&#xff09;上下载适合你操作系统的Go语言安装包。选择一个tar.gz格式的包。 2、解压安装包&#xff1a;打开终端&#xff0c;进入下载目录&#xff0c;并使用以下命令解压安装包&#xff1a; ta…

Git操作指令(已完结)

Git操作指令 一、安装git 1、设置配置信息&#xff1a; # global全局配置 git config --global user.name "Your username" git config --global user.email "Your email"# 显示颜色 git config --global color.ui true# 配置别名&#xff0c;各种指令都…

详细介绍MLP的原理

什么是MLP MLP&#xff08;Multi-Layer Perceptron&#xff09;&#xff0c;即多层感知机&#xff0c;是一种前馈型人工神经网络。它由一个输入层、一个输出层以及至少一个隐藏层&#xff08;输入层和输出层中间的层&#xff09;组成。每个神经元&#xff08;或称为节点&#x…

【Django】 js实现动态赋值、显示show隐藏hide效果

文章目录 需要达到的前端效果预览&#xff1a;实现步骤复制bootstrp代码&#xff08;buttons&#xff09;复制bootstrp代码&#xff08;Alert警告框&#xff09;写js测试效果 需要达到的前端效果预览&#xff1a; {% load static %} <!DOCTYPE html> <html lang"…

十分钟速通 MySQL —— CRUD

表格的结构 在之前的课程中我们已经学习了关系型数据库的表格&#xff0c;我们再来回顾-下表格由哪些元素构成 表由表名、行、列、列名构成表名是表的名称列名表示列的名字&#xff0c;列名不可以重复表格实质上是一个二维数组&#xff0c;行和列都是从0开始数的(数组的特性) …

线程池配置与CPU利用率

线程数设置理论 CPU密集型&#xff1a;核心数 1I/O密集型&#xff1a;核心数 * 2 CPU利用率基础 单个CPU核心在单位时间内只能执行一个线程的指令。 实验验证 死循环测试&#xff1a;单线程可跑满一个核心利用率。多线程测试&#xff1a;增加线程数&#xff0c;核心利用率…

【软考】广义表

目录 1. 说明2. 基本操作3. 特点4. 存储结构5. 例题5.1 例题1 1. 说明 1.广义表是线性表的推广&#xff0c;是由0个或多个单元素或子表组成的有限序列。2.广义表与线性表的区别在于:线性表的元素都是结构上不可分的单元素&#xff0c;而广义表的元素既可以是单元素&#xff0c…

【过滤器 vs 拦截器】SpringBoot中过滤器与拦截器:明智选择的艺术(如何在项目中做出明智选择)

文章目录 SpringBoot 过滤器 vs 拦截器过滤器 (Filter)定义特点使用场景实现步骤创建过滤器类注册过滤器&#xff08;可选&#xff0c;如果不使用 WebFilter 注解&#xff09; 拦截器 (Interceptor)定义特点使用场景实现步骤创建拦截器类注册拦截器 过滤器与拦截器的比较实际项…

Python教程:一文了解Python中的序列化与反序列化

目录 1. 序列化与反序列化概述 1.1 什么是序列化&#xff1f; 1.2 什么是反序列化&#xff1f; 1.3 应用场景 2. Python中的序列化与反序列化模块 2.1 pickle模块 2.1.1 使用示例 2.2 json模块 2.2.1 使用示例 2.3 yaml模块 2.3.1 使用示例 2.4 marshal模块 3. 实…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第三篇 嵌入式Linux驱动开发篇-第六十二章 定时器按键消抖实验

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

linux mysql 添加环境变量

要在Linux上添加MySQL的环境变量&#xff0c;可以按照以下步骤进行操作&#xff1a;打开终端窗口。使用文本编辑器&#xff08;如vi或nano&#xff09;打开~/.bashrc文件&#xff1a;vi ~/.bashrc或nano ~/.bashrc在文件的末尾添加以下内容&#xff1a;export PATH$PATH:/path/…

shardingsphere的学习(一):shardingsphere的基本概念和水平分表例子

简介 关于shardingsphere的基本相关概念的学习和使用shardingjdbc进行水平分表的例子 shardingsphere 是一套开源的分布式关系型数据库中间件解决方案&#xff0c;由sharding-jdbc&#xff0c;sharding-proxy&#xff0c;sharding-sidecar三个项目组成。 定位为关系型数据库…

日期类的实现(C++实现)

完整呈现 Date.h #include <iostream> using namespace std; //日期类 class Date { public:int GetMonthDays(int year, int month) const;//构造函数Date(int year 0, int month 1, int day 1);//拷贝构造Date(const Date& d);//打印void Print();//析构函数~…

Redis与MySQL数据一致性问题的策略模式及解决方案

目录 一、策略模式 1、旁路缓存模式&#xff08;Cache Aside Pattern&#xff09; 2、读写穿透&#xff08;Read-Through/Write-Through&#xff09; 3、异步缓存写入&#xff08;Write Behind&#xff09; 二、一致性解决方案 1、缓存延迟双删 2、删除重试机制 3、读取…

NodeJS:npm的使用

npm时nodejs的包安装工具 1.查看版本 $ npm -v 9.6.7 2.升级npm $ sudo npm install npm -g 3.安装nodejs模块 $ npm install <Module Name> 通过该方法将在当前目录下创建文件夹node_modules&#xff0c;并将模块安装到node_modules中 可以通过-g参数指定模块为全局安…

Zygote 进程你不知道的东西

一、概述 1.Zygote&#xff08;孵化&#xff09; 进程是所有 Android进程的父进程&#xff0c;包括SystemServer和各种应用进程都是通过Zygote进程fork出来的。Zygote进程相当于Android系统的根进程&#xff0c;系统启动后所有的进程都是通过这个进程fork出来的。这样做的好处…