SpingBoot 整合 kafka Elk

文章目录

            • 1. 依赖
            • 2. yml配置
            • 3. 测试类
            • 4. aop拦截
            • 5. 并发队列异步发送MQ
            • 6. 封装json消息
            • 7. 完整封装json消息

1. 依赖
<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2. yml配置

application.yml

server:port: 8080
spring:application:name: springboot-kafkakafka:bootstrap-servers: 192.168.92.137:9092producer:batch-size: 16384buffer-memory: 33544432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default_consumer_groupauto-commit-interval: 1000auto-offset-reset: earliestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 测试类
package com.gblfy.elk.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@GetMapping("/healthAdvice")public String healthAdvice() {return "healthAdvice";}@GetMapping("/errorAdvice")public String errorAdvice(@RequestParam("userId") Integer userId) {Integer i = 1 / userId;return "success";}
}
4. aop拦截

演示:
AOP前置通知
后置通知
异常通知

package com.gblfy.elk.aop;import com.alibaba.fastjson.JSONObject;
import com.gblfy.elk.queue.AsynConcurrentQueue;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;@Aspect
@Component
public class AopLogAspect {@Value("${server.port}")private String serverPort;// 申明一个切点 里面是 execution表达式@Pointcut("execution(* com.gblfy.elk.controller.*.*(..))")private void serviceAspect() {}@Autowiredprivate AsynConcurrentQueue asynConcurrentQueue;// 请求method前打印内容@Before(value = "serviceAspect()")public void methodBefore(JoinPoint joinPoint) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", joinPoint.getSignature());jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();asynConcurrentQueue.put(log);}// 在方法执行完结后打印返回内容@AfterReturning(returning = "o", pointcut = "serviceAspect()")public void methodAfterReturing(Object o) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject respJSONObject = new JSONObject();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("response_time", df.format(new Date()));jsonObject.put("response_content", JSONObject.toJSONString(o));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);respJSONObject.put("response", jsonObject);asynConcurrentQueue.put(respJSONObject.toJSONString());}/*** 异常通知** @param point*/@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")public void serviceAspect(JoinPoint point, Exception e) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", point.getSignature());jsonObject.put("request_args", Arrays.toString(point.getArgs()));jsonObject.put("error", e.toString());// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();asynConcurrentQueue.put(log);}public static String getIpAddr(HttpServletRequest request) {//X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。String ipAddress = request.getHeader("x-forwarded-for");if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("WL-Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getRemoteAddr();if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {//根据网卡取本机配置的IPInetAddress inet = null;try {inet = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}ipAddress = inet.getHostAddress();}}//对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15if (ipAddress.indexOf(",") > 0) {ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));}}return ipAddress;}
}
5. 并发队列异步发送MQ

这里采用异步发消息到MQ,是为了解决同步发送消息到MQ和业务线程同属于一个主线程带来的延迟问题。

