分布式ELK+KAFKA日志采集 docker-compose

在这里插入图片描述
在这里插入图片描述

文章目录

          • 一、安装docker-compose插件
            • 1. 下载docker-compose插件
            • 2. 赋予权限
          • 二、搭建ELK+KAFKA环境
            • 2.1. 编写docker-compose
            • 2.2. 启动docker-compose
            • 2.3. 验证效果
            • 2.4. 安装logstash
          • 三、微信项目投递消息kafka
            • 3.1. 微信集成kafka
            • 3.2. 配置kafka
            • 3.3. aop拦截
            • 3.4. 消息投递
            • 3.5. 测试接口
            • 3.6. apipost 发送请求
            • 3.7. kibana 查看日志

一、安装docker-compose插件
1. 下载docker-compose插件
curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
2. 赋予权限
chmod +x /usr/local/bin/docker-compose
二、搭建ELK+KAFKA环境

内存建议4g及以上

2.1. 编写docker-compose
cd /app/
mkdir mayiktelkkafka

上传docker-compose.yml

version: '2'
services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkavolumes:- /etc/localtime:/etc/localtimeports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.122.128KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_PORT: 9092KAFKA_LOG_RETENTION_HOURS: 120KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000KAFKA_NUM_PARTITIONS: 3KAFKA_DELETE_RETENTION_MS: 1000kafka-manager:image: sheepkiller/kafka-managerenvironment:ZK_HOSTS: 192.168.122.128ports:- "9001:9001"elasticsearch:image: daocloud.io/library/elasticsearch:6.5.4restart: alwayscontainer_name: elasticsearchports:- "9200:9200"kibana:image: daocloud.io/library/kibana:6.5.4restart: alwayscontainer_name: kibanaports:- "5601:5601"environment:- elasticsearch_url=http://192.168.122.128:9200depends_on:- elasticsearch
2.2. 启动docker-compose
docker-compose up

在这里插入图片描述

这个错误需要你检查一下命令后面是否有多余的空格,删除重新运行即可

启动成功后的效果图
在这里插入图片描述
成功启动后有5个容器,如果容器个数不够根据容器ID查看日志,我使用的是虚拟机,启动后es容器启动失败,查查看日志
异常信息+解决方案->跳转:max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

2.3. 验证效果

访问zk:http://192.168.122.128:2181
在这里插入图片描述
访问es:http://192.168.122.128:9200
在这里插入图片描述
访问kibana:http://192.168.122.128:5601/app/kibana#/home?_g=()
在这里插入图片描述

2.4. 安装logstash

提前安装jdk环境,logstash需要https://blog.csdn.net/weixin_40816738/article/details/108532702

在这里插入图片描述
上传或者下载logstash-6.4.3.tar.gz到服务器中

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz

解压

tar -zxvf logstash-6.4.3.tar.gz

安装插件

cd logstash-6.4.3
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch

在这里插入图片描述
编写配置文件

cd config
vim elk-kafka.conf

内容如下

