Kafka in Multiple Languages

Installation

curl -sSL https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yml > docker-compose.yml
docker-compose up -d
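
Once the containers are up, it helps to verify that the broker is reachable and to pre-create the topics used by the examples below. A minimal sketch, assuming the Bitnami compose file names the Kafka service kafka and the broker listens on localhost:9092 (adjust the service name and topic names to your setup):

docker-compose ps                                   # the kafka service should show as "Up"
docker-compose exec kafka kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics.sh --create --topic purchases --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics.sh --list --bootstrap-server localhost:9092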

Rust

Here we use the rdkafka crate.

producer.rs:

use std::time::Duration;

use clap::{App, Arg};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;

use crate::example_utils::setup_logger;

mod example_utils;

async fn produce(brokers: &str, topic_name: &str) {
    let producer: &FutureProducer = &ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    let send_time = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis(); // current timestamp in milliseconds

    // This loop is non blocking: all messages will be sent one after the other, without waiting
    // for the results.
    let futures = (0..5)
        .map(|i| async move {
            // The send operation on the topic returns a future, which will be
            // completed once the result or failure from Kafka is received.
            let delivery_status = producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("{}", send_time))
                        .key(&format!("Key {}", i))
                        .headers(OwnedHeaders::new().insert(Header {
                            key: "header_key",
                            value: Some("header_value"),
                        })),
                    Duration::from_secs(0),
                )
                .await;

            // This will be executed when the result is received.
            info!("Delivery status for message {} received", i);
            delivery_status
        })
        .collect::<Vec<_>>();

    // This loop will wait until all delivery statuses have been received.
    for future in futures {
        info!("Future completed. Result: {:?}", future.await);
    }
}

#[tokio::main]
async fn main() {
    let matches = App::new("producer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line producer")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("topic")
                .short("t")
                .long("topic")
                .help("Destination topic")
                .takes_value(true)
                .required(true),
        )
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    let topic = matches.value_of("topic").unwrap();
    let brokers = matches.value_of("brokers").unwrap();

    produce(brokers, topic).await;
}

consumer.rs:

use clap::{App, Arg};
use log::{info, warn};

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Headers, Message};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::util::get_rdkafka_version;

use crate::example_utils::setup_logger;

mod example_utils;

// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context sets up custom callbacks to log rebalancing events.
struct CustomContext;

impl ClientContext for CustomContext {}

impl ConsumerContext for CustomContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        info!("Pre rebalance {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        info!("Post rebalance {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        info!("Committing offsets: {:?}", result);
    }
}

// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;

async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
    let context = CustomContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        //.set("statistics.interval.ms", "30000")
        //.set("auto.offset.reset", "smallest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(&topics.to_vec())
        .expect("Can't subscribe to specified topics");

    loop {
        let start = std::time::Instant::now();
        match consumer.recv().await {
            Err(e) => warn!("Kafka error: {}", e),
            Ok(m) => {
                info!("Start time is: {:?}", start);
                let duration = start.elapsed();
                info!("Time elapsed receiving the message is: {:?}", duration);

                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        warn!("Error while deserializing message payload: {:?}", e);
                        ""
                    }
                };
                println!("payload is {}", payload);

                // The producer sends its send timestamp (in milliseconds) as the payload,
                // so parse it directly and compute the end-to-end latency.
                let send_time: u128 = payload.parse().unwrap_or(0);
                if send_time > 0 {
                    let receive_time = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .expect("Time went backwards")
                        .as_millis(); // current timestamp in milliseconds
                    let elapsed_time = receive_time - send_time; // end-to-end latency
                    info!("Received time: {}  {}", receive_time, send_time);
                    info!("Elapsed time: {} ms", elapsed_time);
                }

                info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                      m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                if let Some(headers) = m.headers() {
                    for header in headers.iter() {
                        info!("  Header {:#?}: {:?}", header.key, header.value);
                    }
                }
                consumer.commit_message(&m, CommitMode::Async).unwrap();
            }
        };
    }
}

#[tokio::main]
async fn main() {
    let matches = App::new("consumer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line consumer")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("group-id")
                .short("g")
                .long("group-id")
                .help("Consumer group id")
                .takes_value(true)
                .default_value("example_consumer_group_id"),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("topics")
                .short("t")
                .long("topics")
                .help("Topic list")
                .takes_value(true)
                .multiple(true)
                .required(true),
        )
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
    let brokers = matches.value_of("brokers").unwrap();
    let group_id = matches.value_of("group-id").unwrap();

    consume_and_print(brokers, group_id, &topics).await
}
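
Both files follow the rust-rdkafka examples and rely on a small example_utils module that provides setup_logger. A possible way to build and run them, assuming they are registered as Cargo binaries named producer and consumer and that Cargo.toml pulls in rdkafka, tokio (with the macros feature), clap 2.x, and log:

cargo run --bin producer -- --topic test
cargo run --bin consumer -- --topics test --group-id example_consumer_group_id

The producer puts its send timestamp (in milliseconds) into each payload, and the consumer parses it back out to report the end-to-end latency.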

Golang

Here we use confluent-kafka-go.

producer.go:

package main

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"client.id":         "go-producer",
	}
	topic := "purchases"

	p, err := kafka.NewProducer(config)
	if err != nil {
		fmt.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}

	// Go-routine to handle message delivery reports and
	// possibly other event types (errors, stats, etc)
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
						*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
				}
			}
		}
	}()

	users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}
	items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"}

	for n := 0; n < 50; n++ {
		key := users[rand.Intn(len(users))]
		data := items[rand.Intn(len(items))]
		currentTimeMillis := time.Now().UnixNano() / 1e6
		data = strconv.FormatInt(currentTimeMillis, 10) // replace the value with the send timestamp (ms)
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Key:            []byte(key),
			Value:          []byte(data),
		}, nil)
	}

	// Wait for all messages to be delivered
	p.Flush(15 * 1000)
	p.Close()
}

consumer.go:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	conf := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"client.id":         "go-consumer",
	}
	conf["group.id"] = "kafka-go-getting-started"
	conf["auto.offset.reset"] = "earliest"

	c, err := kafka.NewConsumer(&conf)
	if err != nil {
		fmt.Printf("Failed to create consumer: %s", err)
		os.Exit(1)
	}

	topic := "purchases"
	err = c.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		fmt.Printf("Failed to subscribe to topic: %s", err)
		os.Exit(1)
	}

	// Set up a channel for handling Ctrl-C, etc
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Process messages
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev, err := c.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// Errors are informational and automatically handled by the consumer
				continue
			}
			fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
				*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))

			// The producer sends its send timestamp (in milliseconds) as the value,
			// so parse it and compute the end-to-end latency.
			timestampStr := string(ev.Value)
			sentTimestamp, err := strconv.ParseInt(timestampStr, 10, 64)
			if err != nil {
				fmt.Println("Could not parse timestamp. Skipping.")
				continue
			}
			receivedTimestamp := time.Now().UnixNano() / 1e6 // current time in milliseconds
			latency := receivedTimestamp - sentTimestamp
			fmt.Printf("Consumed event from topic %s: key = %-10s  latency = %d ms\n",
				*ev.TopicPartition.Topic, string(ev.Key), latency)
		}
	}

	c.Close()
}
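
Since producer.go and consumer.go each declare package main with their own func main, they need to live in separate directories (or be built separately). A rough run sketch, assuming the v1 import path of the client library and a broker on localhost:9092:

go get github.com/confluentinc/confluent-kafka-go/kafka   # use the .../v2 import path for v2 releases
go run producer.go     # from the directory that holds producer.go
go run consumer.go     # from the directory that holds consumer.go

As in the Rust version, the producer writes its send timestamp (in milliseconds) as the message value and the consumer subtracts it from the receive time to print the latency.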

Python

Here we use the confluent_kafka package.

producer.py:

import socket
from typing import Optional

from confluent_kafka import Producer


class KafkaProducer:
    def __init__(self, bootstrap_servers: str, client_id: str) -> None:
        self.conf = {"bootstrap.servers": bootstrap_servers, "client.id": client_id}
        self.producer = Producer(self.conf)

    def acked(self, err: Optional[str], msg: str) -> None:
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(f"Message produced: {msg}")

    def produce(self, topic: str, key: Optional[str] = None, value: Optional[str] = None) -> None:
        self.producer.produce(topic, key=key, value=value, callback=self.acked)

    def poll(self, timeout: float) -> None:
        self.producer.poll(timeout)

    def flush(self) -> None:
        self.producer.flush()


if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers="kafka:9092", client_id=socket.gethostname())
    topic = "test"
    producer.produce(topic, key="key", value="value")
    producer.poll(1)
    producer.flush()  # make sure the message is actually delivered before exiting

consumer.py:

import sys
from typing import List

from confluent_kafka import Consumer, KafkaError, KafkaException


class KafkaConsumer:
    def __init__(self, bootstrap_servers: str, group_id: str, auto_offset_reset: str = 'smallest') -> None:
        self.conf = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': auto_offset_reset
        }
        self.consumer = Consumer(self.conf)
        self.running = True

    def basic_consume_loop(self, topics: List[str], timeout: float = 1.0) -> None:
        try:
            self.consumer.subscribe(topics)
            while self.running:
                msg = self.consumer.poll(timeout=timeout)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    print(msg.value().decode('utf-8'))
                    # do something with msg.value()
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()

    def shutdown(self) -> None:
        self.running = False


if __name__ == "__main__":
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", group_id="foo", auto_offset_reset="smallest")
    topics = ["test"]
    consumer.basic_consume_loop(topics)
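
A possible way to run the Python pair, assuming the two snippets are saved as producer.py and consumer.py. Note that both scripts hard-code bootstrap_servers="kafka:9092", which only resolves inside the docker-compose network; change it to localhost:9092 when running them directly on the host:

pip install confluent-kafka
python consumer.py    # start the consumer first so it is subscribed to the "test" topic
python producer.py    # sends one key/value message to the "test" topic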
