Installation
curl -sSL https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yml > docker-compose.yml
docker-compose up -d
Rust
这里使用的是 rdkafka。
producer.rs:
use std::time::Duration;use clap::{App, Arg};
use log::info;use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;use crate::example_utils::setup_logger;mod example_utils;async fn produce(brokers: &str, topic_name: &str) {let producer: &FutureProducer = &ClientConfig::new().set("bootstrap.servers", brokers).set("message.timeout.ms", "5000").create().expect("Producer creation error");let send_time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_millis(); // 获取当前时间戳(毫秒)// This loop is non blocking: all messages will be sent one after the other, without waiting// for the results.let futures = (0..5).map(|i| async move {// The send operation on the topic returns a future, which will be// completed once the result or failure from Kafka is received.let delivery_status = producer.send(FutureRecord::to(topic_name).payload(&format!("{}", send_time)).key(&format!("Key {}", i)).headers(OwnedHeaders::new().insert(Header {key: "header_key",value: Some("header_value"),})),Duration::from_secs(0),).await;// This will be executed when the result is received.info!("Delivery status for message {} received", i);delivery_status}).collect::<Vec<_>>();// This loop will wait until all delivery statuses have been received.for future in futures {info!("Future completed. Result: {:?}", future.await);}
}#[tokio::main]
async fn main() {let matches = App::new("producer example").version(option_env!("CARGO_PKG_VERSION").unwrap_or("")).about("Simple command line producer").arg(Arg::with_name("brokers").short("b").long("brokers").help("Broker list in kafka format").takes_value(true).default_value("localhost:9092"),).arg(Arg::with_name("log-conf").long("log-conf").help("Configure the logging format (example: 'rdkafka=trace')").takes_value(true),).arg(Arg::with_name("topic").short("t").long("topic").help("Destination topic").takes_value(true).required(true),).get_matches();setup_logger(true, matches.value_of("log-conf"));let (version_n, version_s) = get_rdkafka_version();info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);let topic = matches.value_of("topic").unwrap();let brokers = matches.value_of("brokers").unwrap();produce(brokers, topic).await;
}
consumer.rs:
use clap::{App, Arg};
use log::{info, warn};use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Headers, Message};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::util::get_rdkafka_version;

use crate::example_utils::setup_logger;

mod example_utils;

/// Client context that logs consumer-group events.
///
/// A context changes the behavior of producers and consumers by supplying
/// callbacks that librdkafka executes. This one installs callbacks that log
/// rebalance events and the result of offset commits.
struct CustomContext;

// No client-level callbacks are overridden here.
impl ClientContext for CustomContext {}

impl ConsumerContext for CustomContext {
    /// Logs the rebalance event handed to the pre-rebalance callback.
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        info!("Pre rebalance {:?}", rebalance);
    }

    /// Logs the rebalance event handed to the post-rebalance callback.
    fn post_rebalance(&self, rebalance: &Rebalance) {
        info!("Post rebalance {:?}", rebalance);
    }

    /// Logs the result of an offset commit; the committed offsets are ignored.
    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        info!("Committing offsets: {:?}", result);
    }
}

// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {let context = CustomContext;let consumer: LoggingConsumer = ClientConfig::new().set("group.id", group_id).set("bootstrap.servers", brokers).set("enable.partition.eof", "false").set("session.timeout.ms", "6000").set("enable.auto.commit", "true")//.set("statistics.interval.ms", "30000")//.set("auto.offset.reset", "smallest").set_log_level(RDKafkaLogLevel::Debug).create_with_context(context).expect("Consumer creation failed");consumer.subscribe(&topics.to_vec()).expect("Can't subscribe to specified topics");loop {let start = std::time::Instant::now();match consumer.recv().await {Err(e) => warn!("Kafka error: {}", e),Ok(m) => {info!("Start time is: {:?}", start);let duration = start.elapsed();info!("Time elapsed receiving the message is: {:?}", duration);let payload = match m.payload_view::<str>() {None => "",Some(Ok(s)) => s,Some(Err(e)) => {warn!("Error while deserializing message payload: {:?}", e);""}};println!("pay load is {}", payload);let payload_parts: Vec<&str> = payload.split(" with timestamp ").collect();if payload_parts.len() == 2 {let send_time: u128 = payload_parts[1].parse().unwrap_or(0);let receive_time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_millis(); // 获取当前时间戳(毫秒)let elapsed_time = receive_time - send_time; // 计算耗时info!("Received time: {} {}", receive_time, send_time);info!("Elapsed time: {} ms", elapsed_time);}// let send_time: u128 = payload.parse().unwrap_or(1);// println!("send time is {} ", send_time);info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());if let Some(headers) = m.headers() {for header in headers.iter() {info!(" Header {:#?}: {:?}", header.key, header.value);}}consumer.commit_message(&m, CommitMode::Async).unwrap();}};}
}#[tokio::main]
async fn main() {let matches = App::new("consumer example").version(option_env!("CARGO_PKG_VERSION").unwrap_or("")).about("Simple command line consumer").arg(Arg::with_name("brokers").short("b").long("brokers").help("Broker list in kafka format").takes_value(true).default_value("localhost:9092"),).arg(Arg::with_name("group-id").short("g").long("group-id").help("Consumer group id").takes_value(true).default_value("example_consumer_group_id"),).arg(Arg::with_name("log-conf").long("log-conf").help("Configure the logging format (example: 'rdkafka=trace')").takes_value(true),).arg(Arg::with_name("topics").short("t").long("topics").help("Topic list").takes_value(true).multiple(true).required(true),).get_matches();setup_logger(true, matches.value_of("log-conf"));let (version_n, version_s) = get_rdkafka_version();// info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();let brokers = matches.value_of("brokers").unwrap();let group_id = matches.value_of("group-id").unwrap();consume_and_print(brokers, group_id, &topics).await
}
Golang
这里使用的是confluent-kafka-go
producer.go:
package mainimport ("fmt""math/rand""os""strconv""time""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092","client.id": "go-producer",}topic := "purchases"p, err := kafka.NewProducer(config)if err != nil {fmt.Printf("Failed to create producer: %s", err)os.Exit(1)}// Go-routine to handle message delivery reports and// possibly other event types (errors, stats, etc)go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)} else {fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))}}}}()users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"}for n := 0; n < 50; n++ {key := users[rand.Intn(len(users))]data := items[rand.Intn(len(items))]currentTimeMillis := time.Now().UnixNano() / 1e6data = strconv.FormatInt(currentTimeMillis, 10) // append current time to datap.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key: []byte(key),Value: []byte(data),}, nil)}// Wait for all messages to be deliveredp.Flush(15 * 1000)p.Close()
}
consumer.go:
package mainimport ("fmt""os""os/signal""strconv""syscall""time""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// if len(os.Args) != 2 {// fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",// os.Args[0])// os.Exit(1)// }// configFile := os.Args[1]conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092","client.id": "go-producer",}conf["group.id"] = "kafka-go-getting-started"conf["auto.offset.reset"] = "earliest"c, err := kafka.NewConsumer(&conf)if err != nil {fmt.Printf("Failed to create consumer: %s", err)os.Exit(1)}topic := "purchases"err = c.SubscribeTopics([]string{topic}, nil)// Set up a channel for handling Ctrl-C, etcsigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)// Process messagesrun := truefor run {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsedefault:ev, err := c.ReadMessage(100 * time.Millisecond)if err != nil {// Errors are informational and automatically handled by the consumercontinue}fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))// parts := strings.Split(string(ev.Value))// if len(parts) < 2 {// fmt.Println("Invalid message format. Skipping.")// continue// }// data := parts[0]timestampStr := string(ev.Value)// Convert the string timestamp to an integersentTimestamp, err := strconv.ParseInt(timestampStr, 10, 64)if err != nil {fmt.Println("Could not parse timestamp. Skipping.")continue}receivedTimestamp := time.Now().UnixNano() / 1e6 // current time in millisecondslatency := receivedTimestamp - sentTimestampfmt.Printf("Consumed event from topic %s: key = %-10s latency = %d ms\n",*ev.TopicPartition.Topic, string(ev.Key), latency)}}c.Close()
}
Python
这里使用的是confluent_kafka
producer.py:
import socket
from typing import Optional

