提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、can周期任务类构建
- 总结
前言
提示:这里可以添加本文要记录的大概内容:
最近想着有时间实现总线报文收发的动态的配置,今天特记录一下报文周期任务的动态的创建和移除
提示:以下是本篇文章正文内容,下面案例可供参考
一、can周期任务类构建
代码如下(示例):
import threading
import timeclass CycleTask():def __init__(self):self.__task_dict = dict() # 周期任务的标志位字典self.__task_queue = dict() # 周期任务需发送的数据字典,{cycle_time:{msg_obj:data}}self.__lock_dict = dict() # 每个线程对应的线程锁def create_task(self, cycle_time):"""功能:创建周期任务参数1:周期时间"""self.__create_task_thread(cycle_time)def __create_task_thread(self, cycle_time):"""功能:创建周期线程参数1:事件"""task = threading.Thread(target=self.__task, args=(cycle_time,))task.start()def __task(self, cycle_time):"""功能:周期任务函数参数1:周期事件"""lock = threading.Lock()self.__lock_dict[cycle_time] = locktask_flag = Trueself.__task_dict[cycle_time] = task_flagwhile self.__task_dict[cycle_time]:self.__lock_dict[cycle_time].acquire() # 线程上锁for msg, data in self.__task_queue[cycle_time].items():passself.__lock_dict[cycle_time].release() # 线程下锁time.sleep(cycle_time)def cycle_send_data(self, cycle_time, msg, data):"""功能:将报文进行周期发送参数1:周期事件参数2:报文对象参数3:数据列表"""if cycle_time in self.__task_queue:self.__lock_dict[cycle_time].acquire() # 线程上锁self.__task_queue[cycle_time][msg] = data # {cycle_time:{msg_obj:data}}self.__lock_dict[cycle_time].release() # 线程下锁else:self.__task_queue[cycle_time] = dict()self.__task_queue[cycle_time][msg] = datadef cycle_msg_stop(self, cycle_time, msg):"""功能:停止报文周期发送参数1:周期事件参数2:报文对象"""self.__lock_dict[cycle_time].acquire() # 线程上锁del self.__task_queue[cycle_time][msg]self.__lock_dict[cycle_time].release() # 线程下锁if not self.__task_queue: # 当周期任务中没有self.__task_dict[cycle_time] = False
总结
我是一名车载集成测试开发工程师,希望能和志同道合的朋友一起相互学习进步