SpingBoot 整合 kafka Elk

文章目录

            • 1. 依赖
            • 2. yml配置
            • 3. 测试类
            • 4. aop拦截
            • 5. 并发队列异步发送MQ
            • 6. 封装json消息
            • 7. 完整封装json消息

1. 依赖
<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2. yml配置

application.yml

server:port: 8080
spring:application:name: springboot-kafkakafka:bootstrap-servers: 192.168.92.137:9092producer:batch-size: 16384buffer-memory: 33544432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default_consumer_groupauto-commit-interval: 1000auto-offset-reset: earliestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 测试类
package com.gblfy.elk.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@GetMapping("/healthAdvice")public String healthAdvice() {return "healthAdvice";}@GetMapping("/errorAdvice")public String errorAdvice(@RequestParam("userId") Integer userId) {Integer i = 1 / userId;return "success";}
}
4. aop拦截

演示:
AOP前置通知
后置通知
异常通知

package com.gblfy.elk.aop;import com.alibaba.fastjson.JSONObject;
import com.gblfy.elk.queue.AsynConcurrentQueue;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;@Aspect
@Component
public class AopLogAspect {@Value("${server.port}")private String serverPort;// 申明一个切点 里面是 execution表达式@Pointcut("execution(* com.gblfy.elk.controller.*.*(..))")private void serviceAspect() {}@Autowiredprivate AsynConcurrentQueue asynConcurrentQueue;// 请求method前打印内容@Before(value = "serviceAspect()")public void methodBefore(JoinPoint joinPoint) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", joinPoint.getSignature());jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();asynConcurrentQueue.put(log);}// 在方法执行完结后打印返回内容@AfterReturning(returning = "o", pointcut = "serviceAspect()")public void methodAfterReturing(Object o) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject respJSONObject = new JSONObject();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("response_time", df.format(new Date()));jsonObject.put("response_content", JSONObject.toJSONString(o));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);respJSONObject.put("response", jsonObject);asynConcurrentQueue.put(respJSONObject.toJSONString());}/*** 异常通知** @param point*/@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")public void serviceAspect(JoinPoint point, Exception e) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", point.getSignature());jsonObject.put("request_args", Arrays.toString(point.getArgs()));jsonObject.put("error", e.toString());// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();asynConcurrentQueue.put(log);}public static String getIpAddr(HttpServletRequest request) {//X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。String ipAddress = request.getHeader("x-forwarded-for");if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("WL-Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getRemoteAddr();if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {//根据网卡取本机配置的IPInetAddress inet = null;try {inet = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}ipAddress = inet.getHostAddress();}}//对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15if (ipAddress.indexOf(",") > 0) {ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));}}return ipAddress;}
}
5. 并发队列异步发送MQ

这里采用异步发消息到MQ,是为了解决同步发送消息到MQ和业务线程同属于一个主线程带来的延迟问题。

