java打印设备集中管理_Kafka+Log4j实现日志集中管理

记录如何使用Kafka+Log4j实现集中日志管理的过程。

引言

前面写的《Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析》得到了许多同学的认可,在认可的同时,也有同学提出可以使用Kafka来集中管理日志,于是今天就来学习一下。

特别说明,由于网络上关于Kafka+Log4j的完整例子并不多,我也是一边学习一边使用,因此如果有解释得不好或者错误的地方,欢迎批评指正,如果你有好的想法,也欢迎留言探讨。

第一部分 搭建Kafka环境

安装Kafka

下载:http://kafka.apache.org/downloads.html

tar zxf kafka-.tgz

cd kafka-

启动Zookeeper

启动Zookeeper前需要配置一下config/zookeeper.properties:

83857f0857fe4d32c3c7e11eb216bfb6.png

接下来启动Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka Server

启动Kafka Server前需要配置一下config/server.properties。主要配置以下几项,内容就不说了,注释里都很详细:

3333b8c32b6ffce5dfe17b00452585c3.png

然后启动Kafka Server:

bin/kafka-server-start.sh config/server.properties

创建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看创建的Topic

>bin/kafka-topics.sh --list --zookeeper localhost:2181

启动控制台Producer,向Kafka发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

^C

启动控制台Consumer,消费刚刚发送的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

删除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注:只有当delete.topic.enable=true时,该操作才有效

配置Kafka集群(单台机器上)

首先拷贝server.properties文件为多份(这里演示4个节点的Kafka集群,因此还需要拷贝3份配置文件):

cp config/server.properties config/server1.properties

cp config/server.properties config/server2.properties

cp config/server.properties config/server3.properties

修改server1.properties的以下内容:

broker.id=1

port=9093

log.dir=/tmp/kafka-logs-1

同理修改server2.properties和server3.properties的这些内容,并保持所有配置文件的zookeeper.connect属性都指向运行在本机的zookeeper地址localhost:2181。注意,由于这几个Kafka节点都将运行在同一台机器上,因此需要保证这几个值不同,这里以累加的方式处理。例如在server2.properties上:

broker.id=2

port=9094

log.dir=/tmp/kafka-logs-2

把server3.properties也配置好以后,依次启动这些节点:

bin/kafka-server-start.sh config/server1.properties &

bin/kafka-server-start.sh config/server2.properties &

bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

现在在Kafka集群上创建备份因子为3,分区数为4的Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

说明:备份因子replication-factor越大,则说明集群容错性越强,就是当集群down掉后,数据恢复的可能性越大。所有的分区数里的内容共同组成了一份数据,分区数partions越大,则该topic的消息就越分散,集群中的消息分布就越均匀。

然后使用kafka-topics.sh的--describe参数查看一下Topic为kafka的详情:

dbd43236194378779662d12153c1341b.png

输出的第一行是所有分区的概要,接下来的每一行是一个分区的描述。可以看到Topic为kafka的消息,PartionCount=4,ReplicationFactor=3正是我们创建时指定的分区数和备份因子。

另外:Leader是指负责这个分区所有读写的节点;Replicas是指这个分区所在的所有节点(不论它是否活着);ISR是Replicas的子集,代表存有这个分区信息而且当前活着的节点。

拿partition:0这个分区来说,该分区的Leader是server0,分布在id为0,1,2这三个节点上,而且这三个节点都活着。

再来看下Kafka集群的日志:

9c8fa303ef3ec37abec34f27bd50456b.png

其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此类推。

从上面的配置可知,id为0,1,2,3的节点分别对应server0, server1, server2, server3。而上例中的partition:0分布在id为0, 1, 2这三个节点上,因此可以在server0, server1, server2这三个节点上看到有kafka-0这个文件夹。这个kafka-0就代表Topic为kafka的partion0。

第二部分 Kafka+Log4j项目整合

先来看下Maven项目结构图:

4c0e92f2823076f25c682040d1e406af.png

作为Demo,文件不多。先看看pom.xml引入了哪些jar包:

org.apache.kafka

kafka_2.9.2

0.8.2.1

org.apache.kafka

kafka-clients

0.8.2.1

com.google.guava

guava

18.0

重要的内容是log4j.properties:

log4j.rootLogger=INFO,console

# for package com.demo.kafka, log would be sent to kafka appender.

log4j.logger.com.demo.kafka=DEBUG,kafka

# appender kafka

log4j.appender.kafka=kafka.producer.KafkaLog4jAppender

log4j.appender.kafka.topic=kafka

# multiple brokers are separated by comma ",".

log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095

log4j.appender.kafka.compressionType=none

log4j.appender.kafka.syncSend=true

