python多进程队列中的队列_python 多进程队列数据处理详解

我就废话不多说了,直接上代码吧!

# -*- coding:utf8 -*-

import paho.mqtt.client as mqtt

from multiprocessing import Process, Queue

import time, random, os

import camera_person_num

MQTTHOST = "172.19.4.4"

MQTTPORT = 1883

mqttClient = mqtt.Client()

q = Queue()

# 连接MQTT服务器

def on_mqtt_connect():

mqttClient.connect(MQTTHOST, MQTTPORT, 60)

mqttClient.loop_start()

# 消息处理函数

def on_message_come(lient, userdata, msg):

# print(msg.topic + ":" + str(msg.payload.decode("utf-8")))

q.put(msg.payload.decode("utf-8")) # 放入队列

print("产生消息", msg.payload.decode("utf-8"))

# 消息处理开启多进程

# p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))

# p.start()

def consumer(q, pid):

print("开启消费序列进程", pid)

while True:

msg = q.get()

# p = Process(target=talk, args=("/camera/person/num/result", msg, pid))

# p.start()

talk("/camera/person/num/result", msg, pid)

# subscribe 消息订阅

def on_subscribe():

mqttClient.subscribe("test123", 1) # 主题为"test"

mqttClient.on_message = on_message_come # 消息到来处理函数

# publish 消息发布

def on_publish(topic, msg, qos):

mqttClient.publish(topic, msg, qos);

# 多进程中发布消息需要重新初始化mqttClient

def talk(topic, msg, pid):

cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)

t_max, t_mean, t_min = cameraPsersonNum.personNum()

# time.sleep(20)

print("消费消息", pid, msg)

mqttClient2 = mqtt.Client()

mqttClient2.connect(MQTTHOST, MQTTPORT, 60)

mqttClient2.loop_start()

mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)

mqttClient2.disconnect()

def main():

on_mqtt_connect()

on_subscribe()

for i in range(1, 3):

c1 = Process(target=consumer, args=(q, i))

c1.start()

while True:

pass

if __name__ == '__main__':

main()

以上这篇python 多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持python博客。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/526475.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UOS nginx

 安装 nginx 软件包;  配置文件名为 ispweb.conf,放置在/etc/nginx/conf.d/目录下;  网站根目录为/mut/crypt(目录不存在需创建);  启用 FastCGI 功能,让 nginx 能够解析 php 请求&a…

mysql数据库重做日志文件_mysql数据库重做日志