package com.gblfy.elk.queue;import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;@Component
public class AsynConcurrentQueue {private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public AsynConcurrentQueue() {// 初始化new AsynConcurrentQueue.LogThreadKafka().start();}/*** 存入日志** @param log*/public void put(String log) {logDeque.offer(log);}class LogThreadKafka extends Thread {@Overridepublic void run() {while (true) {String log = logDeque.poll();if (!StringUtils.isEmpty(log)) {// 将消息投递kafka中kafkaTemplate.send("mayikt-log", log);}}}}
}
6. 封装json消息
   { "request_args": "[0]","request_method": "GET","error": "java.lang.ArithmeticException: / by zero","ip_addres": "192.168.92.1:8080","request_url": "http://localhost:8080/errorAdvice"}
7. 完整封装json消息
{"request": {"request_time": "2022-03-11 20:45:50","signature": {"declaringType": "com.gblfy.elk.controller.KafkaController","declaringTypeName": "com.gblfy.elk.controller.KafkaController","exceptionTypes": [],"method": {"accessible": false,"annotatedExceptionTypes": [],"annotatedParameterTypes": [{"annotations": [],"declaredAnnotations": [],"type": "java.lang.Integer"}],"annotatedReceiverType": {"annotations": [],"declaredAnnotations": [],"type": "com.gblfy.elk.controller.KafkaController"},"annotatedReturnType": {"annotations": [],"declaredAnnotations": [],"type": "java.lang.String"},"annotations": [{"path": [],"headers": [],"name": "","produces": [],"params": [],"value": ["/errorAdvice"],"consumes": []}],"bridge": false,"declaringClass": "com.gblfy.elk.controller.KafkaController","default": false,"exceptionTypes": [],"genericExceptionTypes": [],"genericParameterTypes": ["java.lang.Integer"],"genericReturnType": "java.lang.String","modifiers": 1,"name": "errorAdvice","parameterAnnotations": [[{"name": "","value": "userId","defaultValue": "\n\t\t\n\t\t\n\n\t\t\t\t\n","required": true}]],"parameterCount": 1,"parameterTypes": ["java.lang.Integer"],"returnType": "java.lang.String","synthetic": false,"typeParameters": [],"varArgs": false},"modifiers": 1,"name": "errorAdvice","parameterNames": ["userId"],"parameterTypes": ["java.lang.Integer"],"returnType": "java.lang.String"},"request_args": "[0]","request_method": "GET","error": "java.lang.ArithmeticException: / by zero","ip_addres": "192.168.92.1:8080","request_url": "http://localhost:8080/errorAdvice"}
}
GET /mayikt_logs/_search

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515919.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

哀其不幸,怒其不争

不甘于平庸却又不想承受努力的痛苦。每天在碌碌无为中懊悔&#xff0c;又在懊悔中每日碌碌无为。时常后悔为什么之前的自己不努力一点&#xff0c;却又不舍得让现在的自己踏出舒适圈&#xff0c;那将来的自己还是会后悔现在的自己为什么不努力一点。每天晚上都在想&#xff0c;…

如何用一套引擎搞定机器学习全流程?

作者:陈戊超&#xff08;仲卓&#xff09; 深度学习技术在当代社会发挥的作用越来越大。目前深度学习被广泛应用于个性化推荐、商品搜索、人脸识别、机器翻译、自动驾驶等多个领域&#xff0c;此外还在向社会各个领域迅速渗透。 背景 当前&#xff0c;深度学习的应用越来越多…

分布式解决方案之分布式日志采集elk+kafka 环境的构建

文章目录一、软件下载列表1. zookeeper2. kafka3. logstash4. elasticsearch5. kibana6. zktools二、安装zk环境2.1. 上传安装包2.2. 解压2.3. 创建data目录2.4. 修改配置2.5.启动zk三、安装kafka环境3.1. 上传安装包3.2. 解压kafka安装包3.3. 创建data文件夹3.4. 修改配置3.5.…

基于https国密算法构建安全数据传输链路

网络数据安全得到前所未有的重视 HTTPS成为解决传输安全问题利器 大家都知道&#xff0c;HTTP 本身是明文传输的&#xff0c;没有经过任何安全处理&#xff0c;网站HTTPS解决方案通过在HTTP协议之上引入证书服务&#xff0c;完美解决网站的安全问题。 下图左侧表示Chrome浏览…

AOP+自定义注解 实现service统一的异常信息处理

返回信息枚举 TipsMsg&#xff1a; public enum TipsMsg {DEFAULT_SUCCESS("0","操作成功"),DEFAULT_FAILED("1","操作失败"),//其他枚举//get//set}自定义注解 ReturnMsg &#xff08;也是切点&#xff09; &#xff1a; Target(El…

跟风 Python 的人,后来都怎样了?

许多人觉得 Python 功能强大、就业范围范围广还上手轻松&#xff0c;得来全部费功夫。 但是一旦推开 Python 的大门你会发现&#xff0c;Python 入门容易但精通很难。看似语法掌握熟练&#xff0c;但一面试或者做项目就会被打回原形。 比如&#xff1a; 1&#xff09;如何写出高…

Kubernetes 是一个“数据库”吗?

作者 | 张磊&#xff0c;阿里云高级技术专家、CNCF 官方大使&#xff0c;CNCF 应用交付领域 co-chair&#xff0c;Kubernetes 项目资深维护者 最近&#xff0c;Kubernetes 社区里有一个关于“Kubernetes is the new database”的论述&#xff0c;引起了很多人的关注。当然&…

ES启动异常:max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

现象 ES启动时出现异常 bin/elasticsearchmax virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]原因分析 系统虚拟内存默认最大映射数为65530&#xff0c;无法满足ES系统要求&#xff0c;需要调整为262144以上。 处理办法 设置vm.m…

mybatis resultMap type属性问题 ( xx.xx.PO cannot be cast to xx.xx.BO)

