Table of Contents
- I. Install the docker-compose plugin
- 1. Download the docker-compose plugin
- 2. Grant execute permission
- II. Set up the ELK + Kafka environment
- 2.1. Write the docker-compose file
- 2.2. Start docker-compose
- 2.3. Verify the setup
- 2.4. Install Logstash
- III. Deliver messages from the WeChat project to Kafka
- 3.1. Integrate Kafka into the WeChat project
- 3.2. Configure Kafka
- 3.3. AOP interception
- 3.4. Message delivery
- 3.5. Test interface
- 3.6. Send a request with Apipost
- 3.7. View logs in Kibana
I. Install the docker-compose plugin
1. Download the docker-compose plugin
curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
2. Grant execute permission
chmod +x /usr/local/bin/docker-compose
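To confirm the binary is installed correctly, print its version (standard docker-compose flag):
docker-compose --version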
II. Set up the ELK + Kafka environment
At least 4 GB of memory is recommended.
2.1. Write the docker-compose file
cd /app/
mkdir mayiktelkkafka
Upload a docker-compose.yml with the following content:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    volumes:
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.122.128
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
  kafka-manager:
    image: sheepkiller/kafka-manager
    environment:
      ZK_HOSTS: 192.168.122.128:2181
    ports:
      - "9001:9001"
  elasticsearch:
    image: daocloud.io/library/elasticsearch:6.5.4
    restart: always
    container_name: elasticsearch
    ports:
      - "9200:9200"
  kibana:
    image: daocloud.io/library/kibana:6.5.4
    restart: always
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_URL=http://192.168.122.128:9200
    depends_on:
      - elasticsearch
2.2. Start docker-compose
docker-compose up
If you hit an error at this step, check whether there are stray trailing spaces after the command; remove them and run it again.
After a successful startup there should be five containers (screenshot omitted). If some are missing, look up the failing container's logs by its ID. I ran this in a virtual machine and the Elasticsearch container failed to start, so I checked its logs.
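To count the running containers and read the logs of a failing one, the standard Docker commands apply (replace the placeholder with an ID from docker ps):
docker ps
docker logs <container-id>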
Error message and fix: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
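This is the usual Elasticsearch host requirement. The common fix is to raise the limit on the Docker host (standard sysctl usage; the second line persists it across reboots):
sysctl -w vm.max_map_count=262144
echo "vm.max_map_count=262144" >> /etc/sysctl.conf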
2.3. Verify the setup
Check ZooKeeper: 192.168.122.128:2181 (plain TCP, not a web page)
Check ES: http://192.168.122.128:9200
Check Kibana: http://192.168.122.128:5601/app/kibana#/home?_g=()
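Elasticsearch can also be checked from the shell; its root REST endpoint returns cluster metadata:
curl http://192.168.122.128:9200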
2.4. Install Logstash
Install a JDK beforehand; Logstash requires it: https://blog.csdn.net/weixin_40816738/article/details/108532702
Upload logstash-6.4.3.tar.gz to the server, or download it directly:
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
Unpack it:
tar -zxvf logstash-6.4.3.tar.gz
Install the Kafka input and Elasticsearch output plugins:
cd logstash-6.4.3
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch
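To verify that both plugins were installed, list them (logstash-plugin ships a list subcommand):
bin/logstash-plugin list | grep -E "kafka|elasticsearch"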
Create the configuration file:
cd config
vim elk-kafka.conf
The content is as follows:
input {
  kafka {
    bootstrap_servers => "192.168.122.128:9092"
    topics => ["mayikt-log"]
  }
}
filter {
  # Only matched data are sent to output.
}
output {
  elasticsearch {
    action => "index"                # The operation on ES
    hosts  => "192.168.122.128:9200" # Elasticsearch host, can be an array.
    index  => "mayikt_logs"          # The index to write data to.
  }
}
Start Logstash:
cd bin
./logstash -f ../config/elk-kafka.conf
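Before wiring up the application, you can smoke-test the pipeline by publishing a message into the topic by hand. The wurstmeister/kafka image ships the standard Kafka console scripts; the container name below is a placeholder, take it from docker ps:
docker exec -it <kafka-container> kafka-console-producer.sh --broker-list 192.168.122.128:9092 --topic mayikt-log
Each line you type should appear in Elasticsearch under the mayikt_logs index shortly afterwards.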
III. Deliver messages from the WeChat project to Kafka
3.1. Integrate Kafka into the WeChat project
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3.2. Configure Kafka
bootstrap.yml
spring:
  kafka:
    bootstrap-servers: 192.168.122.128:9092 # Kafka server address; for a cluster, separate multiple addresses with commas
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group # consumer group ID
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.3. AOP interception
package com.mayikt.api.impl.elk.log;

import com.alibaba.fastjson.JSONObject;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

/**
 * ELK + Kafka log collection
 */
@Aspect
@Component
public class AopLogAspect {

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private LogContainer logContainer;

    // Declare a pointcut with an execution expression
    @Pointcut("execution(* com.mayikt.api.impl.*.*.*(..))")
    private void serviceAspect() {
    }

    // Log the request before the method is invoked
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "info");
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Deliver the log message to Kafka
        String log = requestJsonObject.toJSONString();
        logContainer.putLog(log);
    }

    // Log the response after the method returns
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "info");
        respJSONObject.put("response", jsonObject);
        // Deliver the log message to Kafka
        // kafkaTemplate.send("mayikt-log", respJSONObject.toJSONString());
        logContainer.putLog(respJSONObject.toJSONString());
    }

    /**
     * Exception advice
     *
     * @param point
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint point, Exception e) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", point.getSignature());
        jsonObject.put("request_args", Arrays.toString(point.getArgs()));
        jsonObject.put("error", e.toString());
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "error");
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Deliver the log message to Kafka
        String log = requestJsonObject.toJSONString();
        logContainer.putLog(log);
    }

    public static String getIpAddr(HttpServletRequest request) {
        // X-Forwarded-For (XFF) is the HTTP header that carries the originating client IP
        // when the request reaches the web server through an HTTP proxy or load balancer.
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                // Fall back to the IP configured on the local network interface
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        // When the request passed through multiple proxies, the first IP is the real
        // client IP; multiple IPs are separated by ','
        if (ipAddress != null && ipAddress.length() > 15) { // "***.***.***.***".length() == 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}
3.4. Message delivery
package com.mayikt.api.impl.elk.log;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingDeque;

@Component
public class LogContainer {

    private static LinkedBlockingDeque<String> logs = new LinkedBlockingDeque<>();

    private LogThread logThread;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public LogContainer() {
        logThread = new LogThread();
        logThread.start();
    }

    /**
     * Put a log message into the concurrent queue
     *
     * @param log
     */
    public void putLog(String log) {
        logs.offer(log);
    }

    /**
     * Asynchronous log thread: pulls messages from the queue in real time
     */
    class LogThread extends Thread {
        @Override
        public void run() {
            while (true) {
                /*
                 * Possible optimization: have this thread fetch multiple
                 * log messages at once and deliver them to Kafka in batches.
                 */
                String log = logs.poll();
                if (!StringUtils.isEmpty(log)) {
                    // Deliver the message to Kafka
                    kafkaTemplate.send("mayikt-log", log);
                }
            }
        }
    }
}
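The comment in run() hints at a batching optimization. A minimal sketch of that idea, assuming the same logs queue and topic: block on take() so the thread stops busy-spinning on poll(), then drain whatever else is already queued and send up to 100 messages per pass (the batch limit is an arbitrary choice). LinkedBlockingDeque.take() and drainTo() are standard java.util.concurrent APIs:

// Sketch: batch-draining variant of LogThread.run(), assuming the same fields as above
@Override
public void run() {
    java.util.List<String> batch = new java.util.ArrayList<>();
    while (true) {
        try {
            // Block until at least one message is available (no busy-wait).
            batch.add(logs.take());
            // Drain up to 99 more messages that are already queued.
            logs.drainTo(batch, 99);
            for (String log : batch) {
                kafkaTemplate.send("mayikt-log", log);
            }
            batch.clear();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}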
3.5. Test interface
package com.mayikt.api.weixin;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

public interface WeChatService {

    /**
     * Feign RPC remote call (a mismatched HTTP method here returns 405)
     *
     * @param a
     * @return
     */
    @GetMapping("/getWeChat")
    String getWeChat(@RequestParam("a") Integer a);
}
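The implementation class is not shown in the original, but the serviceAspect() pointcut only matches beans under com.mayikt.api.impl, so that is where it has to live. A hypothetical minimal version (class name and return value are my assumptions):

package com.mayikt.api.impl.weixin;

import com.mayikt.api.weixin.WeChatService;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical implementation; placed under com.mayikt.api.impl so the
// serviceAspect() pointcut (com.mayikt.api.impl.*.*.*) intercepts its calls.
@RestController
public class WeChatServiceImpl implements WeChatService {

    @Override
    public String getWeChat(Integer a) {
        return "mayikt:" + a;
    }
}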
3.6. Send a request with Apipost
http://localhost:9000/getWeChat?a=123456888
3.7. View logs in Kibana
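In Kibana (Management → Index Patterns), create an index pattern for mayikt_logs, then browse the request/response documents under Discover. You can also confirm the entries landed in Elasticsearch directly via the standard search API:
curl "http://192.168.122.128:9200/mayikt_logs/_search?pretty"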