如何在 Ubuntu 20.04 上安装 Apache Kafka

前些天发现了一个人工智能学习网站,通俗易懂,风趣幽默,最重要的屌图甚多,忍不住分享一下给大家。点击跳转到网站。

如何在 Ubuntu 20.04 上安装 Apache Kafka

介绍

Apache Kafka是一种分布式消息代理,旨在处理大量实时数据。Kafka 集群具有高度可扩展性和容错性。与ActiveMQ和RabbitMQ等其他消息代理相比,它还具有更高的吞吐量。尽管它通常用作发布/订阅消息传递系统,但许多项目也将其用于日志聚合,因为它为已发布的消息提供持久存储。

发布/订阅消息系统允许一个或多个生产者发布消息,而无需考虑消费者的数量或他们将如何处理消息。订阅的客户端会自动收到有关更新和新消息创建的通知。该系统比客户端定期轮询以确定是否有新消息可用的系统更高效且可扩展。

在本文中,我们将在 Ubuntu 20.04 上安装和配置 Apache Kafka 2.8.2。

先决条件

要继续操作,需要:

  • 具有至少 4 GB RAM 和具有sudo权限的非 root 用户的Ubuntu 20.04服务器。如果没有设置非 root 用户,可以按照我的使用 Ubuntu 20.04 进行初始服务器设置进行设置。RAM 小于 4GB 的安装可能会导致 Kafka 服务失败。
  • OpenJDK 11 。Kafka是用Java编写的,因此需要JVM。

第 1 步 — 为 Kafka 创建用户

由于 Kafka 可以通过网络处理请求,因此第一步是为该服务创建专用用户。如果有人破坏 Kafka 服务器,这可以最大限度地减少对 Ubuntu 计算机的损害。在此步骤中创建一个专用kafka的用户。

以非 root的sudo用户身份登录到服务器,然后创建一个名为kafka 的用户:

sudo adduser kafka

按照提示设置密码并创建kafka用户。

接下来,使用命令将kafka用户添加到组中。我们需要这些权限来安装 Kafka 的依赖项:

sudo adduser kafka sudo

现在kafka用户现在已准备就绪。使用以下方式登录帐户:

su -l kafka

现在我们已经创建了 Kafka 特定的用户,可以下载并提取 Kafka 二进制文件了。

第 2 步 — 下载并解压 Kafka 二进制文件

在此步骤中,将下载 Kafka 二进制文件并将其解压到kafka用户主目录中的专用文件夹中。

首先,在/home/kafka创建一个目录Downloads来存储下载:

mkdir ~/Downloads

curl下载 Kafka 二进制文件:

curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz

创建一个名为 kafka的目录并移动到该目录。下来将使用此目录作为 Kafka 安装的基本目录:

mkdir ~/kafka && cd ~/kafka

使用以下命令提取下载的存档:

tar -xvzf ~/Downloads/kafka.tgz --strip 1

指定该--strip 1标志以确保存档的内容是在~/kafka/其自身中提取的,而不是在其中的另一个目录(例如~/kafka/kafka_2.13-2.8.2/)中提取的。

现在已成功下载并提取二进制文件,可以开始配置 Kafka 服务器。

第 3 步 — 配置 Kafka 服务器

Kafka主题是可以发布消息的类别、组或主题名称。但是,Kafka 的默认行为不允许删除主题。要修改此设置,必须编辑配置文件,下来将在此步骤中执行此操作。

Kafka 的配置选项在server.properties 中指定。使用最喜欢的编辑器nano打开此文件:

nano ~/kafka/config/server.properties

首先,添加一个允许删除 Kafka 主题的设置。将以下行添加到文件底部:

〜/ kafka / config / server.properties

delete.topic.enable = true

其次,将通过修改属性来更改存储 Kafka 日志的目录log.dirs。找到该log.dirs属性并将现有路线替换为突出显示的路线:

〜/ kafka / config / server.properties

log.dirs=/home/kafka/logs

保存并关闭文件。

现在我们已经配置了 Kafka,可以创建systemd用于在启动时运行和启用 Kafka 服务器的单元文件。

第 4步 — 创建systemd单元文件并启动 Kafka 服务器

在本部分中,我们将为Kafka 服务创建systemd单元文件。这些文件将帮助我们与其他 Linux 服务一致的方式执行常见的服务操作,例如启动、停止和重新启动 Kafka。

Kafka 使用Zookeeper来管理其集群状态和配置(最新版的可以选择是否是ZK)。它被用于许多分布式系统中,我们可以在官方 Zookeeper 文档中阅读有关该工具的更多信息。下面演示将使用 Zookeper 作为这些单元文件的服务。

创建单元文件zookeeper

sudo nano /etc/systemd/system/zookeeper.service

