Kafka适合什么样的场景?
它可以用于两大类别的应用:
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
Kafka中文文档:http://kafka.apachecn.org/
1,系统环境
a,操作系统 CentOS Linux release 7.6.1810 (Core) 64位,必须确保你的内存是4G以上,双核CPU!否则将无法新建默认命名空间。
b,确保jdk环境已经安装,具体教程请看 CentOS7 shell脚本安装jdk
c,确保Python3和对应的pip已经安装,具体教程请看 CentOS7 源码编译安装Python3.5
2,执行以下命令,安装Kafka并启动
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz # 下载kafka 1.0.0安装包
tar -zxvf kafka_2.11-1.0.0.tgz # 解压安装包
cd kafka_2.11-1.0.0/ # 打开kafka目录
sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 后台运行zookeeper
sh bin/kafka-server-start.sh config/server.properties # 运行kafka服务
出现 “started” 则是启动成功
3,执行以下命令,创建test5的topic
sh bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test5
4,执行以下命令,创建监听test5的消息队列程序
sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test5 --from-beginning
5,执行以下命令,建发送test5消息队列的生产者程序
sh bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test5
生产者发送一些消息,回车
看到消费者界面出现生产者的消息
6,Python3安装kafka依赖包,执行命令 “pip3 install kafka”
依赖包安装完成后,创建python3消费者监听程序,kafka_consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('test4', bootstrap_servers=['localhost:9092'])
for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)
运行 python3 kafka_consumer.py
创建python3生产消息队列程序,kafka_producer.py
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
msg_dict = {"sleep_time": 10,"db_config": {"database": "test_1","host": "xxxx","user": "root","password": "root"},"table": "msg","msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test4', bytes(msg,'ascii'), partition=0)
producer.close()
运行 “python3 kafka_producer.py”,转到kafka_consumer.py运行界面看到已接收到生产程序发送过来的信息