Kafka学习(一)-------- Quickstart

参考官网:http://kafka.apache.org/quickstart

一、下载Kafka

官网下载地址 http://kafka.apache.org/downloads

截至2019年7月8日 最新版本为 2.3.0 2.12为编译的scala版本 2.3.0为kafka版本

  • Scala 2.12  - kafka_2.12-2.3.0.tgz (asc, sha512)

    解压
    > tar -xzf kafka_2.12-2.3.0.tgz
    > cd kafka_2.12-2.3.0

二、启动服务

要先启动zookeeper kafka内置了一个 也可以不用

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

三、创建topic

replication-factor为1   partitions为1
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看topic
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

也可以不创建topic 设置自动创建 当publish的时候

四、发送消息

用command line client 进行测试 一行就是一条消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

五、消费者

command line consumer 可以接收消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

六、设置多broker集群

单broker没有意思 我们可以设置三个broker

首先为每个broker 复制配置文件

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

然后编辑

config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dirs=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dirs=/tmp/kafka-logs-2

broker.id是唯一的 cluster中每一个node的名字 我们在same machine上 所有要设置listeners和log.dirs 以防冲突

建一个topic 一个partitions 三个replication-factor

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
用describe看看都是什么情况
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
  • 有几个概念 :

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

刚才那个topic
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

发送 接收

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

试一下容错 fault-tolerance

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564看一下变化:Leader换了一个  因为1被干掉了
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
还是收到了消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

七、使用kafka import/export data

刚才都是console 的数据,其他的sources other systems呢 用Kafka Connect

弄一个数据
> echo -e "foo\nbar" > test.txt
启动  指定配置文件
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
验证一下
> more test.sink.txt
foo
bar
消费者端
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
可以继续写入
> echo Another line>> test.txt

八、使用Kafka Streams

http://kafka.apache.org/22/documentation/streams/quickstart

WordCountDemo

https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

代码片段

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",Consumed.with(stringSerde, stringSerde);KTable<String, Long> wordCounts = textLines// Split each text line, by whitespace, into words..flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))// Group the text words as message keys.groupBy((key, value) -> value)// Count the occurrences of each word (message key)..count()// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

建一个 Kafka producer 指定input topic output topic

> bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--replication-factor 1 \--partitions 1 \--topic streams-wordcount-output \--config cleanup.policy=compact
Created topic "streams-wordcount-output".

启动WordCount demo application

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

启动一个生产者写数据

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

启动一个消费者接数据

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializerall     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
kafka   1

转载于:https://www.cnblogs.com/tree1123/p/11150927.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/362048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于页面布局间距使用的经验之谈

在页面布局的时候遇到一个问题在此记录。 有如下布局需求。页面上大多数都是这样的&#xff0c;一块一块从上到下排列。 块与块之间的间距需要固定由谁来负责。例如第一个块和第二个块之间的间距&#xff0c;就需要第二个块的margin-top完成&#xff0c;文字和第二个块之间的间…

最简洁的js鼠标拖曳效果【原】

请原谅我是一个标题档&#xff0c;不过还是很简洁的&#xff0c;因为只是初步的实现的拖曳效果<!DOCTYPE html><html><head><meta http-equiv"Content-Type"content"text/html; charsetutf-8"/><meta http-equiv"Content-…

safari 音频播放问题

问题描述&#xff1a; 点击播放音频按钮发现并没有声音&#xff08;并不是自动播放&#xff0c;是有用户行为的&#xff09;。 import React, { useEffect, useState, useRef } from reactfunction comp() {let [paused, setPaused] useState(true)let audioDom useRef(null…

canvas绘制经典折线图(一)

最终效果图如下&#xff1a; 实现步骤如下&#xff1a;注-引用了jQuery HTML代码 <!doctype html><html lang"en"><head><meta charset"UTF-8"><meta name"Generator" content"EditPlus"><meta nam…

如何从Java EE无状态应用程序连接到MongoDB

在本文中&#xff0c;我将介绍如何从无状态Java EE应用程序连接到MongoDB&#xff0c;以利用与MongoDB Java驱动程序提供的数据库的内置连接池。 如果您开发的REST API对MongoDB执行操作&#xff0c;则可能是这种情况。 获取Java MongoDb驱动程序 要将Java连接到MongoDB&#…

开发流程补全

在开发过程中我意识到一个问题 具体问题就是我没有一个可靠的机制来防止自己犯错 现在的流程是 开发 调试 -> 测试同学 -> 上线 这里测试的时间会有点长&#xff0c;因为bug会有点多&#xff0c;然后需要修改bug&#xff0c;然后测试验证 改bug时间 理解测试bug描述…

Linux 锁机制

本文讨论了 Linux 内核中可用的大量同步或锁定机制。这些机制为 2.6 版内核的许多可用方法提供了应用程序接口&#xff08;API&#xff09; 。但是在深入学习 API 之前&#xff0c;首先需要明白将要解决的问题。 当存在并发特性时&#xff0c;必须使用同步方法。当在同一时间段…