在文件中输入以下单位定义:

/etc/systemd/system/zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

[Unit]部分指定 Zookeeper 需要网络和文件系统准备好才能启动。

[Service]部分指定systemd应使用zookeeper-server-start.shzookeeper-server-stop.shshell 文件来启动和停止服务。它还指定如果Zookeeper异常退出则应重新启动。

添加此内容后,保存并关闭文件。

接下来,创建kafka systemd 服务文件:

sudo nano /etc/systemd/system/kafka.service

在文件中输入以下单位定义:

/etc/systemd/system/kafka.service

[Unit]
Requires=zookeeper.service
After=zookeeper.service[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

[Unit]部分指定该单元文件依赖于zookeeper.service,这将确保在服务启动zookeeper时自动启动kafka

[Service]部分指定systemd应使用kafka-server-start.shkafka-server-stop.shshell 文件来启动和停止服务。它还指定如果Kafka异常退出则应重新启动。

保存并关闭文件。

现在已经定义了单位,使用以下命令启动 Kafka:

sudo systemctl start kafka

要确保服务器已成功启动,请检查该kafka单元的日志日志:

sudo systemctl status kafka

将收到如下输出:

Output● kafka.serviceLoaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset>Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s agoMain PID: 17770 (sh)Tasks: 69 (limit: 4677)Memory: 321.9MCGroup: /system.slice/kafka.service├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho>└─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>

现在有一个 Kafka 服务器正在侦听端口9092,这是 Kafka 服务器使用的默认端口。

现在已启动该kafka服务。但是如果要重新启动服务器,Kafka将不会自动重新启动。要在服务器启动时启用该kafka服务,请运行以下命令:

sudo systemctl enable zookeeper

将收到已创建符号链接的响应:

OutputCreated symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.

然后运行这个命令:

sudo systemctl enable kafka

将收到已创建符号链接的响应:

OutputCreated symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.

在此步骤中,我们启动并启用了kafkazookeeper服务。在下一步中,将检查 Kafka 安装。

第 5 步 — 测试 Kafka 安装

在此步骤中,将测试 Kafka 安装。我们将发布并使用一条Hello World消息,以确保 Kafka 服务器按预期运行。

在 Kafka 中发布消息需要:

  • 生产者*,*能够将记录和数据发布到主题。
  • 消费者,从主题读取消息和数据。

首先,创建一个名为TutorialTopic 的主题:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

我们使用脚本从命令行创建生产者kafka-console-producer.sh。它需要 Kafka 服务器的主机名、端口和主题作为参数。

将收到主题已创建的响应:

OutputCreated topic TutorialTopic.

现在将字符串发布"Hello, World"TutorialTopic主题:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

接下来,使用该脚本创建一个 Kafka 消费者kafka-console-consumer.sh。它需要 ZooKeeper 服务器的主机名和端口以及主题名称作为参数。以下命令使用来自TutorialTopic 的消息。请注意该标志的使用--from-beginning,它允许消费在消费者启动之前发布的消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

如果没有配置问题,将Hello, World在终端中收到响应:

OutputHello, World

该脚本将继续运行,等待更多消息发布。要对此进行测试,请打开一个新的终端窗口并登录到您的服务器。请记住以你的用户身份登录kafka

su -l kafka

在这个新终端中,启动生产者来发布第二条消息:

echo "Hello World from Sammy at DigitalOcean!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

此消息将加载到原始终端中消费者的输出中:

OutputHello, World
Hello World from Sammy at DigitalOcean!

完成测试后,按CTRL+C停止原始终端中的消费者脚本。

到此现在已经在 Ubuntu 20.04 上安装并配置了 Kafka 服务器。在下一步中,我们将执行一些快速任务来强化 Kafka 服务器的安全性。

第 6 步 — 强化 Kafka 服务器

安装完成后,可以删除kafka用户的管理权限并强化 Kafka 服务器。

执行此操作之前,请注销并以任何其他非 rootsudo用户身份重新登录。如果仍在运行开始本教程时使用的同一个 shell 会话,请输入exit

从 sudo 组中删除用户kafka

sudo deluser kafka sudo

为了进一步提高 Kafka 服务器的安全性,请使用该命令锁定用户的密码passwd。此操作可确保任何人都无法使用此帐户直接登录服务器:

sudo passwd kafka -l

-l标志锁定更改用户密码的命令 ( passwd)。

此时,只有一个root或一个用户可以使用以下命令sudo登录:

sudo su - kafka

将来,如果想解锁更改密码的功能,请使用passwd下的-u选项:

sudo passwd kafka -u

现在已成功限制kafka用户的管理员权限。现在可以选择执行下一步,这会将 Kafka 添加到你的系统中。

第 7 步 — 安装 KafkaT(可选)

开发KafkaT 的目的是提高查看 Kafka 集群详细信息以及从命令行执行某些管理任务的能力。需要 Ruby 才能使用它。还需要build-essential包来构建其它所需依赖。

build-essential使用以下命令安装 Ruby 和软件包:

sudo apt install ruby ruby-dev build-essential

现在可以使用以下命令安装 KafkaT :

sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

Wno-error=format-overflow需要编译标志来抑制 Zookeeper 在安装过程中的警告和错误。

安装完成后,将收到安装完成的响应:

Output...
Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds
8 gems installed

KafkaT用作.kafkatcfg配置文件来确定Kafka服务器的安装和日志目录。它还应该有一个将 KafkaT 指向我们的 ZooKeeper 实例的条目。

创建一个名为的新文件.kafkatcfg

nano ~/.kafkatcfg

添加以下行以指定有关 Kafka 服务器和 Zookeeper 实例的所需信息:

〜/.kafkatcfg

{"kafka_path": "~/kafka","log_path": "/home/kafka/logs","zk_path": "localhost:2181"
}

保存并关闭文件。

要查看有关所有 Kafka 分区的详细信息,请尝试运行以下命令:

kafkat partitions

将收到以下输出:

Output[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible.
/var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated
...
Topic                 Partition   Leader      Replicas        ISRs
TutorialTopic         0             0         [0]             [0]
__consumer_offsets	  0		          0		      [0]							[0]
...
...

输出将包括TutorialTopic__consumer_offsets,这是 Kafka 用于存储客户端相关信息的内部主题。可以安全地忽略以 __consumer_offsets开头的行。

要了解有关 KafkaT 的更多信息,请参阅其GitHub 存储库。

结论

现在,Apache Kafka 在 Ubuntu 服务器上安全运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615109.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一体机旅游景区污水处理设备工艺说明

一体机旅游景区污水处理设备工艺说明 原水浓度:COD≤500mg/L,BOD≤300mg/L,NH3-N≤40mg/L,超过以上浓度需另行设计。 出水标准:COD≤60mg/L,BOD≤20mg/L,NH3-N≤15mg/L,出水要求如更…

常用Java代码-Java中的异常传播

在Java中,异常传播是一个重要的概念,它描述了异常如何在方法之间传播。当一个方法抛出一个异常时,调用该方法的代码必须处理该异常,否则程序将终止。如果调用该方法的代码也抛出了异常,那么这个异常会继续向上传播&…

问CHATawsec2怎么部署实例?

CHAT回复:在AWS EC2(Elastic Compute Cloud)上部署实例主要涉及以下步骤: 1. 登录AWS管理控制台:使用你的AWS账户登录AWS管理控制台。 2. 导航到EC2服务:在顶部菜单栏中,点击"服务"然…

android——rxjava的使用

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、rxjava是什么?二、使用步骤 1.引入库2.读入数据总结 前言 本文介绍项目开发中使用到rxjava的情形,以及详细的代码。 一、rxjava是什…

2023年终小结

亲爱的小伙伴们: 随着2023年即将结束,我想回顾一下过去一年作为一名程序员的成长和经验,并分享一些我在技术和团队合作方面的收获。 1. 技术成长与学习: 在过去的一年里,我积极追求技术的学习和发展。我深入研究了新…

Canvas 指南与总结

背景 Canvas API 提供了一个通过 JavaScript 和 HTML 的元素来绘制图形的方式。它可以用于动画、游戏画面、数据可视化、图片编辑以及实时视频处理等方面。 Canvas API 主要聚焦于 2D 图形。而同样使用元素的 WebGL API 则用于绘制硬件加速的 2D 和 3D 图形。 简单例子 <…

运维管理软件:助力大学校园破浪前行的数字引擎

随着校园信息化的深入推进&#xff0c;智慧校园的建设面临着诸多挑战。庞大的IT环境、复杂的网络架构、多样化的应用需求&#xff0c;都对运维管理提出了更高的要求。同时&#xff0c;数据安全、隐私保护以及高效的资源利用也成为智慧校园运维的核心需求。 在这场数字化时代的浪…

本地静态资源打包出来,本地配置ng访问服务器(uniapp打包成h5后,使用打包资源连接测试环境测试)

1.下载ng https://nginx.org/en/download.html 2.解压下载的压缩包 3.打包h5静态资源 4.将打包出来的资源放入ng -》html文件夹下面 5.进入ng-》conf-》nginx.conf 进行转发配置 6.启动ng服务&#xff0c;点击nginx.exe 7.浏览器直接访问http://localhost:8081/#/&#x…

mysql触发器的简单使用

mysql触发器 触发器是一个特殊的存储过程&#xff0c;在事件delete、insert、update发生时自动执行一条或多条SQL语句&#xff08;执行多条SQL语句需要用begin、end 包裹起来&#xff09; 创建触发器 创建触发器的四大必要条件 唯一的触发器名称触发器关联的表触发器响应的…

Elasticsearch倒排索引详解

倒排索引&#xff1a; 组成 term index(词项索引 &#xff0c;存放前后缀指针) Term Dictionary&#xff08;词项字典&#xff0c;所有词项经过文档与处理后按照字典顺序组成的一个字典&#xff08;相关度&#xff09;&#xff09; Posting List&#xff08;倒排表&#xf…

Web实战丨基于Django与HTML的新闻发布系统

文章目录 写在前面项目简介项目框架实验内容安装依赖库1.创建项目2.系统配置3.配置视图文件4.配置模型文件5.配置管理员文件6.配置模板文件7.创建数据库8.启动项目 运行结果写在后面 写在前面 本期内容&#xff1a;基于Django与HTML的简单新闻发布系统。 项目需求&#xff1a…

快速入门Semantic Kernel:构建您的第一个AI应用

快速入门Semantic Kernel&#xff1a;构建您的第一个AI应用 引言Semantic Kernel基础知识核心功能操作原理 环境准备和安装环境准备安装Semantic Kernel 创建第一个Semantic Kernel项目项目设置示例代码测试和运行 设计有效的Prompt基本原则示例测试和迭代 常见问题和解决方案问…

order by 与 分页 的冲突

order by 与 分页 的冲突 问题背景 Oracle拼接SQL&#xff0c;JAVA使用SQLQueryExecutor执行拼接的SQL&#xff0c;SQL如下&#xff1a; SELECT col_key, col_other_info FROM tb_tableName WHERE col_where_info 一些筛选条件 order by col_updatetime desc 该表中的数…

python股票分析挖掘预测技术指标知识跳空缺口指标详解(5)

本人股市多年的老韭菜&#xff0c;各种股票分析书籍&#xff0c;技术指标书籍阅历无数&#xff0c;萌发想法&#xff0c;何不自己开发个股票预测分析软件&#xff0c;选择python因为够强大&#xff0c;它提供了很多高效便捷的数据分析工具包。 我们已经初步的接触与学习其中数…

cad的模型怎么打散导入3d---模大狮模型网

将CAD中的模型打散并导入3D建模软件&#xff0c;需要以下步骤&#xff1a; 将CAD中的模型进行分组或分层&#xff1a;在CAD中&#xff0c;将模型按照不同的组或层进行分组或分层。这样可以方便地控制每个部分的显示和隐藏&#xff0c;在导入3D建模软件后&#xff0c;也可以更方…

ChatGLM3-6B的本地api调用

ChatGLM3-6B的本地api调用方式 1.运行openai_api_demo路径下的openai_api.py 启动后界面&#xff1a; 注意&#xff1a;本地api调到的前提是——本地部署了ChatGLM3-6B,本地部署的教程可参考&#xff1a; 20分钟部署ChatGLM3-6B 部署了若CUDA可用&#xff0c;默认会以CUDA方…

阿里云OSS上传视频,可分片上传

uniappH5实现 阿里云OSS上传视频 示例图&#xff1a; 上传视频完整示例代码&#xff1a; 使用npm安装SDK开发包&#xff0c;安装命令为 npm install ali-oss --save accessKeyId 和 accessKeySecret 还有 bucket 替换成你的就行。 multipartUpload 的第一个入参是&#x…

[开发语言][c++]:左值、右值、左值引用、右值引用和std::move()

左值、右值、左值引用、右值引用和std::move 1. 什么是左值、右值2. 什么是左值引用、右值引用3. **右值引用和std::move的应用场景**3.1 实现移动语义3.2 **实例&#xff1a;vector::push_back使用std::move提高性能** **4. 完美转发 std::forward**5. Reference 写在前面&…

【分享贴】大话ESD和浪涌

从事电子产品开发的朋友应该都知道&#xff0c;电子产品样机完成之后&#xff0c;会进入产品性能测试阶段&#xff0c;而其中的EMC&#xff08;电磁兼容&#xff09;测试则是至关重要的一项。 EMC&#xff08;电磁兼容&#xff09;又被分为两大类&#xff1a;EMI&#xff08;电…

【React 常用的 TS 类型】持续更新

1&#xff09;定义样式的 TS 类型 【 React.CSSProperties 】 一般定义样式时需要的类型限制&#xff0c;如下&#xff1a; const customStyle: React.CSSProperties {color: blue,fontSize: 16px,margin: 10px,}; 2&#xff09;定义 Input Ref 属性时的 TS 类型限制 【 R…