package com.gblfy.elk.queue;import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;@Component
public class AsynConcurrentQueue {private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public AsynConcurrentQueue() {// 初始化new AsynConcurrentQueue.LogThreadKafka().start();}/*** 存入日志** @param log*/public void put(String log) {logDeque.offer(log);}class LogThreadKafka extends Thread {@Overridepublic void run() {while (true) {String log = logDeque.poll();if (!StringUtils.isEmpty(log)) {// 将消息投递kafka中kafkaTemplate.send("mayikt-log", log);}}}}
}
6. 封装json消息
   { "request_args": "[0]","request_method": "GET","error": "java.lang.ArithmeticException: / by zero","ip_addres": "192.168.92.1:8080","request_url": "http://localhost:8080/errorAdvice"}
7. 完整封装json消息
{"request": {"request_time": "2022-03-11 20:45:50","signature": {"declaringType": "com.gblfy.elk.controller.KafkaController","declaringTypeName": "com.gblfy.elk.controller.KafkaController","exceptionTypes": [],"method": {"accessible": false,"annotatedExceptionTypes": [],"annotatedParameterTypes": [{"annotations": [],"declaredAnnotations": [],"type": "java.lang.Integer"}],"annotatedReceiverType": {"annotations": [],"declaredAnnotations": [],"type": "com.gblfy.elk.controller.KafkaController"},"annotatedReturnType": {"annotations": [],"declaredAnnotations": [],"type": "java.lang.String"},"annotations": [{"path": [],"headers": [],"name": "","produces": [],"params": [],"value": ["/errorAdvice"],"consumes": []}],"bridge": false,"declaringClass": "com.gblfy.elk.controller.KafkaController","default": false,"exceptionTypes": [],"genericExceptionTypes": [],"genericParameterTypes": ["java.lang.Integer"],"genericReturnType": "java.lang.String","modifiers": 1,"name": "errorAdvice","parameterAnnotations": [[{"name": "","value": "userId","defaultValue": "\n\t\t\n\t\t\n\n\t\t\t\t\n","required": true}]],"parameterCount": 1,"parameterTypes": ["java.lang.Integer"],"returnType": "java.lang.String","synthetic": false,"typeParameters": [],"varArgs": false},"modifiers": 1,"name": "errorAdvice","parameterNames": ["userId"],"parameterTypes": ["java.lang.Integer"],"returnType": "java.lang.String"},"request_args": "[0]","request_method": "GET","error": "java.lang.ArithmeticException: / by zero","ip_addres": "192.168.92.1:8080","request_url": "http://localhost:8080/errorAdvice"}
}
GET /mayikt_logs/_search

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515919.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何用一套引擎搞定机器学习全流程?

作者:陈戊超&#xff08;仲卓&#xff09; 深度学习技术在当代社会发挥的作用越来越大。目前深度学习被广泛应用于个性化推荐、商品搜索、人脸识别、机器翻译、自动驾驶等多个领域&#xff0c;此外还在向社会各个领域迅速渗透。 背景 当前&#xff0c;深度学习的应用越来越多…

分布式解决方案之分布式日志采集elk+kafka 环境的构建

文章目录一、软件下载列表1. zookeeper2. kafka3. logstash4. elasticsearch5. kibana6. zktools二、安装zk环境2.1. 上传安装包2.2. 解压2.3. 创建data目录2.4. 修改配置2.5.启动zk三、安装kafka环境3.1. 上传安装包3.2. 解压kafka安装包3.3. 创建data文件夹3.4. 修改配置3.5.…

基于https国密算法构建安全数据传输链路

网络数据安全得到前所未有的重视 HTTPS成为解决传输安全问题利器 大家都知道&#xff0c;HTTP 本身是明文传输的&#xff0c;没有经过任何安全处理&#xff0c;网站HTTPS解决方案通过在HTTP协议之上引入证书服务&#xff0c;完美解决网站的安全问题。 下图左侧表示Chrome浏览…

AOP+自定义注解 实现service统一的异常信息处理

返回信息枚举 TipsMsg&#xff1a; public enum TipsMsg {DEFAULT_SUCCESS("0","操作成功"),DEFAULT_FAILED("1","操作失败"),//其他枚举//get//set}自定义注解 ReturnMsg &#xff08;也是切点&#xff09; &#xff1a; Target(El…

跟风 Python 的人,后来都怎样了?

许多人觉得 Python 功能强大、就业范围范围广还上手轻松&#xff0c;得来全部费功夫。 但是一旦推开 Python 的大门你会发现&#xff0c;Python 入门容易但精通很难。看似语法掌握熟练&#xff0c;但一面试或者做项目就会被打回原形。 比如&#xff1a; 1&#xff09;如何写出高…

Kubernetes 是一个“数据库”吗?

作者 | 张磊&#xff0c;阿里云高级技术专家、CNCF 官方大使&#xff0c;CNCF 应用交付领域 co-chair&#xff0c;Kubernetes 项目资深维护者 最近&#xff0c;Kubernetes 社区里有一个关于“Kubernetes is the new database”的论述&#xff0c;引起了很多人的关注。当然&…

云原生下的开发测试

【以下为分享实录&#xff0c;有删节】 测试环境管理之困与阿里巴巴的解决之道 在云原生时代下&#xff0c;软件的迭代速度越来越快&#xff0c;对测试的要求也越来越高&#xff0c;很多开发者开始使用Kubernetes来管理测试环境。在这个过程中&#xff0c;开发者会遇到很多困…

我是程序员,我用这种方式铭记历史

作者 | kokohuang责编 | 晋兆雨头图 | 付费下载于视觉中国✨抗战直播: 以图文方式“直播”1931年9月18日至1945年9月2日14年间抗战的日日夜夜✨开源地址&#xff1a;https://github.com/kokohuang/WarOfResistanceLive✨预览地址&#xff1a;https://kokohuang.github.io/WarOf…

分布式ELK日志采集系统

文章目录1. 传统日志采集存在哪些优缺点2. Elk采集日志的原理3. 为什么需要将日志存储在ElasticSeach 而不是mysql中呢4. 为什么需要使用elkkafka5. elkkafka原理6. elkkafka 环境的构建7. SpingBoot 整合 kafka Elk1.传统日志采集存在哪些问题2.分布式日志采集有哪些方案3.Ela…

从零入门 Serverless | 架构的演进

作者 | 许晓斌 阿里云高级技术专家 本文整理自《Serverless 技术公开课》第 1 讲&#xff0c;点击开始学习。 关注 “Serverless” 公众号&#xff0c;回复 入门 &#xff0c;即可获取 Serverless 系列文章 PPT。 传统单体应用架构 十多年前主流的应用架构都是单体应用&…

如何使用 Istio 进行多集群部署管理:多控制平面

作者 | 王夕宁 阿里云高级技术专家 导读&#xff1a;本文摘自于阿里云高级技术专家王夕宁撰写的《Istio 服务网格技术解析与实战》一书&#xff0c;讲述了如何使用 Istio 进行多集群部署管理来阐述服务网格对多云环境、多集群即混合部署的支持能力。 前文详情&#xff1a; …

JDK8新特性入门到精通

文章目录一、 接口中默认方法修饰为普通方法1. 在jdk8之前2. 在JDK 1.8开始3. 案例演练二、Lambda表达式2.1. 什么是Lambda表达式2.2. 为什么要使用Lambda表达式2.3. Lambda表达式的规范2.4. 函数接口定义2.5. Lambda基础语法2.6. 方法引入2.7. Lambda实战案例三、java 8 strea…

一文聊“图”,从图数据库到知识图谱

作者 | 穆琼责编 | 晋兆雨头图 | 付费下载于视觉中国随着知识图谱的发展&#xff0c;图数据库一词被越来越多的提到。那么到底什么是图数据库&#xff0c;为什么要用图数据库&#xff0c;如何去建设一个图数据库应用系统&#xff0c;图数据库与知识图谱到底是什么关系。今天为大…

阿里云机器学习PAI DSW 2.0 Alink商业版重磅发布

DSW 2.0&#xff1a;面向AI研发的集成开发平台 DSW&#xff08;Data Science Workshop&#xff09;是阿里巴巴PAI团队根据多年的AI算法和产品研发经验积累&#xff0c;围绕提高AI算法研发效率&#xff0c;降低研发成本而推出的一款适用于各类AI开发者的云端机器学习集成开发环…

DSW:面向AI研发的集成开发平台

发布会传送门 产品详情 云原生技术&#xff0c;注重用户体验&#xff0c;提升研发效率 环境搭建是算法研发过程中的重要一环&#xff0c;这里除了硬件选型外&#xff0c;软件环境的安装配置&#xff0c;后续升级往往会耗费不少时间。DSW借助阿里云ECS&#xff0c;Docker和Ku…

程序员应如何理解高并发中的协程

来源 | 码农的荒岛求生责编 | 晋兆雨头图 | 付费下载于视觉中国作为程序员&#xff0c;想必你多多少少听过协程这个词&#xff0c;这项技术近年来越来越多的出现在程序员的视野当中&#xff0c;尤其高性能高并发领域。当你的同学、同事提到协程时如果你的大脑一片空白&#xff…

5G边缘计算行业通识:阿里云ENS技术演进之路

近日&#xff0c;阿里云杨敬宇在CSDN阿里云核心技术竞争力在线峰会上进行了《5G基础设施-阿里云边缘计算的技术演进之路》主题演讲&#xff0c;针对5G时代下&#xff0c;行业和技术的趋势、边缘计算产业通识以及阿里云边缘计算从过去到未来的技术演进之路进行分享。 5GAI需求推…

精讲23种设计模式-策略模式~聚合短信服务和聚合支付服务

文章目录一、设计模式1. 为什么需要使用设计模式2. 设计模式的分类3. 什么是策略模式4. 为什么叫做策略模式5. 策略模式优缺点6. 策略模式应用场景7. Spring框架中使用的策略模式二、策略模式~聚合短信服务2.1. 依赖引入2.2. 抽象公共行为接口2.3. 具体策略接口实现类2.4. 策略…

引领开源新风潮,阿里巴巴编程之夏第二期重磅来袭!

“唯有热爱&#xff0c;可抵岁月漫长”。 2020 年 5 月 25 日&#xff0c;阿里巴巴编程之夏&#xff08;Alibaba Summer of Code&#xff0c;以下简称 ASoC &#xff09;第二期正式上线&#xff0c;项目规模再度升级&#xff0c;来自开源社区的 Apache Dubbo、Apache RocketMQ…