Kafka与MySQL的组合使用

  1. 根据上面给出的student表,编写Python程序完成如下操作:

(1)读取student表的数据内容,将其转为JSON格式,发送给Kafka;

创建Student表的SQL语句如下:

create table student(
sno char(5),
sname char(10),
ssex char(2),
sage int
);

向student表中插入两条记录的SQL语句如下:

insert into student values(‘95001’,’John’,’M’,23);
insert into student values(‘95002’,’Tom’,’M’,23);

 启动zookeeper和kafka的服务

编写一个生产者程序mysql_producer.py:

from kafka import KafkaProducer
import json
import pymysql.cursorsproducer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))connect=pymysql.Connect(host='localhost',port=3306,user='root',passwd='123456',db='zhangna',charset='utf8'
)
cursor=connect.cursor()
sql="select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data=cursor.fetchall()
connect.commit()for message in data:zn={}zn['sno']=message[0]zn['sname']=message[1]zn['sex']=message[2]zn['age']=message[3]producer.send('mysql_topic',zn)connect.close()
producer.close()

(2)再从Kafka中获取到JSON格式数据,打印出来;

编写一个消费者程序mysql_consumer.py:

from kafka import KafkaConsumer
import json
import pymysql.cursorsconsumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:msg1=str(msg.value,encoding="utf-8")data=json.loads(msg1)print(data)

终于出来了,出错的原因是encoding,我写成了encodings的缘故

为什么我会出现两条重复记录,原因是我生产者程序运行了多次,生产者多运行一次,消费者程序就会多一次查询

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113012.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式-综合应用(一)

介绍 使用jQuery做一个模拟购物车的示例 用到的设计模式 工厂模式 单例模式装饰器模式 观察者模式状态模式 模板方法模式 代理模式 UML类图

ChatGPT(1):ChatGPT初识

1 ChatGPT原理 ChatGPT 是基于 GPT-3.5 架构的一个大型语言模型,它的工作原理涵盖了深度学习和自然语言处理技术。以下是 ChatGPT 的工作原理的一些关键要点: 神经网络架构:ChatGPT 的核心是一个深度神经网络,采用了变种的 Tran…

网络协议--ICMP:Internet控制报文协议

6.1 引言 ICMP经常被认为是IP层的一个组成部分。它传递差错报文以及其他需要注意的信息。ICMP报文通常被IP层或更高层协议(TCP或UDP)使用。一些ICMP报文把差错报文返回给用户进程。 ICMP报文是在IP数据报内部被传输的,如图6-1所示。 ICMP…

GRASP 、SOLID 与 GoF 设计模式

一、GRASP GRASP:通用职责分配软件设计模式(General Responsibility Assignment Software Patterns),其主要思想是基于单一职责设计软件对象。 思考软件对象设计以及大型构件的流行方式是,考虑其职责、角色和协作。这是被称为职责驱动设计&a…

基于CNN实现谣言检测 - python 深度学习 机器学习 计算机竞赛

文章目录 1 前言1.1 背景 2 数据集3 实现过程4 CNN网络实现5 模型训练部分6 模型评估7 预测结果8 最后 1 前言 🔥 优质竞赛项目系列,今天要分享的是 基于CNN实现谣言检测 该项目较为新颖,适合作为竞赛课题方向,学长非常推荐&am…

高速DSP系统设计参考指南(七)电磁干扰基础

(七)电磁干扰基础 1.概述2.EMI概述3.数字信号4.电流环路5.电源6.传输线7.电源层和地层8. 减少电磁干扰经验法则9.总结 1.概述 高速DSP系统中的辐射是由通过印刷电路板走线传播的快速开关电流和电压引起的。随着DSP速度的提高,印刷电路板走线…

C++ 字符串编码转换封装函数,UTF-8编码与本地编码互转

简介 字符串编码转换封装函数,UTF-8编码与本地编码互转。 中文乱码的解决方法 有时候我们会遇到乱码的字符串,比如: 古文码 可能是用GBK方式读取UTF-8编码的中文导致的,用下面的Utf8ToLocal(string str)函数转换一下就可以了。…

017 基于Spring Boot的食堂管理系统