from confluent_kafka import KafkaError, Message, Producer


class KafkaProducer:
    """Thin wrapper around ``confluent_kafka.Producer`` that reports deliveries."""

    def __init__(self, bootstrap_servers: str, client_id: str) -> None:
        """Create the underlying producer.

        Args:
            bootstrap_servers: Value for the ``bootstrap.servers`` setting.
            client_id: Value for the ``client.id`` setting.
        """
        self.conf = {"bootstrap.servers": bootstrap_servers, "client.id": client_id}
        self.producer = Producer(self.conf)

    def acked(self, err: Optional[KafkaError], msg: Message) -> None:
        """Delivery callback: print whether ``msg`` was delivered.

        Args:
            err: The delivery error, or ``None`` on success.
            msg: The message the report is about.
        """
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(f"Message produced: {msg}")

    def produce(self, topic: str, key: Optional[str] = None, value: Optional[str] = None) -> None:
        """Enqueue one message for ``topic``; ``acked`` receives the delivery report."""
        self.producer.produce(topic, key=key, value=value, callback=self.acked)

    def poll(self, timeout: float) -> None:
        """Serve delivery callbacks, waiting up to ``timeout`` seconds."""
        self.producer.poll(timeout)

    def flush(self) -> None:
        """Block until all enqueued messages have been delivered or have failed."""
        self.producer.flush()


if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers="kafka:9092", client_id=socket.gethostname())
    topic = "test"
    producer.produce(topic, key="key", value="value")
    producer.poll(1)
    # produce() only enqueues the message. Without a flush the process can
    # exit before the message is actually sent and its delivery reported.
    producer.flush()
consumer.py:
import sys
from typing import List, Optional

from confluent_kafka import Consumer, KafkaError, KafkaException


class KafkaConsumer:
    """Thin wrapper around ``confluent_kafka.Consumer`` with a basic poll loop."""

    def __init__(self, bootstrap_servers: str, group_id: str, auto_offset_reset: str = 'smallest') -> None:
        """Create the underlying consumer.

        Args:
            bootstrap_servers: Value for the ``bootstrap.servers`` setting.
            group_id: Value for the ``group.id`` setting.
            auto_offset_reset: Value for the ``auto.offset.reset`` setting.
        """
        self.conf = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': auto_offset_reset,
        }
        self.consumer = Consumer(self.conf)
        self.running = True

    def basic_consume_loop(self, topics: List[str], timeout: float = 1.0) -> None:
        """Subscribe to ``topics`` and print every message until ``shutdown``.

        Args:
            topics: Topics to subscribe to.
            timeout: Maximum time in seconds a single poll waits for a message.

        Raises:
            KafkaException: On any consumer error other than end-of-partition.
        """
        try:
            self.consumer.subscribe(topics)

            while self.running:
                msg = self.consumer.poll(timeout=timeout)
                if msg is None:
                    continue

                error = msg.error()
                if error:
                    if error.code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(error)
                    continue

                value: Optional[bytes] = msg.value()
                if value is None:
                    # A message may carry no payload; decoding None would raise.
                    sys.stderr.write('%% %s [%d] message at offset %d has no payload\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                    continue
                print(value.decode('utf-8'))
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()

    def shutdown(self) -> None:
        """Ask ``basic_consume_loop`` to stop after the current poll."""
        self.running = False


if __name__ == "__main__":
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", group_id="foo", auto_offset_reset="smallest")
    topics = ["test"]
    try:
        consumer.basic_consume_loop(topics)
    except KeyboardInterrupt:
        # The loop's finally block has already closed the consumer.
        sys.stderr.write('%% Aborted by user\n')