CSS中越界问题经典解决方案

8.CSS相关知识 (1)如何解决父元素的第一个子元素的margin-top越界问题 1)为父元素加border-top: 1px;——有副作用 2)为父元素指定padding-top: 1px;——有副作用 3)为父元素指定overflow:hidden;——有副作用 4)为父元素添加前置内容生成——推荐使用 .parent:before { conten…

用可编写脚本的终结点遍历REST应用程序

我喜欢JDK附带ScriptEngine的事实。 当您要评估服务器环境中已经部署的应用程序并进行故障排除时&#xff0c;它非常灵活。 将此REST端点添加到Java EE应用程序中&#xff0c;它将使您可以立即访问该应用程序的内部状态。 package myrestapp;import java.io.StringReader; imp…

win7笔记本为手机共享wifi

1、cmd netsh wlan set hostednetwork modeallow ssidyourname keyyourpassword 开启win7的虚拟wifi&#xff0c;让电脑变成无线路由器 这时&#xff0c;网络连接中会多出一个网卡为“Microsoft Virtual WiFi Miniport Adapter”的无线连接2。如果没有&#xff0c;需要更新无线…

createjs中shape的属性regX和regY

官方文档说regX和regY是图形与注册点的距离。 那么注册点是什么呢&#xff1f; 我理解注册点就是图形的x/y对应的点图形动效的原点就是注册点 如果修改图形的regX和regY值图形在画布上的位置是会被改变的&#xff0c;但是注册点其实并没有被改变。因为图形的x/y值并没有被改…

CSS3里的display

默认值&#xff1a;inline 适用于&#xff1a;所有元素 继承性&#xff1a;无 动画性&#xff1a;否 none&#xff1a;隐藏对象。与visibility属性的hidden值不同&#xff0c;其不为被隐藏的对象保留其物理空间inline&#xff1a;指定对象为内联元素。block&#xff1a;指定…

H3C 单区域OSPF配置示例二

转载于:https://www.cnblogs.com/fanweisheng/p/11163688.html

身份反模式:联邦筒仓和意大利面条身份

分析公司Quocirca的最新研究证实&#xff0c;现在许多企业的外部用户比内部用户更多&#xff1a;在欧洲&#xff0c;有58&#xff05;的企业直接与其他企业和/或消费者的用户进行交易&#xff1b; 仅在英国&#xff0c;这一数字就达到了65&#xff05;。 如果您回顾历史&#x…

OpenSSL命令

Openssl Windows下编译过程1、下载openssl源代码以及相应的vc工程2、下载perl工具&#xff0c;如&#xff1a;ActivePerl-5.8.8.820-MSWin32-x86-274739.msi3、安装ActivePerl4、打开控制台程序&#xff0c;在openssl解压后的目录下执行Perl Configure VC-WIN32命令&#xff0c…

修改webpack的publicPath为动态设置以适配公司活动平台

背景&#xff1a; 我们需要将React开发的应用部署到一个活动搭建平台上&#xff0c;这意味我们只需要上传源码&#xff0c;没有搭建服务器的环节&#xff0c;没有配置Nginx的环节。具体步骤就是在该平台新建一个活动&#xff0c;然后将自己的源码传到这个活动下&#xff0c;然…

多个气泡向上冒出!

这里展示白色半透明气泡如下图&#xff1a;实际是动态 思路&#xff1a;HTML里只需要一个CANVAS元素&#xff0c;Javascript里操作canvas 1、给canvas里绘制背景图片 2、在绘制半径为0-10px的圆形&#xff0c;x坐标屏幕水平随机&#xff0c;y所标竖直大于屏幕高度。 圆形背景…

注入域对象而不是基础结构组件

依赖注入是Java&#xff08;以及许多其他编程语言&#xff09;中广泛使用的软件设计模式&#xff0c;用于实现控制反转 。 它提高了可重用性&#xff0c;可测试性&#xff0c;可维护性&#xff0c;并有助于构建松耦合的组件。 如今&#xff0c;依赖注入是将Java对象连接在一起的…

分享25个优秀的网站底部设计案例

相对于网站头部来说&#xff0c;关注网站底部设计的人很少。我们平常也能碰到有些网站的底部设计得很漂亮&#xff0c;给网站的呈现来一个完美的结尾。这篇文章收集了25个优秀的网站底部设计案例&#xff0c;一起欣赏。 me & oli La Bubbly Poogan’s Porch GiftRocket Lin…

wbepack中output.filename和output.chunkFilename

对于webpack配置中filename和chunkFilename在使用中有些不懂的地方&#xff0c;研究之后记录如下。 filename: string | function 此选项决定了每个输出 bundle 的名称。这些 bundle 将写入到 output.path 选项指定的目录下。 对于单个入口起点&#xff0c;filename 会是一个…