一句话总结&#xff1a; mybatis 查询语句的映射实体是根据resultMap 的type属性决定的。 即type属性为A&#xff0c;即使接口返回值用B接收&#xff0c;他的实际类型还是A&#xff08;idea不会编译出错&#xff09; 异常信息&#xff1a; xx.NameA_PO cannot be cast to xx…

云原生下的开发测试

【以下为分享实录&#xff0c;有删节】 测试环境管理之困与阿里巴巴的解决之道 在云原生时代下&#xff0c;软件的迭代速度越来越快&#xff0c;对测试的要求也越来越高&#xff0c;很多开发者开始使用Kubernetes来管理测试环境。在这个过程中&#xff0c;开发者会遇到很多困…

Mybatis 常用语句

文章目录1.结果集&#xff1a;2.代码块&#xff1a;2.1. 代码块&#xff08;可被引用&#xff09;2.2. 分支&#xff08;if else&#xff09;3.1 添加语句&#xff1a;3.2. 添加语句&#xff08;主键自增&#xff0c;并返回主键&#xff09;4.逻辑删除语句&#xff1a;5.物理删…

我是程序员,我用这种方式铭记历史

作者 | kokohuang责编 | 晋兆雨头图 | 付费下载于视觉中国✨抗战直播: 以图文方式“直播”1931年9月18日至1945年9月2日14年间抗战的日日夜夜✨开源地址&#xff1a;https://github.com/kokohuang/WarOfResistanceLive✨预览地址&#xff1a;https://kokohuang.github.io/WarOf…

分布式ELK日志采集系统

文章目录1. 传统日志采集存在哪些优缺点2. Elk采集日志的原理3. 为什么需要将日志存储在ElasticSeach 而不是mysql中呢4. 为什么需要使用elkkafka5. elkkafka原理6. elkkafka 环境的构建7. SpingBoot 整合 kafka Elk1.传统日志采集存在哪些问题2.分布式日志采集有哪些方案3.Ela…

从零入门 Serverless | 架构的演进

作者 | 许晓斌 阿里云高级技术专家 本文整理自《Serverless 技术公开课》第 1 讲&#xff0c;点击开始学习。 关注 “Serverless” 公众号&#xff0c;回复 入门 &#xff0c;即可获取 Serverless 系列文章 PPT。 传统单体应用架构 十多年前主流的应用架构都是单体应用&…

如何使用 Istio 进行多集群部署管理:多控制平面

作者 | 王夕宁 阿里云高级技术专家 导读&#xff1a;本文摘自于阿里云高级技术专家王夕宁撰写的《Istio 服务网格技术解析与实战》一书&#xff0c;讲述了如何使用 Istio 进行多集群部署管理来阐述服务网格对多云环境、多集群即混合部署的支持能力。 前文详情&#xff1a; …

@Transactional 事务失效记录

代码调用结构&#xff1a; Transactional public ReturnBo saveBase(BaseBo bo){ServiceA.dbMethodA();ServiceB.dbMethodB();ServiceC.dbMethodC(); }异常现象&#xff1a; 在saveBase方法中&#xff0c;如果 ServiceB.dbMethodB()方法实现中出现了异常&#xff0c;Servic…

JDK8新特性入门到精通

文章目录一、 接口中默认方法修饰为普通方法1. 在jdk8之前2. 在JDK 1.8开始3. 案例演练二、Lambda表达式2.1. 什么是Lambda表达式2.2. 为什么要使用Lambda表达式2.3. Lambda表达式的规范2.4. 函数接口定义2.5. Lambda基础语法2.6. 方法引入2.7. Lambda实战案例三、java 8 strea…

一文聊“图”,从图数据库到知识图谱

作者 | 穆琼责编 | 晋兆雨头图 | 付费下载于视觉中国随着知识图谱的发展&#xff0c;图数据库一词被越来越多的提到。那么到底什么是图数据库&#xff0c;为什么要用图数据库&#xff0c;如何去建设一个图数据库应用系统&#xff0c;图数据库与知识图谱到底是什么关系。今天为大…

阿里云机器学习PAI DSW 2.0 Alink商业版重磅发布

DSW 2.0&#xff1a;面向AI研发的集成开发平台 DSW&#xff08;Data Science Workshop&#xff09;是阿里巴巴PAI团队根据多年的AI算法和产品研发经验积累&#xff0c;围绕提高AI算法研发效率&#xff0c;降低研发成本而推出的一款适用于各类AI开发者的云端机器学习集成开发环…