总结了一下中继引擎(can中继器,TCP总机器)开发实际经验

多路数据进行中继的研究

1.数据中继的概念

数据中继是一种数据传输技术,用于在两个通信设备之间提供数字信号的传输。它利用数字信道传输数据信号,可以提供永久性和半永久性连接的数字数据传输信道。

数据中继的主要作用是提高通信质量和可靠性,同时实现多路复用,即在同一个物理链路上传输多个信号。

在数字通信网络中,数据中继可以用于计算机之间的通信,传送数字化传真、数字话音、数字图像信号或其它数字化信号等。

 简单来说:中继的核心就是数据传输,比如传输简单的基础数据、话音、传真、图像信息等;

最简单就是1到2和2到1的数据交互,如下模型,就是左右之间数据的交互。

         

 

2.中继扩展

 在简单的1<>2工作模型,扩展开来,比如3<>4, 5<>6...

        等成千上万,上百万个job处理时。就是中继引擎

​                             

 如上,左边的方式,适合于生命周期短的方案(倾向于通道互斥);

右边的方式时适合于生命周期长的方案(倾向于通道的信息共享);

3.中继方案1

实现步骤1

最简单就是1到2和2到1的数据交互的模型实现

拆分读写:

        

可以用epoll/select/poll 模型都可以;

                举例如下:

​
#include<stdio.h>
#include<stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include<errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/epoll.h>
#define     TTY1       "/dev/tty11"
#define     TTY2       "/dev/tty12"
#define     BUFSIZE     1024
enum {//几种状态STATE_R,STATE_W,STATE_AUTO,STATE_EX,STATE_T
};
struct fsm_st 
{int state;//记录状态int sfd;//源文件int dfd;//目的文件char buf[BUFSIZE];//中间缓冲区int len;//读到的长度int pos;//写的过程如果一次没有写完,记录上次写的位置char* err;//错误信息
};
static void fsm_driver(struct fsm_st* fsm) 
{int ret;switch (fsm->state) {case STATE_R:fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);if (fsm->len == 0)fsm->state = STATE_T;else if (fsm->len < 0) {if (errno == EAGAIN)fsm->state = STATE_R;else {fsm->err = "read()";fsm->state = STATE_EX;}}else {fsm->pos = 0;fsm->state = STATE_W;}break;case STATE_W:ret = write(fsm->dfd, fsm->buf + fsm->pos, BUFSIZE);if (ret < 0) {if (errno == EAGAIN)fsm->state = STATE_W;else {fsm->err = "write()";fsm->state = STATE_EX;}}else {fsm->pos += ret;fsm->len -= ret;if (fsm->len == 0)fsm->state = STATE_R;//写完了再去读elsefsm->state = STATE_W;//没写完继续写}break;case STATE_EX:perror(fsm->err);fsm->state = STATE_T;break;case STATE_T:/*  do smoething*/break;default:abort();break;}
}
static int max(int a, int b) 
{return a > b ? a : b;
}
static void relay(int fd1, int fd2) 
{struct fsm_st fsm12, fsm21;int epfd;struct epoll_event ev;int fd1_save = fcntl(fd1, F_GETFL);fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK);		//非阻塞	打开int fd2_save = fcntl(fd2, F_GETFL);fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK);		//非阻塞	打开//初始状态fsm12.state = STATE_R;fsm12.sfd = fd1;fsm12.dfd = fd2;fsm21.state = STATE_R;fsm21.sfd = fd2;fsm21.dfd = fd1;/*******创建epoll实例:告诉内核这个监听的数目是10*/epfd = epoll_create(10);if (epfd < 0) {perror("epoll_create()");exit(1);}/*控制、设置:注册要监听的事件类型*/ev.events = 0;//暂时不确定监视何种行为 - 位图清0 ev.data.fd = fd1;epoll_ctl(epfd, EPOLL_CTL_ADD, fd1, &ev);//增删改操作:加入fd1-确保系统监视fd1ev.events = 0;//暂时不确定监视何种行为 - 位图清0 ev.data.fd = fd2;epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &ev);//增删改操作  在循环外添加避免循环中来回操作while (fsm12.state != STATE_T || fsm21.state != STATE_T) {//1.为fd1布置监视任务ev.data.fd = fd1;ev.events = 0;//位图清0if (fsm12.state == STATE_R)   //1->2:说明如果fd1可读ev.events |= EPOLLIN;//读事件if (fsm21.state == STATE_W)   //2->1:说明如果fd1可写ev.events |= EPOLLOUT;//写事件epoll_ctl(epfd, EPOLL_CTL_MOD, fd1, &ev);//增删改操作ev.data.fd = fd2;ev.events = 0;//位图清0if (fsm12.state == STATE_W)//1->2:说明如果fd2可写ev.events |= EPOLLOUT;if (fsm21.state == STATE_R)//2->1:说明如果fd1可读ev.events |= EPOLLIN;//epoll_ctl(epfd, EPOLL_CTL_MOD, fd2, &ev);//增删改操作//2.监视--等待事件发生if (fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) {					//epfd组,事件结构体,事件数量-元素个数ev,-1-死等while (epoll_wait(epfd, &ev, 1, -1) < 0) {			 //if (errno == EINTR) //假错{continue;}perror("epoll_wait()");exit(1);}}//3.查看监视结果//监视结果是fd1 且fd1可读if (ev.data.fd == fd1 && ev.events & EPOLLIN || ev.data.fd == fd2 \&& ev.events & EPOLLOUT || fsm12.state > STATE_AUTO)fsm_driver(&fsm12);//如果1可读2可写或者处于EX,T态if (ev.data.fd == fd1 && ev.events & EPOLLOUT || ev.data.fd == fd2 \&& ev.events & EPOLLIN || fsm21.state > STATE_AUTO)//如果2可读或者1可写fsm_driver(&fsm21);}//复原退出fcntl(fd1, F_SETFL, fd1_save);fcntl(fd2, F_SETFL, fd2_save);close(epfd);
}
int main(int argc, char** argv) 
{int fd1, fd2;fd1 = open(TTY1, O_RDWR);//先以阻塞打开(故意先阻塞形式)if (fd1 < 0) {perror("open()");exit(1);}write(fd1, "TTY1\n", 5);fd2 = open(TTY2, O_RDWR | O_NONBLOCK);//非阻塞if (fd2 < 0) {perror("open()");exit(1);}write(fd2, "TTY2\n", 5);relay(fd1, fd2);		//核心代码close(fd2);close(fd1);exit(0);
}​

