需求
kafka中存储着syslog日志,需将消费kafka的同时,将不需要的数据过滤掉,保存后面的json个数的数据。
SYSLOG数据
<188>Nov 1 15:01:54 everyServer {"busiName":"默认业务","clntIP":"192.168.1.140","AppProto":"22","copyTo":"","bcopyTo":"","loginName":"","SvrPort":"22","scanDateTs":"2023-11-01 15:01:53","severity":"1","jobName":"敏感词检测","carryMac":"00:E2:69:53:EE:93","clntSize":"7","policyName":"","svrSize":"0","actionType":"审计","cTitle":"","grade":"默认分级","cFrom":"","attName":"","category":"默认分类","totalMatches":"1","ClntPort":"53131","matchPos":"[正文;]","SvrIP":"192.168.1.134","cTo":""}
过滤代码
import re
imporr jsonre_data = re.sub(br'<\d+>[\w\s:]+', b'', kafka_message)
print(re_data)
data = json.loads(re_data)
print(data)
结果
{"busiName":"默认业务","clntIP":"192.168.1.140","AppProto":"22","copyTo":"","bcopyTo":"","loginName":"","SvrPort":"22","scanDateTs":"2023-11-01 15:01:53","severity":"1","jobName":"敏感词检测","carryMac":"00:E2:69:53:EE:93","clntSize":"7","policyName":"","svrSize":"0","actionType":"审计","cTitle":"","grade":"默认分级","cFrom":"","attName":"","category":"默认分类","totalMatches":"1","ClntPort":"53131","matchPos":"[正文;]","SvrIP":"192.168.1.134","cTo":""}