log4j.appender.kafka.layout=org.apache.log4j.PatternLayout

log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

# appender console

log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.target=System.out

log4j.appender.console.layout=org.apache.log4j.PatternLayout

log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

App.Java里面就很简单啦,主要是通过log4j输出日志:

package com.demo.kafka;

import org.apache.log4j.Logger;

public class App {

private static final Logger LOGGER = Logger.getLogger(App.class);

public static void main(String[] args) throws InterruptedException {

for (int i = 0; i < 20; i++) {

LOGGER.info("Info [" + i + "]");

Thread.sleep(1000);

}

}

}

MyConsumer.java用于消费kafka中的信息:

package com.demo.kafka;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import com.google.common.collect.ImmutableMap;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

public class MyConsumer {

private static final String ZOOKEEPER = "localhost:2181";

//groupName可以随意给,因为对于kafka里的每条消息,每个group都会完整的处理一遍

private static final String GROUP_NAME = "test_group";

private static final String TOPIC_NAME = "kafka";

private static final int CONSUMER_NUM = 4;

private static final int PARTITION_NUM = 4;

public static void main(String[] args) {

// specify some consumer properties

Properties props = new Properties();

props.put("zookeeper.connect", ZOOKEEPER);

props.put("zookeeper.connectiontimeout.ms", "1000000");

props.put("group.id", GROUP_NAME);

// Create the connection to the cluster

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumerConnector =

Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic “test”, to allow 4

// threads to consume

Map>> topicMessageStreams =

consumerConnector.createMessageStreams(

ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));

List> streams = topicMessageStreams.get(TOPIC_NAME);

// create list of 4 threads to consume from each of the partitions

ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);

// consume the messages in the threads

for (final KafkaStream stream : streams) {

executor.submit(new Runnable() {

public void run() {

for (MessageAndMetadata msgAndMetadata : stream) {

// process message (msgAndMetadata.message())

System.out.println(new String(msgAndMetadata.message()));

}

}

});

}

}

}

MyProducer.java用于向Kafka发送消息,但不通过log4j的appender发送。此案例中可以不要。但是我还是放在这里:

package com.demo.kafka;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class MyProducer {

private static final String TOPIC = "kafka";

private static final String CONTENT = "This is a single message";

private static final String BROKER_LIST = "localhost:9092";

private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";

public static void main(String[] args) {

Properties props = new Properties();

props.put("serializer.class", SERIALIZER_CLASS);

props.put("metadata.broker.list", BROKER_LIST);

ProducerConfig config = new ProducerConfig(props);

Producer producer = new Producer(config);

//Send one message.

KeyedMessage message =

new KeyedMessage(TOPIC, CONTENT);

producer.send(message);

//Send multiple messages.

List> messages =

new ArrayList>();

for (int i = 0; i < 5; i++) {

messages.add(new KeyedMessage

(TOPIC, "Multiple message at a time. " + i));

}

producer.send(messages);

}

}

到这里,代码就结束了。

第三部分 运行与验证

先运行MyConsumer,使其处于监听状态。同时,还可以启动Kafka自带的ConsoleConsumer来验证是否跟MyConsumer的结果一致。最后运行App.java。

先来看看MyConsumer的输出:

668ecc7b77882b29a72d6b96886dad94.png

再来看看ConsoleConsumer的输出:

0f1e9f2b194da418f0b1a3ce3145ba22.png

可以看到,尽管发往Kafka的消息去往了不同的地方,但是内容是一样的,而且一条也不少。最后再来看看Kafka的日志。

我们知道,Topic为kafka的消息有4个partion,从之前的截图可知这4个partion均匀分布在4个kafka节点上,于是我对每一个partion随机选取一个节点查看了日志内容。

上图中黄色选中部分依次代表在server0上查看partion0,在server1上查看partion1,以此类推。

而红色部分是日志内容,由于在创建Topic时准备将20条日志分成4个区存储,可以很清楚的看到,这20条日志确实是很均匀的存储在了几个partion上。

摘一点Infoq上的话:每个日志文件都是一个log entrie序列,每个log entrie包含一个4字节整型数值(值为N+5),1个字节的"magic value",4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:

message length : 4 bytes (value: 1+4+n)

"magic" value : 1 byte

crc : 4 bytes

payload : n bytes

这里我们看到的日志文件的每一行,就是一个log entrie,每一行前面无法显示的字符(蓝色选中部分),就是(message length + magic value + crc)了。而log entrie的后部分,则是消息体的内容了。

问题:

1. 如果要使用此种方式,有一种场景是提取某天或者某小时的日志,那么如何设计Topic呢?是不是要在Topic上带入日期或者小时数?还有更好的设计方案吗?

