Flume and Kafka Integration: A Detailed Walkthrough

Environment

Name        Version                Download
CentOS      7.0 (64-bit)           Baidu
Zookeeper   3.4.5
Flume       1.6.0
Kafka       0.10.1.1 (Scala 2.10)

Flume Configuration Notes

Here is the configuration file, pasted as-is:

[root@zero239 kafka_2.10-0.10.1.1]# cat /opt/hadoop/apache-flume-1.6.0-bin/conf/kafka-conf.properties 
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = r1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
#agent.sources.r1.type = spooldir
#agent.sources.r1.command = /opt/test/logs/data
#agent.sources.r1.fileHeader = true
#agent.sources.r1.channels = c1
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /opt/test/logs/data
agent.sources.r1.fileHeader = true

# Each sink's type must be defined
#agent.sinks.s1.type = logger
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = logstest
agent.sinks.s1.brokerList = zero230:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 2

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100

agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
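A note on the spooldir source used above: the spool directory must exist before the agent starts, files dropped into it must already be completely written, and Flume renames each file it has ingested by appending a .COMPLETED suffix. A minimal preparation step, assuming the path from the config above:

mkdir -p /opt/test/logs/data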

Kafka Configuration

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/hadoop/kafka_2.10-0.10.1.1/logs/tmp

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zero230:2181,zero231:2181,zero239:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
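Since broker.id must be unique per broker, the other two machines need their own copies of this file that differ at least in that line. A plausible assignment for this three-node cluster (my assumption; the article only shows the file with broker.id=2):

broker.id=0   # on zero230
broker.id=1   # on zero231
broker.id=2   # on zero239, the file shown above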

I have already set up a Zookeeper cluster, so here I point to my own Zookeeper addresses. If you have not set one up, you can simply use the Zookeeper instance bundled with Kafka.

Zookeeper Cluster Setup and Configuration

Start Kafka and Verify the Setup

  1. Start your Zookeeper cluster (skip this step if you have not set one up; a sketch for starting the cluster nodes follows after the command below)
  2. Or start the Zookeeper instance bundled with Kafka:

    bin/zookeeper-server-start.sh config/zookeeper.properties
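For the clustered case in step 1, each Zookeeper node is started with the standard start script from its own Zookeeper installation (a sketch assuming the stock 3.4.5 layout; this is not shown in the original post):

bin/zkServer.sh start    # run on each of zero230, zero231 and zero239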

3. Start Kafka

server1.properties is the name of the file you just edited:
bin/kafka-server-start.sh config/server1.properties
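To keep the broker running after the terminal closes, the start script also accepts a -daemon flag:

bin/kafka-server-start.sh -daemon config/server1.properties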

4. Create a topic named logstest

./bin/kafka-topics.sh --create --zookeeper zero230:2181 --replication-factor 1 --partitions 1 --topic logstest
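On success, Kafka 0.10 prints a confirmation along these lines:

Created topic "logstest".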

5. Check that the topic was created successfully

./bin/kafka-topics.sh --list --zookeeper localhost:2181
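The output should now include the topic created in step 4 (note that --zookeeper localhost:2181 assumes you run this on one of the Zookeeper nodes; zero230:2181 works from anywhere):

logstest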

6. Start a console producer (think of it as a user already generating data; that makes it easier to picture)

bin/kafka-console-producer.sh --broker-list zero230:9092 --topic logstest
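Each line typed into this console is sent as one message to the logstest topic. For example, type:

hello kafka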

7. Start a console consumer (it lets you see the output, i.e. whatever data the producer generates)

bin/kafka-console-consumer.sh --zookeeper zero230:2181 --topic logstest --from-beginning
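The --from-beginning flag replays everything already stored in the topic; drop it to see only messages produced after the consumer starts:

bin/kafka-console-consumer.sh --zookeeper zero230:2181 --topic logstest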

Start Flume and Verify That It Can Talk to Kafka

[root@zero239 apache-flume-1.6.0-bin]# ./bin/flume-ng agent --conf conf -f ./conf/kafka-conf.properties -n agent -Dflume.root.logger=INFO,console
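Note that -n agent must match the property prefix used in kafka-conf.properties (agent.sources, agent.channels, agent.sinks), and -f points at that file. With the agent and the console consumer from step 7 both running, a simple end-to-end smoke test is to drop a file into the spool directory and watch its contents appear on the consumer (the file name here is my own choice):

echo "flume-to-kafka test" > /opt/test/logs/data/smoke-test.log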

Screenshot of the successful connection

As you can see in the Flume sink configuration, I set the sink type to Kafka, which means the output goes to Kafka:

agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = logstest          # the topic we just created
agent.sinks.s1.brokerList = zero230:9092 # the machine the producer was created on
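Two other sink settings from the full config are worth a word: requiredAcks = 1 makes the sink wait for the partition leader to acknowledge each write, and batchSize = 2 ships events to Kafka two at a time, which is handy for watching a demo but far smaller than a typical production value.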

With that, the Flume and Kafka integration is complete.

A teaser for the next installment