部分代码地址: https://github.com/XinChennn/xc017-stglxt 基于Spring Boot的食堂管理系统 项目介绍 本项目是基于Java的管理系统。采用前后端分离开发。前端基于bootstrap框架实现,后端使用Java语言开发,技术栈包括但不限于SpringBoot、…

Filter与Listener(过滤器与监听器)

1.Filter 1.过滤器概述 过滤器——Filter,它是JavaWeb三大组件之一。另外两个是Servlet和Listener 它可以对web应用中的所有资源进行拦截,并且在拦截之后进行一些特殊的操作 在程序中访问服务器资源时,当一个请求到来,服务器首…

unity操作_碰撞器 c#

碰撞器Collider 在场景中选择一个物体Cube 观察检查器Inspector 自带Cube会默认挂载盒子碰撞器Box Colilider 增加组件可以增加更多中碰撞器 Edit Collider 编辑碰撞器形状 Is Trigger选项 Is Trigger :是否是触发器,如果启用此属性 则该碰撞体将用于触…

数据挖掘(6)聚类分析

一、什么是聚类分析 1.1概述 无指导的,数据集中类别未知类的特征: 类不是事先给定的,而是根据数据的相似性、距离划分的聚类的数目和结构都没有事先假定。挖掘有价值的客户: 找到客户的黄金客户ATM的安装位置 1.2区别 二、距离和相似系数 …

UITesting 界面测试

1. 创建界面测试视图 UITestingBootcampView.swift import SwiftUI/// 界面测试 ViewModel class UITestingBootcampViewModel: ObservableObject{let placeholderText: String "Add name here..."Published var textFiledText: String ""Published var…

2023年中国一次性医用内窥镜市场发展现状分析:相关产品进入上市高峰期[图]

基于对减少交叉感染风险和维护成本的需求等因素,一种新兴的、耗材化的一次性内窥镜可以避免因重复使用产品而导致的感染问题和高额的清洗消毒费用,从而提高患者的安全性并帮助医疗机构节省运营成本。 一次性和可重复使用医用内窥镜特点对比 资料来源&am…

Android 指定有线网或Wifi进行网络请求

Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言:二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图:2、实际测试结果说明&a…

Nginx负载均衡反向代理动静分离

文章目录 nginx负载均衡&反向代理&动静分离环境说明部署动静分离1.主机lnmp部署一个动态页面,在此以discuz论坛系统为例2.主机n1部署两个静态页面访问动、静态页面 配置负载均衡配置反向代理访问测试 nginx负载均衡&反向代理&动静分离 环境 主机名…

【LINUX】1-移植NXP提供的源码

一、在Linux中添加自己的开发板 defconfig配置文件:一个就是imx6ull_alientek_emmc_defconfig默认配置文件 # 复制一份NXP 官方的SDK cd arch/arm/configs cp imx_v7_mfg_defconfig imx_alientek_emmc_defconfig 设备树:imx6ull-alientek-emmc.d…

【Arduino32】PWM控制直流电机速度

硬件准备 震动传感器:1个 红黄绿LED灯:各一个 旋钮电位器:1个 直流电机:1个 1K电阻:1个 220欧电阻:3个 杜邦线:若干 硬件连线 软件程序 const int analogInPin A0;//PWM输入引脚 const…

SpringCloud链路追踪——Spring Cloud Sleuth 和 Zipkin 介绍 Windows 下使用初步

前言 在微服务中,随着服务越来越多,对调用链的分析越来越复杂。如何能够分析调用链,定位微服务中的调用瓶颈,并对其进行解决。 本篇博客介绍springCloud中用到的链路追踪的组件,Spring Cloud Sleuth和Zipkin&#xf…

使用 PyAudio、语音识别、pyttsx3 和 SerpApi 构建简单的基于 CLI 的语音助手

德米特里祖布☀️ 一、介绍 正如您从标题中看到的,这是一个演示项目,显示了一个非常基本的语音助手脚本,可以根据 Google 搜索结果在终端中回答您的问题。 您可以在 GitHub 存储库中找到完整代码:dimitryzub/serpapi-demo-project…

Git的安装

前置 知道自己电脑上跑的是什么系统 查看电脑位数 省事的一种办法 Windows 在cmd中输入如下命令 wmic os get osarchitecture看命令结果即可 省事的一种办法 Linux 直接在终端中输入如下命令 uname -m若结果是x86_64就是64位的,反之32位 图形化的办法 Wind…