input {kafka {bootstrap_servers => "192.168.122.128:9092"topics => "mayikt-log"}
}
filter {#Only matched data are send to output.
}output {elasticsearch {action => "index"  #The operation on EShosts  => "192.168.122.128:9200" #Ellasticsearch host,can be array.index  => "mayikt_logs" #The index towrite data to.}
}

启动logstash

cd bin
./logstash -f ../config/elk-kafka.conf
三、微信项目投递消息kafka
3.1. 微信集成kafka
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3.2. 配置kafka

bootstrap.yml

spring:kafka:bootstrap-servers: 192.168.122.128:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default_consumer_group #群组IDenable-auto-commit: trueauto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.3. aop拦截
package com.mayikt.api.impl.elk.log;import com.alibaba.fastjson.JSONObject;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;/*** * elk+kafka采集*/
@Aspect
@Component
public class AopLogAspect {@Value("${server.port}")private String serverPort;@Autowiredprivate LogContainer logContainer;// 申明一个切点 里面是 execution表达式@Pointcut("execution(* com.mayikt.api.impl.*.*.*(..))")private void serviceAspect() {}//// 请求method前打印内容@Before(value = "serviceAspect()")public void methodBefore(JoinPoint joinPoint) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", joinPoint.getSignature());jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);jsonObject.put("request_time", df.format(new Date()));jsonObject.put("log_type", "info");// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();logContainer.putLog(log);}//// 在方法执行完结后打印返回内容@AfterReturning(returning = "o", pointcut = "serviceAspect()")public void methodAfterReturing(Object o) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject respJSONObject = new JSONObject();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("response_time", df.format(new Date()));jsonObject.put("response_content", JSONObject.toJSONString(o));// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);jsonObject.put("log_type", "info");respJSONObject.put("response", jsonObject);// 将日志信息投递到kafka中
//      kafkaTemplate.send("mayikt-log",respJSONObject.toJSONString());logContainer.putLog(respJSONObject.toJSONString());}
//
///*** 异常通知** @param point*/@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")public void serviceAspect(JoinPoint point, Exception e) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", point.getSignature());jsonObject.put("request_args", Arrays.toString(point.getArgs()));jsonObject.put("error", e.toString());// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);jsonObject.put("log_type", "error");JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafka中String log = requestJsonObject.toJSONString();logContainer.putLog(log);}
//public static String getIpAddr(HttpServletRequest request) {//X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。String ipAddress = request.getHeader("x-forwarded-for");if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getHeader("WL-Proxy-Client-IP");}if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress = request.getRemoteAddr();if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {//根据网卡取本机配置的IPInetAddress inet = null;try {inet = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}ipAddress = inet.getHostAddress();}}//对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15if (ipAddress.indexOf(",") > 0) {ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));}}return ipAddress;}
}
3.4. 消息投递
package com.mayikt.api.impl.elk.log;import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingDeque;@Component
public class LogContainer {private LogThread logThread;@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public LogContainer() {logThread = new LogThread();logThread.start();}private static LinkedBlockingDeque<String> logs = new LinkedBlockingDeque<>();/*** 存入一条日志消息到并发队列中** @param log*/public void putLog(String log) {logs.offer(log);}/*** 异步日志线程 实时从队列中获取内容*/class LogThread extends Thread {@Overridepublic void run() {while (true) {/*** 代码的优化* 当前线程批量获取多条日志消息 投递kafka   批量**/String log = logs.poll();if (!StringUtils.isEmpty(log)) {/// 将该消息投递到kafka中 批量形式投递kafkakafkaTemplate.send("mayikt-log", log);}}}}}
3.5. 测试接口
package com.mayikt.api.weixin;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;public interface WeChatService {/*** feign rpc远程调用 405* @param a* @return*/@GetMapping("/getWeChat")String getWeChat( @RequestParam("a")Integer a);
}
3.6. apipost 发送请求
http://localhost:9000/getWeChat?a=123456888

在这里插入图片描述

3.7. kibana 查看日志

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515216.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java安全编码指南之:基础篇

简介&#xff1a; 作为一个程序员&#xff0c;只是写出好用的代码是不够的&#xff0c;我们还需要考虑到程序的安全性。在这个不能跟陌生人说话世界&#xff0c;扶老奶奶过马路都是一件很困难的事情。那么对于程序员来说&#xff0c;尤其是对于开发那种对外可以公开访问的网站的…

Gartner:70%新应用由低代码开发,AI热潮后小数据崛起

作者 | 宋慧 出品 | CSDN云计算 头图 | 付费下载于东方IC 国际研究机构Gartner在近日发布了2021年十大数据和分析趋势。纵观这十个趋势&#xff0c;基本可以归纳为三类主题&#xff0c;分别是&#xff1a; 加速数据和分析变革&#xff1a;运用AI创新、经过改进的可组合性以及…

Sentinel 1.8.0 年度版本发布,熔断降级重构升级!

在经过数月的打磨后&#xff0c;Sentinel 1.8.0 版本正式发布&#xff01;该版本是本年度最重要的版本之一&#xff0c;包含大量特性改进与 bug 修复&#xff0c;尤其是针对熔断降级特性的完善升级&#xff08;支持任意统计时长、慢调用比例降级策略、熔断器事件监听&#xff0…

清华大学-美团数字生活联合研究院成立

转载自清华新闻网 4月12日&#xff0c;清华大学-美团数字生活联合研究院&#xff08;以下简称“清华美团数字生活研究院”&#xff09;揭牌仪式暨管委会第一次会议在清华大学举行。仪式上&#xff0c;清华大学副校长杨斌与美团联合创始人王慧文共同为联合研究院揭牌。 杨斌表…

SpringCloud 应用在 Kubernetes 上的最佳实践 — 高可用(熔断)

前言 阿里巴巴十多年的双十一&#xff0c;锤炼出来了一套业界领先的高可用技术&#xff0c;有一些已经商业化&#xff08;云产品 PTS、AHAS&#xff09;&#xff0c;也有的开源了如&#xff1a;Sentinel、ChaosBlade。我们这一系列的高可用章节也主要介绍这方面的内容。今天介…

shadingjdbc实战分表分库

文章目录一、问题汇总1. 水平与垂直拆分之间的区别&#xff1f;2. 单表达到多大量开始进行分库分表&#xff1f;3. 基于客户端与服务端实现分表分库区别&#xff1f;4. 数据库分表分库策略有哪些&#xff1f;5. 自定义范围分表算法实现分表?二、整合ShardingSphere实现分表2.1…

阿里云机器学习怎么玩?这本新手入门指南揭秘了!

想知道我是怎样免费在阿里云上玩机器学习的吗&#xff1f; 不慌&#xff0c;这就告诉你答案~ 它来了--阿里云向个人免费开放云端深度学习开发环境DSW&#xff08;DataScienceWorkshop&#xff09;&#xff0c;还有免费GPU资源可以使用&#xff0c;实验的数据还会免费保存30天&a…

华为庞鑫:闪存3.0时代,四大变化激发全闪存数据中心潜能释放

从2005年到2019年间&#xff0c;中国数字经济总体规模由2.6万亿元增加至35.8万亿元&#xff0c;数字经济在GDP的占比也由14.2%提升至36.2%。随着数字经济蓬勃发展&#xff0c;数据也成为当之无愧的关键生产要素&#xff0c;是基础性资源和战略性资源。数据洪流的到来进一步驱动…

基于RabbitMQ订单未支付30分钟自动取消

文章目录一、原理实现1. 超时消费流程图2. 死信队列的架构原理3. 订单超时30分钟实现原理二、核心代码实战2.1. 记录订单待支付数据2.2. 超时消费者监听2.3. 订单核对校验一、原理实现 1. 超时消费流程图 2. 死信队列的架构原理 相同点&#xff1a; 死信队列和普通队列区别不…

蚂蚁mPaaS:有人修建高楼,有人重构城市

简介&#xff1a; 纵览这时代的先声&#xff0c;在高楼之巅&#xff0c;在海天之外。 2018年2月&#xff0c;春运拉开序幕。 这是人类史上最大规模的迁徙活动&#xff0c;3.82亿人坐进车厢&#xff0c;被31万趟车次送往不同的目的地。如果有一台摄影机从高空对准中国大地&…

全场景闪存加速、全场景数据保护,华为助力医院实现智能化转型

数字经济时代的来临&#xff0c;是影响当今医疗健康服务领域最重要的大趋势。在这种大背景下&#xff0c;新时期的智能医疗必将在医疗行业内掀起一阵浪潮。2020年&#xff0c;新冠疫情的肆虐势必推进浪潮的提前到来。 首都医科大学附属北京同仁医院&#xff0c;始建于1886年&a…

从Cloudflare事件,看DNS服务的重要性

简介&#xff1a; 美国时间7月17日&#xff0c;美国知名的网络安全服务提供商Cloudflare&#xff0c;出现了突发网络服务故障。通过这个事件&#xff0c;和大家聊聊关于网络安全稳定的思考&#xff0c;以及稳定、安全的DNS服务的重要性。 7.17事件 美国时间7月17日下午&#…

基于Redis订单未支付30分钟自动取消

文章目录一、原理实现1. 超时消费流程图2. 订单超时30分钟实现原理二、核心代码实战2.1. 记录订单待支付数据2.2. redis配置2.3. 超时消费者监听一、原理实现 1. 超时消费流程图 2. 订单超时30分钟实现原理 ①用户下单之后&#xff0c;投递一个订单号码存放到redis服务端&…

面向 K8s 设计误区

作者 | 姬望来源 | 阿里巴巴中间件头图 | 下载于视觉中国K8s 设计模式Kubernetes 是一个具有普遍意义的容器编排工具&#xff0c;它提供了一套基于容器构建分布式系统的基础依赖&#xff0c;其意义等同于 Linux 在操作系统中的地位&#xff0c;可以认为是分布式的操作系统。自定…

安装docker-compose插件

文章目录一、安装docker-compose插件1. 下载docker-compose插件2. 赋予权限3. 验证一、安装docker-compose插件 1. 下载docker-compose插件 curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/…

突围数字化转型,让特步同比增长24.8%的全渠道中台

简介&#xff1a; 多年前&#xff0c;曾有媒体向丁水波提问&#xff1a;“对于你个人来说&#xff0c;转型过程中最痛苦的部分是什么&#xff1f;”“最关键的是市场意识的转变。耳听为虚眼见为实&#xff0c;做起来给外界看到了&#xff0c;他们才会明白和接受。很多东西得做完…

赠书 | 什么是 Knative?

作者 | 李志伟、游杨来源 | 华章计算机头图 | 下载于视觉中国✎ 导读 什么是Knative&#xff1f;本文将对Knative的产生背景及发展历程&#xff0c;架构设计&#xff0c;受众群体等做详细介绍。Knative是由谷歌发起&#xff0c;有Pivotal、IBM、Red Hat等公司共同参与开发的Ser…

canal kafka 实现mysql与es/redis 数据同步

文章目录一、原理实现1. 方案设计流程图2. 实现原理二、mysql开启binlog模式2.1. 配置my.ini2.2. 重启mysql服务2.3. 验证binlog模式2.4. 创建canal账号2.5. 账号验证三、docker-compose环境搭建3.1. 环境总览3.2. 编写docker-compose.yml3.3. 安装docker-compose3.4. 构建环境…

免费下载!《阿里工程师的自我修养》公开10位阿里大牛解决问题的思维方式

简介&#xff1a; 今天&#xff0c;阿里技术公布一波阿里P8、P9技术大牛的思维模型&#xff0c;将他们的思维模式呈现出来。你可以在阿里资深专家职业生涯的真切感悟中&#xff0c;找到应对危机的最佳方法。《阿里工程师的自我修养》现已正式公开&#xff0c;可免费下载阅读。 …

云原生时代消息中间件的演进路线

简介&#xff1a; 本文整理自作者于 2020 年云原生微服务大会上的分享《云原生时代的消息中间件演进》&#xff0c;主要探讨了传统的消息中间件如何持续进化为云原生的消息服务。 作者 | 周礼&#xff08;不铭&#xff09; 阿里巴巴集团消息中间件架构师 导读&#xff1a;本文…