python监控数据处理应用服务Socket心跳解决方案

1. 概述

从网页、手机App上抓取数据应用服务,涉及到多个系统集成协同工作,依赖工具较多。例如,使用Frida进行代码注入和动态分析,以实现对网络通信的监控和数据捕获。在这样的集成环境中,手机模拟器、手机中应用、消息侦听、数据获取服务等各自独立运行,任何一个环节出现问题,整个流程势必中断。除了必要的数据检验处理外,还需要实时侦听各个独立服务是否存活,并确认流程是否畅通。常用的监控工具往往只能监控进程和端口,无法深入系统内部进行监控,因此,我们采用Socket通讯方式,自主开发监控机制。

具体方案如下:

  1. Pika侦听与心跳机制

    • 手动启动Pika监听器,循环读取消息队列中的消息。
    • 每次读取消息后,调用心跳函数向监控端发送心跳信息。
    • 如果长时间未发送心跳信息(超过预设的时间阈值),则认为该服务已经死掉,此时重启数据应用服务进程和模拟器。
  2. 模拟器数据监控

    • 监控从模拟器端获取的数据流。
    • 如果长时间(超过预设的时间阈值)未成功获取到数据,则认为数据获取过程存在问题,此时同样重启数据应用服务进程和模拟器。

通过上述机制,确保在复杂的集成环境中,各个服务能够稳定运行,一旦出现问题能够及时发现并自动恢复,从而提高整体系统的可靠性和稳定性。
在这里插入图片描述

2. Socket通讯与心跳

2.1. 关于Socket

Python中的socket库是一种用于网络通信的标准库,它提供了丰富的函数和类来创建和管理网络连接

socket库概述

  • 功能:Python的socket库允许开发者创建客户端和服务器端应用程序,实现网络通信。

  • 协议支持:它支持多种协议,包括TCP(面向连接、可靠传输)和UDP(无连接、快速传输)。

  • 操作方式:支持同步和异步通信,其中同步通信是一种阻塞式的方式,而异步通信则不会阻塞程序的其他操作。

基本操作

  • 创建套接字:使用socket.socket()函数来创建一个套接字对象,这是进行网络通信的基础。例如,创建一个TCP套接字可以使用sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

  • 绑定地址:对于服务器而言,需要将套接字与特定的网络接口和端口绑定,使用bind()方法完成此操作。

  • 监听连接:服务器通过调用listen()方法开始监听来自客户端的连接请求。

  • 接受连接:使用accept()方法接受客户端的连接请求,并返回一个新的套接字对象和客户端地址信息。

  • 发送和接收数据:利用send()recv()方法在客户端和服务器之间发送和接收数据。

  • 关闭套接字:通信完成后,使用close()方法关闭套接字以释放资源。

总的来说,Python的socket库为网络编程提供了强大的工具,使得开发者能够轻松地构建各种类型的网络应用程序。无论是简单的TCP或UDP客户端和服务器,还是复杂的网络服务,socket库都能提供必要的支持。

2.2. 监控服务端Socket Server