{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],"search_count":[{"count_phone":4,"count":4}]},"card":[{"des":"阿里云数据库专家保驾护航,为用户…

mysql免安装如何改密码_mysql免安装版配置与修改密码的教程

第一步:配置环境变量(我的解压路径:G:\mysql\mysql-5.7.21-winx64 )MYSQL_HOME你解压的路径PATH ;%MYSQL_HOME %\bin;PATH变量是在原来的基础上多添加的,不要把其它的设置给删掉了第二步在解压的目录下添加my.ini 文件(如果已经有了这个文件&#xff0c…

unix 登录mysql_实例分析mysql用户登录。

今天,在学习mysql授权认证时,遇到了一个问题,看下,我是如何分析的:我在数据库内添加了一个帐号:create databases firstdb;grant all on firstdb.* to ‘firstdb’’’ identified by ‘xxxxx’;flush priv…

拉普拉斯时域卷积定理_如何证明频域卷积定理

展开全部设抄IF表示傅立叶逆变换,则因此有袭故频域卷积定2113理5261得证。4102扩展资料频域卷积定理频域卷积定理表明两信号1653在时域的乘积对应于这两个信号傅立叶变换的卷积除以2π。卷积定理揭示了时间域与频率域的对应关系。这一定理对Laplace变换、Z变换、Mel…

suse查看mysql内存使用情况_MySQL 慢查询日志(Slow Query Log)

4、格式化慢查询日志结构化慢查询日志就是把慢查询日志中的重要信息按照便于阅读以及按照特定的排序方式来提取SQL。这种方式有点类似于Oracle中有个tkprof来格式化oracle的trace文件。对于前面的慢查询日志我们使用MySQLdumpslow来提取如下:SUSE11b:~ # mysqldumps…

mysql按照学生分组查询_将student表按照gender字段值进行分组查询,并计算每个分组中有多少名学生_学小易找答案...

【简答题】查询student表中一共有多少条记录【简答题】在department表和employee表之间分别使用where查询、自连接查询【简答题】使用DESC查看学生表和班级表【简答题】在表student和表grade中添加外键约束,建立两个表的关联关系【简答题】在department表和employee表之间使用右…

python 解方程 sympy_用Python和Sympy求解方程并得到数值答案

我正试图用sympy来解决方程,但我想得到一个直接的数字答案 . 我的脚本是这样的:from sympy import *A,B,Vsymbols(A,B,V)eq1Eq(630.26*(V-39.0)*V*(V39)-AB,0)eq2Eq(B,1.36*10**8*(V-39))eq3Eq(A,5.75*10**5*V*(V39.0))solve([eq1,eq2,eq3], [A,B,V], di…

mysql 分析服务_MySQL分析服务器状态_MySQL

概述文章简单介绍了通过一些查询命令分析当前服务器的状态。目录概述获取服务器整体的性能状态SQL操作计数总结步骤获取服务器整体的性能状态首先对一个数据库服务器进行性能优化需要先知道服务器当前主要的性能问题出现在哪里,在这点sql server也是类似&#xff0c…

apache启服务命令_Linux系统重启apache服务命令详解

Linux系统中apache是重要的一个服务,掌握基本操作尤其重要。下面由学习啦小编为大家整理了Linux系统重启apache服务命令详解,希望对大家有帮助!Linux系统重启apache服务命令详解基本的操作方法:本文假设你的apahce安装目录为/usr/local/apach…

python 接口测试 如何写配置文件_python接口自动化测试 - configparser配置文件解析器详细使用...

configparser简介ConfigParser模块已在Python 3中重命名为configparser该模块定义了ConfigParser类。 ConfigParser类实现一种基本的配置文件解析器语言,该语言提供的结构类似于 .ini 文件中的结构ini文件相关知识键值对可用 或者 : 进行分隔section 的名字是区分大…

case是java关键字吗_Java关键字

3. 程序控制语句1) break 跳出,中断break 关键字用于提前退出 for、while 或 do 循环,或者在 switch 语句中用来结束 case 块。break 总是退出最深层的 while、for、do 或 switch 语句。2) continue 继续continue 关键字用来跳转到 for、while 或 do 循环…

java 多线程 临界区_多线程编程的设计模式 临界区模式

临界区模式 Critical Section Pattern 是指在一个共享范围中只让一个线程执行的模式.它是所有其它多线程设计模式的基础,所以我首先来介绍它.把着眼点放在范围上,这个模式叫临界区模式,如果把作眼点放在执行的线程上,这个模式就叫单线程执行模式.首先我们来玩一个钻山洞的游戏,…

java jdom 设置第1行_Java通过jdom操作生成XML文件的实例代码下载

工作需要,要生成xml文件,所以做了个小demo分享一下。看代码吧~ main()里面没什么好说的 该写的都写了public static void main(String[] args) {//调用 DocumentBuilderFactory.newInstance() 方法得到创建 DOM 解析器的工厂DocumentBuilderFactory fact…

java 模拟qq空间登陆_java最新完美实现模拟登录QQ登录QQ空间获取说说

package com.pengliu.config;import com.pengliu.util.http.HttpService;/*** desc: 全局公共属性配置*/public class baseConfig {//项目路径public static String javaProjectPathSystem.getProperty(“user.dir“);//QQ账号public static String qqNum;//QQ密码public stati…

java 对象工厂_Java设计模式之--工厂方式

在分析工厂模式之前,我们先看看普通模式下的Java三层结构。Controller(表现层)-Service(业务层)-Dao(持久层、数据库连接层),基于此三层结构完成JavaWeb相关业务。假设我们要完成的业务是添加员工信息,先创建Dao包,创建EmpDao接口和EmpDaoFac…

mysql 5.7 io 性能 aio_深入理解MySQL的InnoDB引擎

在MySQL中的引擎一文中说了,我们在几乎所有的情况下其实用的都是InnoDB引擎,这里我们就重点再看一下这个引擎,包括他的存储结构,线程模型和数据文件。我们可以通过show engine innodb status \G;(\G只是表示输出结果纵向表格输出)…

mysql 使用场景_MySQLMHA典型使用场景

1 管理节点部署位置1.1. Dedicated Manager server and multiple MySQL (master,slaves) servers 使用专用的管理服务1 管理节点部署位置1.1. Dedicated Manager server and multiple MySQL (master,slaves) servers使用专用的管理服务器管理多组MySQL主从服务器Since MHA Mana…

mysql构建数据立方体_OLAP数据建模工具Workbench的初步使用(数据立方体的建立)

OLAP数据建模工具Workbench的初步使用(数据立方体的建立)概要:1.workbench工具简介2.workbench简单操作(附demo)3.workbench初步使用总结1.workbench工具简介OLAP,(Online Analytical Processing,联机分析处理)。从事相关工作的小伙伴,具体的…

mysql访问60s出现timeout_websocket 每60s报WsHttpUpgradeHandler.timeoutAsync

在后台连接了一个websocket连接,每60s就报如下错误,有知道这是什么情况的吗?2020-09-10 14:22:40 [Catalina-utility-1] ERROR org.apache.coyote.http11.Http11NioProtocol -Error processing async timeoutsjava.util.concurrent.Execution…