linux 获取公网流量 tcpdump + python + C++

前言

需求为,统计linux上得上下行公网流量,常规得命令如iftop 、sar、ifstat、nload等只能获取流量得大小,不能区分公私网,所以需要通过抓取网络包并排除私网段才能拿到公网流量。下面提供了一些有效得解决思路,提供了部分得代码片段,但不提供整个代码内容。

方案

方案一:在Linux服务器上安装服务,抓取到数据包后发送到中央计算服务器

优点:①由于计算是在中央,并不会占用linux业务服务器得资源性能

缺点:①通过scp或其他传输手段,会占用一定得网络资源,特别当数据量大时

②中央计算服务器得资源易达到瓶颈,需要开多个计算节点

方案二:在Linux服务器上安装服务,抓取到数据包后本地处理,直接将结果发至中央

优点:①中央节点压力减小,数据实时性提升

缺点:①每个linux服务器都需要抓包并且处理,需要占用一定的计算资源,且单位包越大消耗越高

方案三:在同层交换机上放置旁路监控服务器,抓取广播域内所有包,逐个分析每个linux服务器上的流量情况

优点:

①独立部署,不会占用linux服务器资源

②维护相对容易,linux业务服务器与监控节点解耦,只需要维护监控节点而不需要维护linux服务器

缺点:

①需要跟linux服务器处在一个广播域,并且要求性能足够,当linux业务服务器数据过多时,易有瓶颈

②可能存在漏抓包的情况

③需要监听所有得vlan,并分别部署监控节点

方案四:加入流控设备,旁路到核心网关

优点:

①无需自研,有成熟的抓取功能,且有更多丰富的功能

缺点:

①成本较高

②缺乏控制能力,比如流量限速,阻断等

下面主要介绍第一和第二种方法得实现

方案一技术实现

先来看技术拓扑图,流程为:

  1. linux服务器开启tcpdump抓包
  2. linux服务器将pcap包发送到流量分析服务器
  3. 流量服务器进行公私网流量拆分,公网白名单过滤
  4. 获取步骤三得结果并传输到中央

下面讲解每一步得实现细节

1.linux服务器开启tcpdump抓包

核心命令为"tcpdump", "-i", interface , f"host {target_ip}", '-s 96',"-w", output_file ,'-p'

