【告警自动化处置脚本】

【告警自动化处置脚本0】

以下是一个Python脚本,用于定时查询历史告警,找到攻击结果为成功的告警,并根据告警结果查询威胁情报。如果是恶意IP,则调用防火墙进行封禁,并记录封禁动作的日志。

首先,确保已经安装了pymysqlrequests库。如果没有,请使用以下命令安装:

pip install pymysql requests

接下来,创建一个名为block_malicious_ips.py的脚本,并将以下代码复制到其中:

# 导入所需库
import pymysql  # 用于与MySQL数据库进行交互
import requests  # 用于发送HTTP请求
import json  # 用于处理JSON数据
import subprocess  # 用于执行系统命令
import time  # 用于获取当前时间# 数据库配置
db_config = {'host': 'localhost',  # 数据库主机名'user': 'root',  # 数据库用户名'password': 'your_password',  # 数据库密码'db': 'your_database',  # 数据库名称'charset': 'utf8mb4',  # 数据库字符集
}# 微步威胁情报API配置
threat_intelligence_api_key = 'your_api_key'  # 微步威胁情报API密钥
threat_intelligence_url = 'https://api.example.com/v1/ip/query'  # 微步威胁情报API的URL# 防火墙配置
firewall_cmd = 'iptables'  # 防火墙命令,这里使用iptables作为示例# 日志文件
log_file = 'block_malicious_ips.log'  # 日志文件名# 定义查询成功攻击的函数
def query_successful_attacks():connection = pymysql.connect(**db_config)  # 连接到MySQL数据库try:with connection.cursor() as cursor:  # 创建一个数据库游标sql = "SELECT * FROM `alerts` WHERE `attack_result` = 'success'"  # 查询成功攻击的SQL语句cursor.execute(sql)  # 执行SQL查询return cursor.fetchall()  # 返回查询结果finally:connection.close()  # 关闭数据库连接# 定义查询威胁情报的函数
def query_threat_intelligence(ip):headers = {'Authorization': f'Bearer {threat_intelligence_api_key}',  # 设置API密钥}response = requests.get(threat_intelligence_url, headers=headers, params={'ip': ip})  # 发送GET请求查询威胁情报return response.json()  # 返回JSON格式的响应数据# 定义封禁IP的函数
def block_ip(ip):cmd = f'{firewall_cmd} -A INPUT -s {ip} -j DROP'  # 构造防火墙命令subprocess.run(cmd, shell=True, check=True)  # 执行防火墙命令# 定义记录封禁IP日志的函数
def log_blocked_ip(ip):with open(log_file, 'a') as f:  # 以追加模式打开日志文件f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} Blocked IP: {ip}\n")  # 写入封禁IP的日志# 主函数
def main():successful_attacks = query_successful_attacks()  # 查询成功攻击for attack in successful_attacks:  # 遍历成功攻击ip = attack['source_ip']  # 获取攻击源IPthreat_info = query_threat_intelligence(ip)  # 查询威胁情报if threat_info.get('malicious'):  # 判断是否为恶意IPblock_ip(ip)  # 封禁恶意IPlog_blocked_ip(ip)  # 记录封禁日志# 当脚本作为主程序运行时,执行主函数
if __name__ == '__main__':main()

在上述脚本中,需要替换以下几个变量:

  • db_config:数据库配置,包括主机名、用户名、密码和数据库名称。
  • threat_intelligence_api_key:微步威胁情报API密钥。
  • threat_intelligence_url:微步威胁情报API的URL。
  • firewall_cmd:防火墙命令,这里使用iptables作为示例。

接下来,设置一个Linux计划任务,定期运行此脚本。首先,打开终端并运行以下命令:

crontab -e

在打开的编辑器中,添加以下行以每小时运行一次脚本:

0 * * * * /usr/bin/python3 /path/to/block_malicious_ips.py

保存并退出编辑器。现在,脚本将每小时运行一次,自动封禁恶意IP并记录封禁日志。

关于Linux计划任务(Cron)的详细解释:

Cron是Linux系统中用于定期执行任务的工具。它允许用户在特定的日期和时间自动执行命令或者脚本。

在这个例子中,我们使用crontab -e命令来编辑当前用户的Cron配置。crontab命令用于管理用户的Cron表,-e选项表示编辑模式。

编辑器中的每一行都代表一个计划任务。每行的格式如下:

* * * * * /path/to/command arg1 arg2
| | | | |
| | | | ----- Day of week (0 - 7) (Sunday = 0 or 7)
| | | ------- Month (1 - 12)
| | --------- Day of month (1 - 31)
| ----------- Hour (0 - 23)
------------- Minute (0 - 59)

在这个例子中,我们将添加以下行以每小时运行一次脚本:

0 * * * * /usr/bin/python3 /path/to/block_malicious_ips.py

这里的0 * * * *表示每小时的第0分钟执行任务,也就是每小时运行一次。/usr/bin/python3是Python 3解释器的路径,/path/to/block_malicious_ips.py是脚本的路径。请确保将其替换为实际的路径。

保存并退出编辑器后,Cron将自动加载新的配置,并按照计划执行任务。这样,脚本将每小时运行一次,自动封禁恶意IP并记录封禁日志。

【告警自动化处置脚本1】

这是一个Python脚本,使用了schedule库来实现定时任务,requests库来发送HTTP请求。请确保已安装这两个库:

pip install schedule requests
import schedule
import time
import requests# 定义查询历史告警的函数
def get_historical_alerts():# 这里替换为实际的查询历史告警的API地址url = "https://your-alert-api.example.com/alerts"response = requests.get(url)alerts = response.json()return alerts# 定义根据告警结果查询威胁情报的函数
def get_threat_intelligence(alert):# 这里替换为实际的查询威胁情报的API地址url = "https://your-threat-intelligence-api.example.com/intelligence"response = requests.get(url, params={"ip": alert["source_ip"]})threat_info = response.json()return threat_info# 定义调用防火墙进行封禁的函数
def block_ip(ip):# 这里替换为实际的调用防火墙封禁IP的API地址url = "https://your-firewall-api.example.com/block"response = requests.post(url, json={"ip": ip})return response.ok# 定义定时任务
def check_alerts():alerts = get_historical_alerts()for alert in alerts:if alert["attack_result"] == "success":threat_info = get_threat_intelligence(alert)if threat_info["malicious"]:blocked = block_ip(alert["source_ip"])if blocked:print(f"IP {alert['source_ip']} 已被封禁")else:print(f"无法封禁 IP {alert['source_ip']}")# 每小时执行一次定时任务
schedule.every(1).hours.do(check_alerts)# 无限循环执行定时任务
while True:schedule.run_pending()time.sleep(1)

请将上述代码中的API地址替换为实际的查询历史告警、查询威胁情报和调用防火墙封禁IP的API地址。运行此脚本后,它将每小时执行一次定时任务,检查历史告警中攻击结果为成功的告警,并根据告警结果查询威胁情报。如果是恶意IP,则调用防火墙进行封禁。

【告警自动化处置脚本2】

这是一个使用 Python 编写的示例脚本,它定时查询历史告警,找到攻击结果为成功的告警,根据告警结果查询威胁情报,如果是恶意IP则调用防火墙进行封禁。

请注意,这个示例脚本仅用于说明目的,并没有实际的 API 调用。你需要根据你的实际环境和 API 进行修改。

import time
import requests# 配置参数
INTERVAL = 60  # 查询间隔(秒)
ALARM_API_URL = "http://example.com/api/alarm"  # 历史告警 API URL
THREAT_INTEL_API_URL = "http://example.com/api/threat-intel"  # 威胁情报 API URL
FIREWALL_API_URL = "http://example.com/api/firewall"  # 防火墙 API URLdef get_successful_alarms():response = requests.get(ALARM_API_URL)alarms = response.json()successful_alarms = [alarm for alarm in alarms if alarm["attack_result"] == "success"]return successful_alarmsdef get_threat_intel(ip):response = requests.get(THREAT_INTEL_API_URL, params={"ip": ip})threat_intel = response.json()return threat_inteldef block_ip(ip):response = requests.post(FIREWALL_API_URL, json={"action": "block", "ip": ip})return response.status_code == 200def main():while True:print("查询历史告警...")successful_alarms = get_successful_alarms()for alarm in successful_alarms:ip = alarm["source_ip"]print(f"查询威胁情报: {ip}")threat_intel = get_threat_intel(ip)if threat_intel["malicious"]:print(f"封禁恶意 IP: {ip}")if block_ip(ip):print(f"封禁成功: {ip}")else:print(f"封禁失败: {ip}")time.sleep(INTERVAL)if __name__ == "__main__":main()

这个脚本首先定义了一些配置参数,如查询间隔、API URL 等。接下来,定义了三个函数:get_successful_alarms 用于查询历史告警并筛选出攻击成功的告警;get_threat_intel 用于查询指定 IP 的威胁情报;block_ip 用于调用防火墙 API 封禁指定 IP。

main 函数是脚本的主要逻辑,它会循环执行以下操作:查询历史告警,找到攻击成功的告警;对于每个成功的告警,查询其源 IP 的威胁情报;如果威胁情报显示该 IP 是恶意的,则调用防火墙 API 进行封禁。

为了运行此脚本,你需要安装 requests 库。你可以使用以下命令安装:

pip install requests

请注意,这个脚本仅作为示例,你需要根据你的实际环境和 API 进行修改。在实际使用中,你还需要考虑异常处理、日志记录等方面的问题。

【告警自动化处置脚本3】

以下是一个更详细的 Python 脚本示例,使用 pymysql 库查询数据库中的历史告警,并使用微步在线的威胁情报 API 查询威胁情报。该脚本包含详细的注释以解释每个步骤。

请注意,此示例脚本仅用于说明目的,你需要根据你的实际环境和数据库配置进行修改。

import time
import pymysql
import requests# 配置参数
INTERVAL = 60  # 查询间隔(秒)
DB_CONFIG = {  # 数据库配置'host': 'localhost','user': 'username','password': 'password','db': 'database_name','charset': 'utf8mb4'
}
THREAT_INTEL_API_URL = "https://api.example.com/v4/ip"  # 微步威胁情报 API URL
THREAT_INTEL_API_KEY = "your_api_key"  # 微步 API 密钥
FIREWALL_API_URL = "http://example.com/api/firewall"  # 防火墙 API URL# 连接数据库
def connect_db():connection = pymysql.connect(**DB_CONFIG)return connection# 查询成功攻击的告警
def get_successful_alarms(connection):with connection.cursor() as cursor:sql = "SELECT * FROM alarms WHERE attack_result = 'success'"cursor.execute(sql)successful_alarms = cursor.fetchall()return successful_alarms# 查询微步威胁情报
def get_threat_intel(ip):headers = {"Authorization": f"ApiKey {THREAT_INTEL_API_KEY}"}response = requests.get(f"{THREAT_INTEL_API_URL}/{ip}", headers=headers)threat_intel = response.json()return threat_intel# 调用防火墙 API 进行封禁
def block_ip(ip):response = requests.post(FIREWALL_API_URL, json={"action": "block", "ip": ip})return response.status_code == 200# 主函数
def main():connection = connect_db()while True:print("查询成功攻击的告警...")successful_alarms = get_successful_alarms(connection)for alarm in successful_alarms:ip = alarm["source_ip"]print(f"查询微步威胁情报: {ip}")threat_intel = get_threat_intel(ip)if threat_intel["malicious"]:print(f"封禁恶意 IP: {ip}")if block_ip(ip):print(f"封禁成功: {ip}")else:print(f"封禁失败: {ip}")time.sleep(INTERVAL)if __name__ == "__main__":main()

在这个脚本中,我们首先定义了一些配置参数,如查询间隔、数据库配置、API URL 等。接下来,定义了四个函数:

  1. connect_db:连接到数据库并返回连接对象。
  2. get_successful_alarms:查询数据库中成功攻击的告警。
  3. get_threat_intel:查询指定 IP 的微步威胁情报。
  4. block_ip:调用防火墙 API 封禁指定 IP。

main 函数是脚本的主要逻辑,它会循环执行以下操作:连接到数据库,查询成功攻击的告警;对于每个成功的告警,查询其源 IP 的微步威胁情报;如果威胁情报显示该 IP 是恶意的,则调用防火墙 API 进行封禁。

为了运行此脚本,你需要安装 pymysqlrequests 库。你可以使用以下命令安装:

pip install pymysql requests

请注意,这个脚本仅作为示例,你需要根据你的实际环境和数据库配置进行修改。在实际使用中,你还需要考虑异常处理、日志记录等方面的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/233686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS:Neural Network Runtime对接AI推理框架开发指导

场景介绍 Neural Network Runtime 作为 AI 推理引擎和加速芯片的桥梁,为 AI 推理引擎提供精简的 Native 接口,满足推理引擎通过加速芯片执行端到端推理的需求。 本文以图 1 展示的 Add 单算子模型为例,介绍 Neural Network Runtime 的开发流…

轻量封装WebGPU渲染系统示例<50>- Json数据描述材质等3D渲染场景信息

本示例中的3d渲染场景由Json数据来描述。 包含3个主要部分: 1. Json描述渲染器的基本信息。 2. Json描述渲染场景的环境信息,包括全局的灯光、阴影、雾等。 3. Json描述构成场景的各个可选人实体,包括几何信息、transform、材质、渲染状态等。 当前示例源码git…

postMessage解决跨域、消息传递(平台嵌入其他项目,需要相互发送接受消息)

使用背景:其他平台嵌入到自己项目平台上,使用iframe做嵌套,但是涉及到进来免登录以及跨域问题,使用postMessage发送信息。 想要使用 postMessage 实现跨域通信和页面间数据通信,只要记住 window 提供的 postMessage 方…

开启创意之旅:免费、开源的噪波贴图(noise texture)生成网站——noisecreater.com详细介绍

在当今数字创意领域,噪波贴图(Noise Texture)是游戏渲染、游戏开发、美术设计以及影视制作等行业不可或缺的艺术素材之一。为了满足广大创作者的需求,noisecreater.com应运而生,成为一款免费、开源的噪波贴图生成工具。…

保护IP地址免受盗用的有效方法

IP地址是互联网通信的基础,然而,由于其重要性,IP地址的盗用成为一种潜在的网络威胁。本文将深入探讨防止IP地址被盗用的方法,以维护网络的安全性。 第一部分:IP地址盗用的威胁与风险 1.1 IP地址盗用的定义 IP地址盗…

数据可视化---离群值展示

内容导航 类别内容导航机器学习机器学习算法应用场景与评价指标机器学习算法—分类机器学习算法—回归机器学习算法—聚类机器学习算法—异常检测机器学习算法—时间序列数据可视化数据可视化—折线图数据可视化—箱线图数据可视化—柱状图数据可视化—饼图、环形图、雷达图统…

操作系统系列:Unix进程系统调用fork,wait,exec

操作系统系列:Unix进程系统调用 fork系统调用fork()运用的小练习 wait系统调用Zombiesexec 系列系统调用 开发者可以查看创建新进程的系统调用,这个模块会讨论与进程相关的Unix系统调用,下一个模块会讨论Win32 APIs相关的进程。 fork系统调用…

java参数校验

引入依赖 <!--参数效验--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency><!--Length参数效验--><dependency><groupId>org.hib…

量子计算:开启IT领域的新时代

量子计算&#xff1a;开启IT领域的新时代 一、量子计算的基本原理与背景 量子计算作为一项前沿高级技术&#xff0c;正逐渐引起IT领域的广泛关注。传统计算机是通过二进制位&#xff08;0和1&#xff09;来储存和处理信息&#xff0c;而量子计算机则利用量子位或称为“量子比特…

python requests 设置全局代理,之后调用就不用设置

import requestsproxies {http: http://your_proxy_server:port,https: https://your_proxy_server:port }# 设置全局代理 session requests.Session() session.proxies.update(proxies)# 发起请求 response_with_proxy session.get(https://example.com)# 取消代理请求 res…

【python基础】-- yarn add 添加依赖的各种类型

目录 1、安装 yarn 1.1 使用npm安装 1.2 查看版本 1.3 yarn 淘宝源配置 2、安装命令说明 2.1 yarn add&#xff08;会更新package.json和yarn.lock&#xff09; 2.2 yarn install 2.3 一些操作 2.3.1 发布包 2.3.2 移除一个包 2.3.3 更新一个依赖 2.3.4 运行脚本 …

华为OD机试真题-螺旋数字矩阵-2023年OD统一考试(C卷)

题目描述:疫情期间,小明隔离在家,百无聊赖,在纸上写数字玩。他发明了一种写法: 给出数字个数n和行数m(0 < n ≤ 999,0 < m ≤ 999),从左上角的1开始,按照顺时针螺旋向内写方式,依次写出2,3...n,最终形成一个m行矩阵。 小明对这个矩阵有些要求: 1.每行数字的…

【设计模式--行为型--备忘录模式】

设计模式--行为型--备忘录模式 备忘录模式定义结构案例实现白箱备忘录模式黑箱备忘录模式 优缺点使用场景 备忘录模式 定义 又叫快照模式&#xff0c;在不破坏封装性的前提下&#xff0c;捕获一个对象的对象的内部状态&#xff0c;并在该对象之外保存这个状态&#xff0c;以便…

Python控制Excel自动刷新页面

比如我们有一个待刷新的Excel叫测试.xlsx 这里我们使用python控制Excel的应用来直接刷新相关页面&#xff1a; 传入的Excel路径需要是完整的路径&#xff0c;否则会提示找不到&#xff1a;pywintypes.com_error: (-2147352567, 发生意外。, (0, Microsoft Excel, 抱歉&#x…

常用数据库的分页语句(mySQL、oracle、PostgreSQL、SQL Server)

目录 ORACLE MySQL PostgreSQL SQL Server ORACLE SELECT * FROM (SELECT t.*, ROWNUM AS rnFROM (SELECT * FROM 表名 ORDER BY 排序字段) tWHERE ROWNUM < 结束行数 ) WHERE rn > 开始行数; 其中&#xff0c;表名是你要查询的表名&#xff0c;排序字段是你希望按…

Java 自定义注解

Java 自定义注解&#xff0c; 以及interface Target Retention Around Before After ProceedingJoinPoint JoinPoint 等用法 注解应用非常广泛&#xff0c;我们自定义注解能简化开发各种各种业务 一、关键字解释 (1) 定义注解时&#xff0c;关键字 interface 来表示注解类的类…

Spring Boot学习随笔- 实现AOP(JoinPoint、ProceedingJoinPoint、自定义注解类实现切面)

学习视频&#xff1a;【编程不良人】2021年SpringBoot最新最全教程 第十一章、AOP 11.1 为什么要使用AOP 问题 现有业务层开发存在问题 额外功能代码存在大量冗余每个方法都需要书写一遍额外功能代码不利于项目维护 Spring中的AOP AOP&#xff1a;Aspect 切面 Oriented 面向…

前端面试题(计算机网络):常见的HTTP请求头和响应头

前端面试题&#xff08;计算机网络&#xff09;&#xff1a;常见的HTTP请求头和响应头 HTTP Request Header 常见的请求头&#xff1a;HTTP Responses Header 常见的响应头&#xff1a;常见的 Content-Type 属性值有以下四种&#xff1a; HTTP Request Header 常见的请求头&…

竞赛保研 python 机器视觉 车牌识别 - opencv 深度学习 机器学习

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于python 机器视觉 的车牌识别系统 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;3分 &#x1f9ff; 更多资…

【Python 基础】-- 在 mac OS 中安装 多个 python 版本

目录 1、需求 2、实现 2.1 安装 pyenv 2.2 安装 pyenv-virtualenv 2.3 配置环境变量 2.4 创建 python 3.9.9 的环境 2.5 激活环境&#xff0c;在当前项目目录中使用&#xff0c;即执行 python 1、需求 由于项目所依赖的 python 版本有多个&#xff0c;需要在不同的 pyth…