2. 假设按每小时设计Topic,那么如何在使用诸如logger.info()这样的方法时,自动根据时间去改变Topic呢?有类似的例子吗?

----欢迎交流,共同进步。

样例下载:

------------------------------------------分割线------------------------------------------

具体下载目录在 /2015年资料/12月/13日/Kafka+Log4j实现日志集中管理

------------------------------------------分割线------------------------------------------

参考页面:

相关阅读:

Kafka 的详细介绍:请点这里

Kafka 的下载地址:请点这里

0b1331709591d260c1c78e86d0c51c18.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/308415.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7-27 家谱处理 (30 分)(详解+map做法)map真香啊

一&#xff1a;题目 人类学研究对于家族很感兴趣&#xff0c;于是研究人员搜集了一些家族的家谱进行研究。实验中&#xff0c;使用计算机处理家谱。为了实现这个目的&#xff0c;研究人员将家谱转换为文本文件。下面为家谱文本文件的实例&#xff1a; John Robert Frank Andr…

微软开源基于 Envoy 的服务网格 Open Service Mesh

原文地址&#xff1a;https://techcrunch.com/2020/08/05/microsoft-launches-open-service-mesh/Open Service Mesh&#xff08;OSM&#xff09;是一个轻量级的、可扩展的、云原生的服务网格&#xff0c;它允许用户对高度动态的微服务环境进行统一管理、安全保护&#xff0c;并…

java servlet jsp javabean关系图_Servlet+JSP+JavaBean开发模式(MVC)介绍

好伤心...写登陆注册之前看见一篇很好的博文&#xff0c;没有收藏&#xff0c;然后找不到了。前几天在知乎上看见一个问题&#xff0c;什么时候感觉最无力。前两天一直想回答&#xff1a;尝试过google到的所有solve case&#xff0c;结果bug依然在。今天想回答&#xff1a;明明…

7-28 搜索树判断 (25 分)(思路加详解) just easy!

一&#xff1a;题目 对于二叉搜索树&#xff0c;我们规定任一结点的左子树仅包含严格小于该结点的键值&#xff0c;而其右子树包含大于或等于该结点的键值。如果我们交换每个节点的左子树和右子树&#xff0c;得到的树叫做镜像二叉搜索树。 现在我们给出一个整数键值序列&…

Azure DevOps+Docker+Asp.NET Core 实现CI/CD(一 .简介与创建自己的代理池)

前言本文主要是讲解如何使用Azure DevOpsDocker 来实现持续集成Asp.NET Core项目(当然 也可以是任意项目).打算用三个篇幅来记录完整的全过程觉得有帮助的朋友~可以左上角点个关注,右下角点个推荐CI/CD简介首先,我们先来简单的介绍一下什么是CI/CDCI全拼Continuous Integration…

7-31 笛卡尔树(25分)(题目分析+简单算法+详解+思路)

一&#xff1a;题目 7-31 笛卡尔树 (25 分) 笛卡尔树是一种特殊的二叉树&#xff0c;其结点包含两个关键字K1和K2。首先笛卡尔树是关于K1的二叉搜索树&#xff0c;即结点左子树的所有K1值都比该结点的K1值小&#xff0c;右子树则大。其次所有结点的K2关键字满足优先队列&#…

不仅性能秒杀Hadoop,现在连分布式集群功能也开源了

就在昨天&#xff08;2020年8月3日&#xff09;&#xff0c;涛思数据团队正式宣布&#xff0c;物联网大数据平台TDengine集群版开源。此次开源&#xff0c;我们在GitHub上传了23.9万行源代码&#xff0c;1198个源文件&#xff0c;包含我自己疫情期间写的一万余行C代码&#xff…

7-32 哥尼斯堡的“七桥问题” (25 分)(思路+详解+题目分析)两种做法任选其一

一&#xff1a;题目&#xff1a; 哥尼斯堡是位于普累格河上的一座城市&#xff0c;它包含两个岛屿及连接它们的七座桥&#xff0c;如下图所示。 可否走过这样的七座桥&#xff0c;而且每桥只走过一次&#xff1f;瑞士数学家欧拉(Leonhard Euler&#xff0c;1707—1783)最终解…

一次简单的服务器 cpu 占用率高的快速排查实战

前两天&#xff0c;朋友遇到一个线上 cpu 占用率很高的问题&#xff0c;我们俩一起快速定位并解决了这个问题。在征求朋友同意后&#xff0c;特发此文分享整个过程。本文以对话的形式展开&#xff0c;加上我的内心独白。文中对话与实际对话略有出入。友&#xff1a; 在吗&#…