import socket
from loguru import logger
from time import sleep
from datetime import datetime
import time
import jsonlogger.add("monitor_{time}.log",rotation="1 weeks",  # 1周分隔转日志文件  retention="2 month"  # 保留2个月的日志文件 )def start_server(host='localhost', port=5005):# 启动应用程序# 应用函数()sleep(90)# 创建缓存    station_list = []  poi_list = []      heartbeat_list = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())]# 心跳缓存# 创建socket对象with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:# 绑定地址和端口号server_socket.bind((host, port))        # 开始监听传入的连接请求server_socket.listen()logger.info(f"monitor Server listening on {host}:{port}")server_socket.settimeout(3)   # 超时3秒timeout = 0        while True:try:# 接受一个新的连接client_socket, client_address = server_socket.accept()logger.info(f"Connected by {client_address}")with client_socket:while True:# 接收数据data = client_socket.recv(1024)if not data:sleep(1)breakmessage = data.decode('utf-8')print(f"Received from client: {message}")sleep(1)timeout = 0# 监控处理函数monitor(message, station_list, poi_list, heartbeat_list)          except socket.timeout:timeout += 1if timeout*5%10==0:logger.info(f"app is starting, time out {timeout*5}")if timeout*5 > 150:   # 超过2分钟30秒没有启动,重启启动timeout = 0# 应用函数()sleep(60)heartbeat_list = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())]                               except Exception as e:logger.error(f"monitor Server error occurred: {e}")break

2.3. 建立心跳线,客户端Socket Client

Socket Client

import socket
import time
from time import sleep
from loguru import loggerdef start_client(message, host='localhost', port=5005):try:# 创建socket对象client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 连接到服务器client_socket.connect((host, port))logger.info(f"Connected to server at {host}:{port}")# 发送数据client_socket.sendall(message.encode())# 接收响应#data = client_socket.recv(1024)#print(f"Received from server: {data.decode()}")except socket.error as e:logger.info(f"Socket error occurred: {e}")        finally:    # 关闭连接client_socket.close()

建立心跳线,Socket Client应用

手动侦听消息,使用Pika库监听RabbitMQ中的消息,并循环读取消息。每次读取到消息时,调用心跳函数向监控端发送消息。如果长时间未发送消息,则认为服务已经死掉,触发重启控制软件和手机的操作。

# 消费消息
def startRabbitMQ():# 1.连接rabbittry:    credentials = pika.PlainCredentials('rabbit', '*****')  # 用户名和密码# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters('192.*.*.*',port = 55671,virtual_host = '/xxxxx-dev',credentials = credentials))except pika.exceptions.AMQPError as e:logger.error(f"Error connecting to RabbitMQ in main process: {e}")exit(1)   # 建立心跳信息current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())message = json.dumps({"type": "heartbeat", "time": current_time})            start_client(message)try:        channel = connection.channel()time.sleep(1)channel.queue_declare(queue='xxxxx_poi_queue', durable=True)channel.basic_qos(prefetch_count=1)while True:logger.info('取消息开始时间')  method_frame, header_frame, body = channel.basic_get(queue='xxxxx_poi_queue', auto_ack=False)  if method_frame:  # 处理消息体 print('header_frame:',header_frame) logger.info(f'body:,{body}')  _poi = body.decode('utf-8')  # 将 bytes 转换为字符串 current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())datas = {"type":"poiid","time":current_time,"dat":_poi}message = json.dumps(datas)start_client(message)                                                 # 业务应用处理函数()      # 如果你设置了auto_ack=False,则需要手动确认消息  channel.basic_ack(delivery_tag=method_frame.delivery_tag)  else:  logger.info("没有消息可以获取,")  current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())message = json.dumps({"type": "heartbeat", "time": current_time})            start_client(message)time.sleep(10)except pika.exceptions.AMQPError as e:logger.error(f"Error RabbitMQ in process: {e}")exit(1)    

监控应用、工具进程,在此略过,相关技术参见《Python监控服务进程及自启动服务方法与实践》。

3. 遇到的问题

3.1. 阻塞模式

在阻塞模式下,当调用某些socket API(如send、recv等)时,如果操作不能立即完成,调用线程会被挂起,直到操作完成或超时

在Python的socket编程中,settimeout()方法用于设置套接字操作的超时时间。当调用这个方法后,如果在指定的时间内没有完成相应的网络操作(如连接、发送或接收数据),程序将抛出一个socket.timeout异常。

  • 阻塞模式:默认情况下,套接字是阻塞模式的,这意味着如果进行的操作(如accept()recv()等)不能立即完成,程序会一直等待直到操作完成。

  • 非阻塞模式:通过设置超时时间,可以将套接字设置为非阻塞模式。在这种模式下,如果操作不能在指定时间内完成,程序会立即返回并抛出一个socket.timeout异常。

3.2. 数据传递编码与解码

在Python的socket编程中,传递字典类型数据时,通常需要将字典序列化为字符串或字节流进行传输。这是因为socket通信只接收bytes类型数据,而实际传过去的可能是str类型或其他非bytes类型。其中如下是关于字典类型数据的编码与解码的详细解析:

  • 编码:将字典转换为JSON字符串,然后将其编码为字节流进行发送。
import json
import socket...
current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
message = json.dumps({"type": "heartbeat", "time": current_time}) 
...
# 发送数据
client_socket.sendall(message.encode())
  • 解码:接收到字节流后,先将其解码为字符串,再从字符串中解析出字典。
# 接受一个新的连接
client_socket, client_address = server_socket.accept()
logger.info(f"Connected by {client_address}")with client_socket:while True:# 接收数据data = client_socket.recv(1024)if not data:sleep(1)breakmessage = data.decode('utf-8')print(f"Received from client: {message}")

其中,编码使用data.encode,解码使用data.decode

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/890841.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

商品线上个性定制,并实时预览3D定制效果,是如何实现的?

商品线上3D个性化定制的实现涉及多个环节和技术,以下是详细的解释: 一、实现流程 产品3D建模: 是实现3D可视化定制的前提,需要对产品进行三维建模。可通过三维扫描仪或建模师进行建模,将产品的外观、结构、材质等细…

开源 SOAP over UDP

简介 看到有人想要实现两个 EXE 之间的互动。这可以采用 RPC 的方式嘛。 Delphi 现成的 RPC 框架,比如 WebService,比如 DataSnap; 当然,github 上面还有第三方开源的 XMLRPC 等等。 为啥要搞一个 UDP Delphi 的 WebService …

【Laravel】接口的访问频率限制器

Laravel 接口的访问频率,你可以在 Laravel 中使用速率限制器(Rate Limiter)。以下是一个详细的步骤,展示如何为这个特定的 API 路由设置速率限制: 1. 配置 RouteServiceProvider 首先,确保在 App\Provide…

Vue.use()和Vue.component()

当很多页面用到同一个组件,又不想每次都在局部注册时,可以在main.js 中全局注册 Vue.component()一次只能注册一个组件 import CcInput from /components/cc-input.vue Vue.component(CcInput);Vue.use()一次可以注册多个组件 对于自定义的组件&#…

地理数据库Telepg面试内容整理-请描述空间索引的基本概念,如何使用它提高查询性能

空间索引的基本概念 空间索引是专门用于加速空间数据(如地理位置、几何对象等)查询的一种数据结构。空间数据本质上是多维的,包含了坐标、形状、区域等信息,这使得传统的单维索引(如 B+ 树)并不适用。空间索引通过将空间数据映射到特定的索引结构中,使得在进行空间查询时…

Rust : tokio中select!

关于tokio的select宏,有不少的用途。包括超时和竞态选择等。 关于select宏需要关注,相关的异步条件,会同时执行,只是当有一个最早完成时,会执行“抛弃”和“对应”策略。 说明:对本文以下素材的来源表示感…

Python PyMupdf 去除PDF文档中Watermark标识水印

通过PDF阅读或编辑工具,可在PDF中加入Watermark标识的PDF水印,如下图: 该类水印特点 这类型的水印,会在文件的字节流中出现/Watermark、EMC等标识,那么,我们可以通过改变文件字节内容,清理掉…

python EEGPT报错:Cannot cast ufunc ‘clip‘ output from dtype(‘float64‘)

今天在运行EEGPT的时候遇见了下面的问题,首先是nme报错,然后引起了numpy的报错: numpy.core._exceptions._UFuncOutputCastingError: Cannot cast ufunc clip output from dtype(float64)在网上找了好久的教程,但是没有找到。猜测…

旧衣回收小程序开发,绿色生活,便捷回收

随着绿色生活、资源回收利用理念的影响,人们逐渐开始关注旧衣回收,选择将断舍离等闲置衣物进行回收,在资源回收的同时也能够减少资金浪费。目前,旧衣回收的方式也迎来了数字化发展,相比传统的回收方式更加便捷&#xf…

[论文笔记] 从生成到评估:LLM-as-a-judge 的机遇与挑战

https://arxiv.org/pdf/2411.16594 1. LLM-as-a-judge 的引入 传统的评估方法(如 BLEU 和 ROUGE)在处理生成内容的有用性、无害性等细腻属性时表现不足。随着大语言模型(LLM)的发展,提出了 “LLM-as-a-judge”(LLM 作为评估者)的新范式,用于对任务进行评分、排序或选择…

Bluetooth Spec【0】蓝牙核心架构

蓝牙核心系统由一个主机、一个主控制器和零个或多个辅助控制器组成蓝牙BR/ EDR核心系统的最小实现包括了由蓝牙规范定义的四个最低层和相关协议,以及一个公共服务层协议;服务发现协议(SDP)和总体配置文件要求在通用访问配置文件&a…

【C 基础】C语言代码编译过程

从一个源文件(.c)到可执行程序到底经历了哪几步&#xff0c;我想大多数的人都知道&#xff0c;到时到底每一步都做了什么&#xff0c;我估计也没多少人能够说得清清楚楚&#xff0c;明明白白。 其实总的流程是这样的。 【第一步】编辑hello.c 1 #include <stdio.h> 2 …

数据处理之数据规约

数据处理之数据规约 1. 数据规约概述 数据规约是数据处理中的重要方法&#xff0c;旨在让数据处理更简便、高效&#xff0c;以满足业务需求。当从数据仓库获取的数据量庞大时&#xff0c;直接在海量数据上进行分析和挖掘成本颇高。数据规约可得到数据集的归约表示&#xff0c…

vulnhub靶场-matrix-breakout-2-morpheus攻略(截止至获取shell)

扫描出ip为192.168.121.161 访问该ip&#xff0c;发现只是一个静态页面什么也没有 使用dir dirsearch 御剑都只能扫描到/robots.txt /server-status 两个页面&#xff0c;前者提示我们什么也没有&#xff0c;后面两个没有权限访问 扫描端口&#xff0c;存在81端口 访问&#x…

Java - 日志体系_Apache Commons Logging(JCL)日志接口库

文章目录 官网1. 什么是JCL&#xff1f;2. JCL的主要特点3. JCL的核心组件4. JCL的实现机制5. SimpleLog 简介6. CodeExample 1 &#xff1a; 默认日志实现 (JCL 1.3.2版本)Example 2 &#xff1a; JCL (1.2版本&#xff09; Log4J 【安全风险高&#xff0c;请勿使用】 7. 使用…

C++-----------映射

探索 C 中的映射与查找表 在 C 编程中&#xff0c;映射&#xff08;Map&#xff09;和查找表&#xff08;Lookup Table&#xff09;是非常重要的数据结构&#xff0c;它们能够高效地存储和检索数据&#xff0c;帮助我们解决各种实际问题。今天&#xff0c;我们就来深入探讨一下…

免费 IP 归属地接口

免费GEOIP&#xff0c;查询IP信息&#xff0c;支持IPV4 IPV6 ,包含国家地理位置&#xff0c;维度&#xff0c;asm,邮编 等&#xff0c;例如 例如查询1.1.1.1 http://geoip.91hu.top/?ip1.1.1.1 返回json 对象

Linux应用软件编程-多任务处理(进程)

多任务&#xff1a;让系统具备同时处理多个事件的能力。让系统具备并发性能。方法&#xff1a;进程和线程。这里先讲进程。 进程&#xff08;process&#xff09;&#xff1a;正在执行的程序&#xff0c;执行过程中需要消耗内存和CPU。 进程的创建&#xff1a;操作系统在进程创…

认识计算机网络

单单看这一个词语&#xff0c;有熟悉又陌生&#xff0c;让我们来重新认识一下这位大角色——计算机网络。 一、是什么 以及 怎么来的 计算机网络是指将地理位置不同的具有独立功能的多台计算机及其外部设备&#xff0c;通过通信线路和通信设备连接起来&#xff0c;在网络操作…

3. Kafka入门—安装与基本命令

Kafka基础操作 一. 章节简介二. kafka简介三. Kafka安装1. 准备工作2. Zookeeper安装2.1 配置文件2.2 启动相关命令3. Kafka安装3.1 配置文件3.2 启动相关命令-------------------------------------------------------------------------------------------------------------…