文章目录
- 1. 依赖
- 2. yml配置
- 3. 测试类
- 4. aop拦截
- 5. 并发队列异步发送MQ
- 6. 封装json消息
- 7. 完整封装json消息
1. 依赖
<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2. yml配置
application.yml
server:port: 8080
spring:application:name: springboot-kafkakafka:bootstrap-servers: 192.168.92.137:9092producer:batch-size: 16384buffer-memory: 33544432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default_consumer_groupauto-commit-interval: 1000auto-offset-reset: earliestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 测试类
package com.gblfy.elk.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@GetMapping("/healthAdvice")public String healthAdvice() {return "healthAdvice";}@GetMapping("/errorAdvice")public String errorAdvice(@RequestParam("userId") Integer userId) {Integer i = 1 / userId;return "success";}
}
4. aop拦截
演示:
AOP前置通知
后置通知
异常通知
package com.gblfy.elk.aop;import com.alibaba.fastjson.JSONObject;
import com.gblfy.elk.queue.AsynConcurrentQueue;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;@Aspect
@Component
public class AopLogAspect {@Value("${server.port}")private String serverPort;// 申明一个切点 里面是 execution表达式@Pointcut("execution(* com.gblfy.elk.controller.*.*(..))")private void serviceAspect() {}@Autowiredprivate AsynConcurrentQueue asynConcurrentQueue;// 请求method前打印内容@Before(value = "serviceAspect()")public void methodBefore(JoinPoint joinPoint) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", joinPoint.getSignature());jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();asynConcurrentQueue.put(log);}// 在方法执行完结后打印返回内容@AfterReturning(returning = "o", pointcut = "serviceAspect()")public void methodAfterReturing(Object o) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject respJSONObject = new JSONObject();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("response_time", df.format(new Date()));jsonObject.put("response_content", JSONObject.toJSONString(o));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);respJSONObject.put("response", jsonObject);asynConcurrentQueue.put(respJSONObject.toJSONString());}/*** 异常通知** @param point*/@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")public void serviceAspect(JoinPoint point, Exception e) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", point.getSignature());jsonObject.put("request_args", Arrays.toString(point.getArgs()));jsonObject.put("error", e.toString());// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();asynConcurrentQueue.put(log);}public static String getIpAddr(HttpServletRequest request) {//X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。String ipAddress = request.getHeader("x-forwarded-for");if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("WL-Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getRemoteAddr();if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {//根据网卡取本机配置的IPInetAddress inet = null;try {inet = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}ipAddress = inet.getHostAddress();}}//对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15if (ipAddress.indexOf(",") > 0) {ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));}}return ipAddress;}
}
5. 并发队列异步发送MQ
这里采用异步发消息到MQ,是为了解决同步发送消息到MQ和业务线程同属于一个主线程带来的延迟问题。
package com.gblfy.elk.queue;import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;@Component
public class AsynConcurrentQueue {private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public AsynConcurrentQueue() {// 初始化new AsynConcurrentQueue.LogThreadKafka().start();}/*** 存入日志** @param log*/public void put(String log) {logDeque.offer(log);}class LogThreadKafka extends Thread {@Overridepublic void run() {while (true) {String log = logDeque.poll();if (!StringUtils.isEmpty(log)) {// 将消息投递kafka中kafkaTemplate.send("mayikt-log", log);}}}}
}
6. 封装json消息
{ "request_args": "[0]","request_method": "GET","error": "java.lang.ArithmeticException: / by zero","ip_addres": "192.168.92.1:8080","request_url": "http://localhost:8080/errorAdvice"}
7. 完整封装json消息
{"request": {"request_time": "2022-03-11 20:45:50","signature": {"declaringType": "com.gblfy.elk.controller.KafkaController","declaringTypeName": "com.gblfy.elk.controller.KafkaController","exceptionTypes": [],"method": {"accessible": false,"annotatedExceptionTypes": [],"annotatedParameterTypes": [{"annotations": [],"declaredAnnotations": [],"type": "java.lang.Integer"}],"annotatedReceiverType": {"annotations": [],"declaredAnnotations": [],"type": "com.gblfy.elk.controller.KafkaController"},"annotatedReturnType": {"annotations": [],"declaredAnnotations": [],"type": "java.lang.String"},"annotations": [{"path": [],"headers": [],"name": "","produces": [],"params": [],"value": ["/errorAdvice"],"consumes": []}],"bridge": false,"declaringClass": "com.gblfy.elk.controller.KafkaController","default": false,"exceptionTypes": [],"genericExceptionTypes": [],"genericParameterTypes": ["java.lang.Integer"],"genericReturnType": "java.lang.String","modifiers": 1,"name": "errorAdvice","parameterAnnotations": [[{"name": "","value": "userId","defaultValue": "\n\t\t\n\t\t\n\n\t\t\t\t\n","required": true}]],"parameterCount": 1,"parameterTypes": ["java.lang.Integer"],"returnType": "java.lang.String","synthetic": false,"typeParameters": [],"varArgs": false},"modifiers": 1,"name": "errorAdvice","parameterNames": ["userId"],"parameterTypes": ["java.lang.Integer"],"returnType": "java.lang.String"},"request_args": "[0]","request_method": "GET","error": "java.lang.ArithmeticException: / by zero","ip_addres": "192.168.92.1:8080","request_url": "http://localhost:8080/errorAdvice"}
}
GET /mayikt_logs/_search