7-33 地下迷宫探索 (30 分)(思路加详解)

一&#xff1a;题目 7-33 地下迷宫探索 (30 分)地道战是在抗日战争时期&#xff0c;在华北平原上抗日军民利用地道打击日本侵略者的作战方式。地道网是房连房、街连街、村连村的地下工事&#xff0c;如下图所示。 我们在回顾前辈们艰苦卓绝的战争生活的同时&#xff0c;真心钦…

联通定时休眠5G基站 戳破皇帝的新衣

近年来&#xff0c;5G被欧美政客、大公司、媒体连番炒作&#xff0c;在公开舆论上&#xff0c;5G成为了“科技制高点”&#xff0c;成为决定国家命运的“外星科技”&#xff0c;个别明星企业家还声称&#xff0c;“5G改变社会”&#xff0c;“5G应用后美国将成为落后国家”。但…

java中的线程不安全和实例解析

一&#xff1a;引言&#xff08;特指单核&#xff09; 所谓线程不安全&#xff0c;就是在共享数据时&#xff0c;不同的线程在执行时&#xff0c;出现数据的不准确&#xff0c;&#xff08;以模拟抢票和模拟银行取钱为例&#xff09;&#xff0c;那么我们的线程不安全具体指的…

记近一年线上项目经验及架构变更记录

简介M 项目, 是一个电子社保业务系统&#xff0c;2019.8 月团队接手了这个项目的开发工作&#xff0c;到 2020.7 月客户的业务量翻了&#xff14;倍&#xff0c;工作日同时在线员工数量&#xff14;&#xff10;人&#xff0c;以下记录总结 2019.8-至今项目的架构变化&#xff…

拓扑排序C++实现+实例解析(详解 兄弟们冲呀呀呀呀呀呀呀)

一&#xff1a;引言 既然是一种排序&#xff0c;那么肯定是按照某种规则进行排序&#xff0c;那么这么想的话&#xff0c;先了解基本知识&#xff0c;再来实战演练 1. AOV网&#xff08;Activity On Vertex Network)【顶点——表示活动】 是一个——有向无回路的图 顶点——表…

7-34 任务调度的合理性 (25 分)(思路加详解+兄弟们冲呀)

一&#xff1a;题目 假定一个工程项目由一组子任务构成&#xff0c;子任务之间有的可以并行执行&#xff0c;有的必须在完成了其它一些子任务后才能执行。“任务调度”包括一组子任务、以及每个子任务可以执行所依赖的子任务集。 比如完成一个专业的所有课程学习和毕业设计可…

.NET和.NET Core Web APi FormData多文件上传

【导读】最近因维护.NET和.NET Core项目用到文件上传功能&#xff0c;虽说也做过&#xff0c;但是没做过什么对比&#xff0c;借此将二者利用Ajax通过FormData上传文件做一个总结&#xff0c;通过视图提交表单太简单&#xff0c;这里不做阐述&#xff0c;希望对有需要的童鞋能有…

在ubuntu上实现基于webrtc的多人在线视频聊天服务

最近研究webrtc视频直播技术&#xff0c;网上找了些教程最终都不太能顺利跑起来的&#xff0c;可能是文章写的比较老&#xff0c;使用的一些开源组件已经更新了&#xff0c;有些配置已经不太一样了&#xff0c;所以按照以前的步骤会有问题。折腾了一阵终于跑起来了&#xff0c;…

java并发练习之快乐影院

一&#xff1a;引言 这里是加了个同步块&#xff0c;来保证数据的准确性&#xff0c;用了个容器使&#xff0c;我们可以选位置 二&#xff1a;上码&#xff08;这里是模拟在电影院选位置&#xff09; package com.wyj.three;import java.util.ArrayList; import java.util.L…

Azure DevOps+Docker+Asp.NET Core 实现CI/CD(二.创建CI持续集成管道)

前言本文主要是讲解如何使用Azure DevOpsDocker 来实现持续集成Asp.NET Core项目(当然 也可以是任意项目).上一篇:Azure DevOpsDockerAsp.NET Core 实现CI/CD(一 .简介与创建自己的代理池)觉得有帮助的朋友~可以左上角点个关注,右下角点个推荐今天我们废话不多说 直接开始正文 …

7-35 城市间紧急救援 (25 分)(思路加详解)

一&#xff1a;题目 作为一个城市的应急救援队伍的负责人&#xff0c;你有一张特殊的全国地图。在地图上显示有多个分散的城市和一些连接城市的快速道路。每个城市的救援队数量和每一条连接两个城市的快速道路长度都标在地图上。当其他城市有紧急求助电话给你的时候&#xff0…