-i 接网卡名,host接希望抓取得ip(可不填),-s 96 表示只抓取包头部(可能有得包头超过这个长度,但96依然是个比较适中得值,-w 保存为pcap包,-p 关闭混杂模式(避免监听到广播得其他包使流量过多)

#执行tcpdump抓包
def run_tcpdump(target_ip):INTERVAL = 60interface = getInterface() #获取网卡名timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") #获取开始时间output_file = f"{SAVEPATH}/xxx_{timestamp}.pcap"tcpdump_command = ["tcpdump", "-i", interface , f"host {target_ip}", '-s 96',"-w", output_file ,'-p']try:process = subprocess.Popen(tcpdump_command) #使用subprocess执行shell命令time.sleep(INTERVAL) #定义休眠时间process.terminate()  #结束shell命令process.wait()       #等待子进程退出,确保子进程清理完成except Exception as e:logger.error(f"xxx")

2.linux服务器将pcap包发送到流量分析服务器

这一个步骤没有太多可讲得,使用socket 或requests 或 subprocess(scp)都可以,目的就是将抓取到得包发送到流量分析服务器。

3.流量服务器进行公私网流量拆分,公网白名单过滤

这一步是关键步骤,公私网流量,公网白名单过滤拆分需要一定得计算性能,所以核心代码推荐用C++语言编写,博主测试过同代码情况下,python C++ java得表现能力,C++(或C)性能要远超其他语言,如同体积包,假设占用30%得单核心性能,C++可以做到1%以内。

代码要实现两点:

1.公私网彻底拆分

2.过滤公网白名单内的(不希望统计得)数据

公私网流量拆分参考以下代码片段,首先剔除三类子网地址(0xFF000000 是掩码(255.0.0.0 的二进制形式),只保留 IP 地址的最高 8 位。0x0A000000 是 10.0.0.0 的二进制形式。)

注意:现在得大型网络多采用overlay得方式,有些ip虽然在私网段,但由于走了vxlan隧道占用了公网带宽,其实也是数据公网得。后续得处理需读者自行处理。

bool is_private_ip(const std::string &ip) {struct in_addr addr;inet_pton(AF_INET, ip.c_str(), &addr);uint32_t ip_addr = ntohl(addr.s_addr);// 10.0.0.0/8if ((ip_addr & 0xFF000000) == 0x0A000000) return true;// 172.16.0.0/12if ((ip_addr & 0xFFF00000) == 0xAC100000) return true;// 192.168.0.0/16if ((ip_addr & 0xFFFF0000) == 0xC0A80000) return true;return false;
}

定义4个变量分别存公网下行,公网上行,私网下行,私网上行

    uint64_t public_in= 0;uint64_t public_out = 0;uint64_t private_in = 0;uint64_t private_out = 0;  pcap_t *handle = pcap_open_offline(pcap_file.c_str(), errbuf); #获取pcap包pcap_next_ex(handle, &header, &data)  #调用libpcap 捕获库 const struct ip *ip_header = (struct ip *)(data + sizeof(struct ether_header)); #获取包得headstd::string src_ip = inet_ntoa(ip_header->ip_src); #获取src ipstd::string dst_ip = inet_ntoa(ip_header->ip_dst); #获取dst ip#判断是私网还是公网if (dst_ip == target_ip) {if (is_private_ip(src_ip)) {private_in += pkt_len;} else {public_in += pkt_len;}} else if (src_ip == target_ip) {if (is_private_ip(dst_ip)) {private_out += pkt_len;} else {public_out += pkt_len;}}

接下来是过滤不希望统计的ip名单,在上面代码的基础上再做一层判断即可,

    #判断是私网还是公网if (dst_ip == target_ip) {if (is_private_ip(src_ip)) {private_in += pkt_len;} else {if (filter_write(src_ip)){  #return true则公网ip不在白名单内,需要加和public_in += pkt_len;}}} else if (src_ip == target_ip) {if (is_private_ip(dst_ip)) {private_out += pkt_len;} else {if (filter_write(src_ip)){public_out += pkt_len;}}}

4.获取步骤三得结果上报并持久化

拿到公网流量后,传输给用于持久化的程序进行入库操作(对于此类数据,建议用clickhouse进行建库,对体量小的用mysql也可以接受).关于传输方式,可以选择使用生产消费者方式,使用消息中间件缓冲数据(如Kafka)。数据量小通过http/https协议传输后插入也可以。

kafka模板

def push_kafka(data, retries=5, backoff_factor=1):logger.info(f"上传 {data}")# 配置 Kafka 生产者kafka_config = {'bootstrap.servers': KAFKA_URL,  # 替换为你的 Kafka 地址'client.id': 'my-producer',}producer = Producer(kafka_config)topic = KAFKA_TOPIC  # 替换为你的 Kafka 主题名# 重试逻辑for attempt in range(retries):try:# 将数据转换为 JSON 格式并发送到 Kafkaproducer.produce(topic=topic, value=json.dumps(data), callback=delivery_report)producer.flush()  # 强制刷新缓冲区logger.info("数据上传成功")return  # 成功后返回except KafkaException as e:logger.error(f"Kafka error occurred: {e}")except Exception as e:logger.error(f"Unexpected error: {e}")# 等待后重试wait_time = backoff_factor * (2 ** attempt)logger.info(f"等待 {wait_time} 秒后重试...")time.sleep(wait_time)logger.error("所有重试均失败")

http/https直传

def post_with_retries_center(data, retries=5, backoff_factor=1):URL_CENTER = "192.168.10.10" #填自己的地址logger.info(f"上传{data}")session = requests.Session()headers = {'Content-Type': 'application/json',}#  定义重试策略retry_strategy = Retry(total=retries,status_forcelist=[404, 500, 502, 503, 504],allowed_methods=["POST"],backoff_factor=backoff_factor)#  创建适配器并将其安装到会话对象中adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("http://", adapter)session.mount("https://", adapter)try:response = requests.post(URL_CENTER, json=data, headers=headers)response.raise_for_status()  # 如果响应状态码不是200,则抛出异常except requests.exceptions.ConnectionError as conn_err:logger.error(f"Connection error occurred: {conn_err}")except requests.exceptions.Timeout as timeout_err:logger.error(f"Timeout error occurred: {timeout_err}")except requests.exceptions.RequestException as req_err:logger.error(f"An error occurred: {req_err}")except Exception as e:logger.error(f"Unexpected error: {e}")return None

方案二技术实现

在Linux服务器上安装服务,抓取到数据包后本地处理,直接将结果发至中央

流程为

  1. linux服务器开启tcpdump抓包
  2. linux服务器分析包,获取私网、公网流量结果
  3. linux服务器推送消息中间件
  4. 中央消费者对消息消费并入库

方案二与方案一代码几乎相同,只是职权不同,原linux不需要处理包,现在需要处理,所以处理程序应该在linux服务器上,处理后推送到消息中间件,由中央消费者进行消费即可。代码片段这里就不贴了

结尾

本文章内容适用于基于linux系统的流量包分析,方案本身是完整闭环的。其中对包的处理快慢以及对公、私网的ip段判断、消息推送的方式这些是可以持续优化的点。由于方案本身具备一定商用价值,故没有贴上源码,若读者需要可联系博主提供。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/62982.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rain后台权限管理系统,快速开发

这段时间一直没有更新,因为在写一个脚手架,今天Rain项目终于完工,已经发布到github,免费使用 项目github链接 https://github.com/Rain-hechang/Rain 简介 前端采用Vue3.x、Element UI。 后端采用Spring Boot、Spring Security、Redis &…

scroll-view组件,在iOS设备上禁用橡皮筋回弹效果

问题描述 在实现uniapp微信小程序的一个项目时,ios真机测试,scroll-view组件可以向下拉动一段距离然后又回弹。 如下图 解决方法: 可以通过设置scroll-view组件的属性来禁用橡皮筋回弹效果。如下,设置enhanced"true&…

【AI系统】昇腾异构计算架构 CANN

昇腾异构计算架构 CANN 本文将介绍昇腾 AI 异构计算架构 CANN(Compute Architecture for Neural Networks),这是一套为高性能神经网络计算需求专门设计和优化的架构。CANN 包括硬件层面的达芬奇架构和软件层面的全栈支持,旨在提供…

csv文件的上传、解析和获得最后的数据

前端和node端解析、读取csv文件的区别 1、前端 运行环境为浏览器,受到浏览器安全策略的限制,例如跨域请求、文件访问权限等。对于大型CSV文件的处理可能会受到性能瓶颈的影响。前端运行在用户的浏览器中,受到浏览器安全策略的限制&#xff…

Python学习------第十五天

1.异常的捕获方式: #基本捕获语法 try:f open("D:/abc.txt","r",encoding"UTF-8") except:print("出现异常了,因为文件不存在,我将open模式改为w模式去打开")f open("D:/abc.txt", &quo…

THENA大涨将对整个DeFi市场产生怎样的影响?

引言 近期,区块链行业的一个热门项目——THENA(THE)代币,在短时间内吸引了大量投资者的目光。THE代币的价格在短短几个月内经历了显著的上涨,引发了市场对其背后机制的浓厚兴趣。而在THENA生态系统的成功背后&#xf…

Kubernetes命名空间详解

目录 目标 版本 官网 概述 namespace(命名空间、名称空间) 注意事项 基本命令 查看namespace列表 查看所有Pod的namespace 查看单个Pod的namespace 查看同一个namespace下的所有Pod 查看单个namespace资源配额 查看单个Pod详情 查看所有na…

【开源】A059-基于SpringBoot的社区养老服务系统的设计与实现

🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看项目链接获取⬇️,记得注明来意哦~🌹 赠送计算机毕业设计600个选题ex…

【AI系统】算子开发编程语言 Ascend C

算子开发编程语言 Ascend C 本节将深入探讨昇腾算子开发编程语言 Ascend C,这是一种专为昇腾 AI 处理器算子开发设计的编程语言,它原生支持 C 和 C标准规范,最大化匹配用户的开发习惯。Ascend C 通过多层接口抽象、自动并行计算、孪生调试等…

Python基于大数据的微博的舆情分析,机器学习的微博情感分析系统

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

WPF+LibVLC开发播放器-LibVLC播放控制

接上一篇&#xff1a; LibVLC在C#中的使用 实现LibVLC播放器播放控制 界面 界面上添加一个Button按钮用于控制播放 <ButtonGrid.Row"1"Width"88"Height"24"Margin"10,0,0,0"HorizontalAlignment"Left"VerticalAlignme…

【设计模式系列】中介者模式(十八)

一、什么是中介者模式 中介者模式&#xff08;Mediator Pattern&#xff09;是一种行为型设计模式&#xff0c;其核心思想是通过一个中介者对象来封装一系列对象之间的交互&#xff0c;使这些对象不需要相互显式引用。中介者模式提供了一个中介层&#xff0c;用以协调各个对象…

【AI系统】Ascend C 编程范式

Ascend C 编程范式 AI 的发展日新月异&#xff0c;AI 系统相关软件的更新迭代也是应接不暇&#xff0c;作为一本讲授理论的作品&#xff0c;我们将尽可能地讨论编程范式背后的原理和思考&#xff0c;而少体现代码实现&#xff0c;以期让读者理解 Ascend C 为何这样设计&#x…

是什么阻断了kafka与zk的链接?

转载说明&#xff1a;如果您喜欢这篇文章并打算转载它&#xff0c;请私信作者取得授权。感谢您喜爱本文&#xff0c;请文明转载&#xff0c;谢谢。 问题描述&#xff1a; 前几天部署一套环境&#xff0c;先把zk集群起来了&#xff0c;之后第二天在启动kafka的时候&#xff0c;…

CentOS 二进制安装部署MongoDB 4.0

一、安装MongoDB 1. 下载 MongoDB 二进制文件 前往 MongoDB 官方下载页面(https://www.mongodb.com/try/download/community) 选择对应版本的 tar 包。 wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-4.0.28.tgz 2. 解压并移动至目标目录 解压文件&#xff…

【数据结构】【线性表】特殊的线性表-字符串

目录 字符串的基本概念 字符串的三要素 字符串的基本概念 串的编码 串的实现及基本运算 顺序串的实现 串的静态数组实现 串的动态数组的实现 顺序存储的四种方案 链式串的实现 基本运算 方案三 方案一 字符串的基本概念 数据结构千千万&#xff0c…

Agile VMO分享:海尔案例

海尔集团是全球最大的家电制造商之一&#xff0c;拥有超过76 000名员工。它获得了2018-2019年全球智能家电品牌前10名和2018-2019年全球消费电子品牌前50名的荣誉。 海尔利用价值流结构将自己组织成一些可以自管理的微型企业。这些微型企业拥有决策&#xff0c;设计和交付新产品…

ThinkPHP场景动态验证

一、缘由 今天在用thinkphp8写东西的时候发现&#xff0c;写验证器规则和场景优点费时间&#xff0c;就算用tinkphp的命令行生成也是生成一个空壳。内容还是要自己填写感觉麻烦。 就突发奇想能不能自动生成验证器&#xff0c;也不能是说自动生成验证器&#xff0c;生成验证其的…

限定符使用

正则表达式的元字符一次一般只能匹配一个位置或一个字符,如果想要匹配零个、一个或多个字符时,则需要使用限定符。限定符用于指定允许特定字符或字符集自身重复出现的次数。常用限定符如下: <asp:TextBox [^>]> 正则表达式字符类[^>]匹配除过“>”之外的任何字…

vue3+vite 批量引入组件动态使用

import { ref, reactive, toRaw, markRaw, defineAsyncComponent, onMounted } from vue import type { Component } from vue// vue3vite 批量引入组件动态使用 const modules import.meta.glob<Component>(./details/*.vue) // 明确指定导入的模块类型为Component con…