实现步骤2 将该步骤,扩展

每一个job加入数组进行管理,然后遍历数组,实现管理每一个job任务(下方例子仅作参考,有待对每个job加入epoll)

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<pthread.h>
#include <unistd.h>
#include<string.h>
#include<fcntl.h>
#include"relayer.h"
#define   BUFSIZE  1024
static struct rel_job_st* rel_job[REL_JOBMAX];
static pthread_mutex_t mut_rel_job = PTHREAD_MUTEX_INITIALIZER;
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
enum {//状态机的几种状态STATE_R,STATE_W,STATE_EX,STATE_T
};
enum {//job的状态STATE_RUNNING = 1,STATE_CANCELED,STATE_OVER
};
//状态机
struct rel_fsm_st {int state;//记录状态机的状态int sfd;//源文件int dfd;//目的文件char buf[BUFSIZE];//中间缓冲区int len;//读到的长度int pos;//写的过程如果一次没有写完,记录上次写的位置char* err;//错误信息int64_t count; //输出字符数量
};
//每一对终端结构体
struct rel_job_st{int fd1;//两个终端int fd2;//该对终端状态STATE_RUNNING,STATE_CANCELED, STATE_OVERint job_state;//两个终端的状态机结构体struct rel_fsm_st fsm12, fsm21;//用来退出复原状态int fd1_save, fd2_save;
};
//状态转移函数
static void fsm_driver(struct rel_fsm_st* fsm) {int ret;switch (fsm->state) {case STATE_R:fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);if (fsm->len == 0)fsm->state = STATE_T;else if (fsm->len < 0) {if (errno == EAGAIN)fsm->state = STATE_R;else {fsm->err = "read()";fsm->state = STATE_EX;}}else {fsm->pos = 0;fsm->state = STATE_W;}break;case STATE_W:ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len);if (ret < 0) {if (errno == EAGAIN)fsm->state = STATE_W;else {fsm->err = "write()";fsm->state = STATE_EX;}}else {fsm->pos += ret;fsm->len -= ret;if (fsm->len == 0)fsm->state = STATE_R;//写完了再去读elsefsm->state = STATE_W;//没写完继续写}break;case STATE_EX:perror(fsm->err);fsm->state = STATE_T;break;case STATE_T:/*  do smoething*/break;default:abort();break;}
}
static void *thr_relayer(void *p) {int i;while (1) {pthread_mutex_lock(&mut_rel_job);//死等for (i = 0; i < REL_JOBMAX; i++) {if (rel_job[i] != NULL) //不断的找到一个任务然后推送执行{if (rel_job[i]->job_state == STATE_RUNNING)//运行态{fsm_driver(&rel_job[i]->fsm12);//先推再判断fsm_driver(&rel_job[i]->fsm21);if (rel_job[i]->fsm12.state == STATE_T && rel_job[i]->fsm21.state == STATE_T)rel_job[i]->job_state = STATE_OVER;}}}pthread_mutex_unlock(&mut_rel_job);}}
static void module_load(void) 
{int err;pthread_t tid_relayer;err = pthread_create(&tid_relayer, NULL, thr_relayer, NULL);if (err) {fprintf(stderr, "pthread_create():%s\n", strerror(err));exit(1);}
}
static int get_free_pos_unlocked() {int i;for (i = 0; i < REL_JOBMAX; i++) {if (rel_job[i] == NULL)return i;}return -1;
}
int rel_addjob(int fd1, int fd2) {struct rel_job_st *me;int pos;pthread_once(&init_once, module_load);//单次调用:pthread_onceme = malloc(sizeof(*me));if (me == NULL)   //空间问题return -ENOMEM;me->fd1 = fd1;me->fd2 = fd2;me->job_state = STATE_RUNNING;//该对终端设置正在运行me->fd1_save = fcntl(me->fd1, F_GETFL);fcntl(me->fd1, F_SETFL, me->fd1_save | O_NONBLOCK);  //非阻塞	打开me->fd2_save = fcntl(me->fd2, F_GETFL);fcntl(me->fd2, F_SETFL, me->fd2_save | O_NONBLOCK);//非阻塞	打开me->fsm12.sfd = me->fd1;me->fsm12.dfd = me->fd2;me->fsm12.state = STATE_R;me->fsm21.sfd = me->fd2;me->fsm21.dfd = me->fd1;me->fsm21.state = STATE_R;pthread_mutex_lock(&mut_rel_job);pos = get_free_pos_unlocked();//临界状态-需要加入互斥锁if (pos < 0) {pthread_mutex_unlock(&mut_rel_job);fcntl(me->fd1, F_SETFL, me->fd1_save);//恢复现场fcntl(me->fd2, F_SETFL, me->fd2_save);free(me);//释放空间return -ENOSPC;}rel_job[pos] = me;pthread_mutex_unlock(&mut_rel_job);return pos;
}
int rel_canceljob(int id);
/*return == 0			成功,指定任务成功取消== -EINVAL	失败,参数非法== -EBUSY    失败,任务早已被取消
*/
int rel_waitjob(int id, struct rel_stat_st*);
int rel_statjob(int id, struct rel_stat_st*);

4.中继方案2

方案2不采用方案1的经验,是因为方案2,在多路io复用的时候,容易形成“群惊”效应。

还有,方案1,是通道排斥,2是信息分享;

因此,采用如下方式:每一个收发都配置一个接收队列,发送直接发给对方的队列装载

                                   

实现步骤1

队列的实现(队列的实现--数量少采用静态数组/数量多动态数组(进程内),进程间的处理反射光hi,采用共享内存+数组方式)

队列实现-例子:数量少采用静态数组(进程内)

/****************************************************
* 函数名:udp_msg_enqueue
* 功能描述:UDP消息入队
* 输入参数:msg_type-消息类型,msg_in_data-消息数据
* 输出参数:
* 返回值:
* 备注:Fhsj YJ
****************************************************/
int udp2_msg_enqueue(const UDP_MSG_STRU *msg_in_data) {if (NULL == msg_in_data) {printf("func:%s : 指针为NULL", __func__);return 0;}// 数据入队并返回结果if (((udp2_msg_queue.rear + 1) % MAX_UDP_MSG_NUM) == udp2_msg_queue.front) {return 0;} else {memset(&udp2_msg_queue.date[udp2_msg_queue.rear], 0, sizeof(UDP_MSG_STRU));memcpy(&udp2_msg_queue.date[udp2_msg_queue.rear], msg_in_data, sizeof(UDP_MSG_STRU));udp2_msg_queue.rear = (udp2_msg_queue.rear + 1) % MAX_UDP_MSG_NUM;return 1;}
}/****************************************************
* 函数名:dsrc_msg_dequeue
* 功能描述:UDP消息出队
* 输入参数:msg_type-消息类型
* 输出参数:
* 返回值:msg_in_data-消息数据
* 备注:Fhsj YJ
****************************************************/
UDP_MSG_STRU *udp2_msg_dequeue(void) {u16 u16obj_num = 0;// 数据出队if (udp2_msg_queue.rear == udp2_msg_queue.front) {return NULL;} else {u16obj_num = udp2_msg_queue.front;udp2_msg_queue.front = (udp2_msg_queue.front + 1) % MAX_UDP_MSG_NUM;return &udp2_msg_queue.date[u16obj_num];}
}/****************************************************
* 函数名:get_udp_msg_num
* 功能描述:UDP消息数量获取
* 输入参数:msg_type-消息类型
* 输出参数:
* 返回值:消息数量
* 备注:Fhsj YJ
****************************************************/
u8 get_udp2_msg_num(void) {u8 msg_num;if (udp2_msg_queue.rear >= udp2_msg_queue.front) {msg_num = udp2_msg_queue.rear - udp2_msg_queue.front;} else {msg_num = udp2_msg_queue.rear + MAX_UDP_MSG_NUM - udp2_msg_queue.front;}return msg_num;
}

 实现步骤2,

每一个fd,对应一个数据处理线程,在线程,在每一个线程收到数据后,进入对应队列

然后再另外一个地方(线程或者进程),出队,处理...

对应代码,已经实现,暂不展示了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/662004.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

详解WebRTC rtc::Thread实现

rtc::Thread介绍 rtc::Thread类不仅仅实现了线程这个执行器&#xff08;比如posix底层调用pthread相关接口创建线程&#xff0c;管理线程等&#xff09;&#xff0c;还包括消息队列&#xff08;message_queue)的实现&#xff0c;rtc::Thread启动后就作为一个永不停止的event l…

《Pandas 简易速速上手小册》第8章:Pandas 高级数据分析技巧(2024 最新版)

文章目录 8.1 使用 apply 和 map 函数8.1.1 基础知识8.1.2 重点案例&#xff1a;客户数据清洗和转换8.1.3 拓展案例一&#xff1a;产品评分调整8.1.4 拓展案例二&#xff1a;地址格式化 8.2 性能优化技巧8.2.1 基础知识8.2.2 重点案例&#xff1a;大型销售数据分析8.2.3 拓展案…

BUUCTF-Real-[Flask]SSTI

目录 漏洞描述 模板注入漏洞如何产生&#xff1f; 漏洞检测 漏洞利用 get flag ​编辑 漏洞描述 Flask框架&#xff08;jinja2&#xff09;服务端模板注入漏洞分析&#xff08;SSTI&#xff09; Flask 是一个 web 框架。也就是说 Flask 为您提供工具、库和技术来允许您构…

基于PSO-BP神经网络的风电功率MATLAB预测程序

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 参考文献 基于风电场运行特性的风电功率预测及应用分析——倪巡天 资源简介 由于自然风具有一定的随机性、不确定性与波动性&#xff0c;这将会使风电场的功率预测受到一定程度的影响&#xff0c;它们之间…

骨传导耳机的工作原理是什么?跟一般的耳机相比有什么特点?

骨传导耳机是利用骨传导技术研发而成一种新型蓝牙耳机&#xff0c;其传声方式很独特&#xff0c;不通过空气传导&#xff0c;而是通过人体骨骼来传递声音。 和传统的耳机相比&#xff0c;在佩戴方式和传声方式上都有所不同。 1、佩戴方式不同 首先就是佩戴方式不同&#xff0…

Spring Boot--08--Mybatis 之Mapper在IDEA中自动注入警告的解决方案

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 问题原因 解决方案方法1&#xff1a;为 Autowired 注解设置required false方法2&#xff1a;用 Resource 替换 Autowired方法3&#xff1a;在Mapper接口上加上Repo…

振动传感器接头MIL-C-5015玻璃烧结插座

振动传感器接头MIL-C-5015玻璃烧结插座产品主要用于加速度传感器&#xff0c;倾角传感器&#xff0c;耐高低温&#xff0c;耐腐蚀&#xff0c;适合使用恶劣环境。适用品牌有&#xff1a;MEGGITT&#xff0c;VibraSens,CTC measurement,BENTLY(本特利&#xff09;等众多品牌可定…

【代码随想录-链表】环形链表 II

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…

C++ 入门(五)— 头文件(Header files)

文章目录 头文件的用途使用标准库头文件使用头文件传播前向声明避免将函数或变量定义放在头文件中尖括号与双引号VSCode中添加来自其他目录的头文件 头文件保护标头保护不会阻止标头包含在不同的代码文件中pragma once总结 头文件的用途 C 代码文件&#xff08;扩展名为 .cpp&…

基于Java SSM框架实现教师管理系统项目【项目源码】

基于java的SSM框架实现教师管理系统演示 JAVA简介 Java主要采用CORBA技术和安全模型&#xff0c;可以在互联网应用的数据保护。它还提供了对EJB&#xff08;Enterprise JavaBeans&#xff09;的全面支持&#xff0c;java servlet API&#xff0c;JSP&#xff08;java server p…

网络空间测绘在安全领域的应用(下)

3.漏洞感知 漏洞感知能力在当今的网络安全领域是至关重要的&#xff0c;而其核心技术之一是漏洞验证技术。通过对漏洞信息与产品版本的关联&#xff0c;系统能够更准确地感知漏洞&#xff0c;但仅仅依靠这种数据关联会引发一系列问题。 首先&#xff0c;漏洞是程序中存在的隐…

YOLOv5改进:下采样系列 |一种新颖的基于 Haar 小波的下采样HWD,有效涨点系列

💡💡💡本文独家改进:HWD的核心思想是应用Haar小波变换来降低特征图的空间分辨率,同时保留尽可能多的信息,与传统的下采样方法相比,有效降低信息不确定性。 💡💡💡使用方法:代替原始网络的conv,下采样过程中尽可能包括更多信息,从而提升检测精度。 收录 YO…

Java-File类

目录 前言&#xff1a; 1.File类概述 2.File类属性 3.File类构造方法 4.File类普通方法 前言&#xff1a; 在Java中操作文件主要分为&#xff1a;1.文件系统的操作&#xff08;File类&#xff09;、2.文件内容的操作&#xff08;流对象&#xff09;。 本节主要介绍Java中的…

数据加密算法多样化的安全需求

数据加密算法是信息安全领域中非常重要的一环&#xff0c;它能够确保数据在传输和存储过程中的机密性和完整性。随着技术的发展&#xff0c;数据加密算法也在不断地演进和改进&#xff0c;以满足更为复杂和多样化的安全需求。 数据加密算法的基本原理是使用加密密钥和加密算法对…

【计算机网络】Socket的SO_REUSEADDR选项与TIME_WAIT

SO_REUSEADDR用于设置套接字的地址重用。当一个套接字关闭后&#xff0c;它的端口可能会在一段时间内处于TIME_WAIT状态&#xff0c;此时无法立即再次绑定相同的地址和端口。使用SO_REUSEADDR选项可以允许新的套接字立即绑定到相同的地址和端口&#xff0c;即使之前的套接字仍处…

Vim实战:使用Vim实现图像分类任务(一)

文章目录 摘要安装包安装timm 数据增强Cutout和MixupEMA项目结构编译安装Vim环境环境安装过程安装库文件 计算mean和std生成数据集 摘要 论文&#xff1a;https://arxiv.org/pdf/2401.09417v1.pdf 翻译&#xff1a; 近年来&#xff0c;随着深度学习的发展&#xff0c;视觉模型…

JUC并发编程01——进程,线程(详解),并发和并行

目录 1.进程和线程的概念及对比1.进程概述 2.线程3.对比 2.并行与并发1.并发2.并行 3.线程详解3.1.创建和运行线程3.1.1.Thread3.1.2.Runnable结合Thread 创建线程3.1.3.Callable 3.2线程方法APIrun startsleep yieldjoininterrupt打断线程打断 park终止模式 daemon不推荐使用的…

基于Java SSM框架实现校园快领服务系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现校园快领服务系统演示 摘要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于校园快领服务系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了…

骨传导耳机的技术原理是什么?和传统耳机相比有哪些优点?

骨传导耳机通过人体骨骼来传递声音&#xff0c;可以绕过耳道和耳膜直接传达音频到听者的内耳&#xff0c;开放双耳的佩戴方式可以在享受音乐或通话的同时保持对周围环境的感知&#xff0c;这种设计在户外活动或运动等场景下的使用尤为实用&#xff0c;可以避免堵塞耳朵&#xf…

CHS_08.2.3.6_1+生产者-消费者问题

CHS_08.2.3.6_1生产者-消费者问题 问题描述问题分析思考&#xff1a;能否改变相邻P、V操作的顺序&#xff1f;知识回顾 在这个小节中 我们会学习一个经典的进程同步互斥的问题 问题描述 并且尝试用上个小节学习的p v操作 也就是信号量机制来解决这个生产者消费者问题 问题的描…