在安装完kafka(Docker安装kafka_docker 部署kafka-CSDN博客),查看容器是否启动:
docker ps | grep -E 'kafka|zookeeper'
再用python开启服务
from fastapi import FastAPI, Request
from kafka import KafkaProducer
import kafka
import json
import logging
from datetime import datetime# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')# 初始化 FastAPI 应用
app = FastAPI()# 示例博客文章数据
blog_posts = [{"id": 1, "title": "First Post", "content": "This is the first post."},{"id": 2, "title": "Second Post", "content": "This is the second post."}
]def produce_view_event(ip_address, post_id):"""生成博客文章的查看事件。参数:ip_address (str): 查看者的 IP 地址。post_id (int): 被查看的文章 ID。"""logging.info(f"生成查看事件,文章 ID: {post_id},IP 地址: {ip_address}")try:# 初始化 Kafka 生产者producer = KafkaProducer(bootstrap_servers='110.40.130.231:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))# 准备发送到 Kafka 的消息message = {"ip_address": ip_address,"post_id": post_id,"event_type": "view"}logging.info(f"发送消息到 Kafka: {message}")future = producer.send('blog_views', value=message)try:# 等待消息成功发送record_metadata = future.get(timeout=10)logging.info(f"消息发送成功。主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")except Exception as e:logging.error(f"发送消息失败: {e}")# 确保所有消息已发送并关闭生产者producer.flush()producer.close()# 将查看事件打印到控制台print_view_event(ip_address, post_id)except kafka.errors.NoBrokersAvailable as e:logging.error(f"没有可用的 Broker: {e}")def print_view_event(ip_address, post_id):"""打印博客文章的查看事件。参数:ip_address (str): 查看者的 IP 地址。post_id (int): 被查看的文章 ID。"""event_type = "view"created_at = datetime.now().isoformat()print(f"View Event - IP Address: {ip_address}, Post ID: {post_id}, Event Type: {event_type}, Created At: {created_at}")@app.get("/posts/{post_id}")
def get_post(post_id: int, request: Request):"""根据 ID 获取博客文章。参数:post_id (int): 博客文章的 ID。request (Request): 进来的请求对象。返回:dict: 如果找到文章则返回文章,否则返回错误信息。"""logging.info(f"收到请求,文章 ID: {post_id}")for post in blog_posts:if post["id"] == post_id:logging.info(f"找到文章: {post}")produce_view_event(request.client.host, post_id)return postreturn {"error": "文章未找到"}if __name__ == "__main__":import uvicornimport os# 获取当前文件名(不带扩展名)供 UVicorn 使用app_modeel_name = os.path.basename(__file__).replace(".py", "")print(app_modeel_name)# 使用 UVicorn 运行 FastAPI 应用uvicorn.run(f"{app_modeel_name}:app", host='0.0.0.0', port=1213, reload=True)
访问:http://110.40.130.231:1213/posts/1