前言
由于mysql链接超时波动,导致数据缺失,需要根据日志填补数据
流程
获取确实数据的订单列表
搜索日志,获取请求日志
根据请求日志拼装sql
打印sql供修复数据
代码
因为我们日志打印的有问题,所以这里用字符串截取获取入参。如果日志打印的是标准json,直接搞json即可
from elasticsearch import Elasticsearch
import jsonclass MyUtils:passdef getValue(fullStr, beginStr, endStr):start = fullStr.find(beginStr) + len(beginStr)end = fullStr.find(endStr)value = fullStr[start:end]return valuedef setValue(orderInfoExt, columnName, fullStr, beginStr, endStr):value = MyUtils.getValue(fullStr, beginStr, endStr)if value != 'null':orderInfoExt[columnName] = valuees = Elasticsearch(hosts="http://xxx:9200/", http_auth=('xxx', 'xxx'))
scroll_id = None
fileName = "create-order-info" + ".txt"
orderIdList = [74xxxx574,74xxxx822]
orderExtInfoList = []for orderId in orderIdList:query_json = {"_source": ["message", "logger_name", "@timestamp"],"query": {"bool": {"filter":[{"bool":{"filter":[{"multi_match":{"lenient": True,"query": "order/v1/createOrder","type": "phrase"}},{"multi_match":{"lenient": True,"query": orderId,"type": "phrase"}}]}},{"range":{"@timestamp":{"format": "strict_date_optional_time","gte": "2024-11-01T00:00:00.000Z","lte": "2024-11-02T10:00:00.000Z"}}}],"must":[],"must_not":[],"should":[]}}}query = es.search(index='xxxx-pro*', body=query_json, scroll='25m', size=5000,request_timeout=2000000)for k in query['hits']['hits']:timestr = k['_source']['@timestamp']request = k['_source']['message']orderInfoExt = {}#beancopy的字段MyUtils.setValue(orderInfoExt, 'user_device_mac', request, "userDeviceMac=", ", userDeviceImei")MyUtils.setValue(orderInfoExt, 'user_device_imei', request, "userDeviceImei=", ", userDeviceImsi")#特殊的字段MyUtils.setValue(orderInfoExt, 'order_id', request, "orderId=", ", oid")MyUtils.setValue(orderInfoExt, 'user_order_ip', request, "userIpAddr=", ", userPort")#print(orderInfoExt)orderExtInfoList.append(orderInfoExt)
# 假设表名为 orders
table_name = 'order_info_ext'
for orderInfoExt in orderExtInfoList:# 提取列名columns = ', '.join(orderInfoExt.keys())# 提取值,并处理为适当的格式values = []for key, value in orderInfoExt.items():if value == 'null':values.append('NULL')elif isinstance(value, (int, float)):values.append(str(value))elif isinstance(value, str):values.append("'"+value+"'")else:values.append('NULL')# 构建 INSERT 语句sql = f"INSERT INTO {table_name} ({columns}) VALUES ({', '.join(values)});"print(sql)