Nodejs 第七十九章(Kafka进阶)

在这里插入图片描述

kafka前置知识在上一章讲过了 不再复述

kafka进阶

1. server.properties配置文件

server.properties是Kafka服务器的配置文件,它用于配置Kafka服务的各个方面,包括网络设置、日志存储、消息保留策略、安全认证

#broker的全局唯一编号,不能重复
broker.id=0
#端口号
port=9092
#处理网络请求的线程数量
#接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3
#用来处理磁盘IO的线程数量
#消息从内存中写入磁盘是时候使用的线程数量。
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=./logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#每个topic的分区数
offsets.topic.replication.factor=1
#每个topic的副本数
transaction.state.log.replication.factor=1
#每个topic的最小副本数
transaction.state.log.min.isr=1
#日志保留时间,单位小时 168就是7天
log.retention.hours=168
#定期检查日志是否过期的间隔,单位毫秒
log.retention.check.interval.ms=300000
#日志清理器是否启用
log.cleaner.enable=true
#zookeeper地址
zookeeper.connect=localhost:2181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=18000
#zookeeper会话超时时间
group.initial.rebalance.delay.ms=0
2.producer.properties配置文件

producer.properties是Kafka生产者客户端的配置文件,用于配置Kafka生产者的行为和属性。当你使用Kafka生产者API发送消息到Kafka集群时,可以使用该配置文件哟

#配置生产者的broker列表 可以配置多个,以逗号隔开 也就是做集群的
#来获取每一个topic的分片数等元数据信息。
bootstrap.servers=localhost:9092
# 配置数据压缩方式 有none,gzip,snappy,lz4,zstd
compression.type=none
#客户端等待请求的响应的最长时间 超时时间
#request.timeout.ms=
#定期发送消息的时间间隔,一般配合batch.size使用,例如设置了50ms,那么每50ms就会发送一次消息合集
#linger.ms=
#每次发送给Kafka服务器请求消息的最大大小
#max.request.size=
#批量发送消息比如说设置了值16KB,那么消息内容凑够16KB就会被发送出去,否则就不会发送,这样可以避免单条消息太大导致的发送失败
#batch.size=
#约束producer缓存池的大小,默认是32MB,可以根据实际情况调整
#buffer.memory=
3.consumer.properties配置文件

用于配置Kafka消费者的属性。它包含了一系列用于定义消费者行为的参数和数值

#定义Kafka的Broker列表 可以配置多个,以逗号隔开 也就是做集群的
bootstrap.servers=localhost:9092
#定义消费者组的ID
group.id=test-consumer-group
#用于指定当消费者加入一个消费者组但没有可用的消费位移时的行为
#有三种选项 earliest/latest/none
#earliest:表示消费者将从最早的可用消费位移开始消费。消费者将从主题的最早消息开始消费,即使这些消息已经过期。
#latest:表示消费者将从最新的可用消费位移开始消费。消费者将从主题的最新消息开始消费,即跳过已经过期的消息。
#none:表示如果没有可用的消费位移,消费者将抛出异常。这样可以确保消费者只消费已经提交的消费位移。
#auto.offset.reset=
#心跳间隔用于保持消费者活跃状态
#session.timeout.ms
#指定消费者一次性获取最大的消息数量,如果为0表示不限制
#fetch.max.bytes=1048576
#指定消费者一次性获取的最大等待时间,如果为0表示不限制
#fetch.max.wait.ms=500

消息模式

kafka同样支持发布订阅的方式发送消息 我们来编写一下案例

官方文档 https://kafka.js.org/docs/getting-started

1. 压缩

引入CompressionTypes 选择压缩模式 GIZP LZ4 zSTD

import { Kafka,CompressionTypes } from 'kafkajs'await producer.send({topic: 'xiaoman',compression: CompressionTypes.GZIP,messages: [{value: '测试数据1',headers: {'name': Buffer.from('小满')}},{ value: Buffer.from('测试数据2') },],
})
2. 标头

允许使用标头传递对象元数据,把需要传递的数据放在headers即可数据将一起被发送过去

await producer.send({topic: 'xiaoman',messages: [{value: '测试数据1',headers: {'name': Buffer.from('小满')}},{ value: Buffer.from('测试数据2') },],
})

消费者获取headers 元数据

await consumer.run({eachMessage: async ({ topic, partition, message }) => {console.log({topic,partition,value: message.value.toString(),headers: message.headers?.name?.toString(),})},
})
3. 多主题派发

send 发方法换成 sendBatch 增加 topicMessages 是个数组

await producer.sendBatch({topicMessages: [{topic: 'xiaoman',messages: [{ value: Buffer.from('测试数据1') },],},{topic: 'xiaoman2',messages: [{ value: Buffer.from('测试数据2') },],},],
})

消费多个消息时候的时候可以根据业务自由选择模式

  1. 逐条处理

  2. 批量处理(批量处理可以减少网络开销)

await consumer.subscribe({ topic: 'xiaoman', fromBeginning: true })
await consumer.subscribe({ topic: 'xiaoman2', fromBeginning: true })
//逐条处理
await consumer.run({eachMessage: async ({ topic, partition, message }) => {console.log({topic,partition,value: message.value.toString(),headers: message.headers?.name?.toString(),})},
})
//批量处理
await consumer.run({eachBatch: async ({ batch }) => {batch.messages.forEach( (message) => {console.log('Received message', message.value.toString())})},
})

案例演示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/31712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL数据库初体验+数据库管理(其一)

【1】 操作系统介绍: Linux操作系统有 RedHat CentOS Debian Ubuntu OpenSUSE 信创标准 国产系统 : 华为(欧拉) 阿里(龙蜥) 腾讯 (tencentOS) 麒麟&#xf…

【日记】梦到兄长要给鳄鱼换牙齿……(421 字)

正文 今天中午睡了一个小时多一点,做了一个很奇怪的梦。梦见兄长要给一条鳄鱼换牙齿,还说早上不好操作,要三天之后的中午或晚上,颇有一种翻黄历寻个良辰吉日之感。但我没那样大的耐性,便捏住鳄鱼的嘴,左摔右…

实战18:基于tkinter+jupyter notebook开发的情感分析系统

项目演示: 完整代码: import pandas as pd import numpy as np from collections import Counter import re import jieba from tqdm import tqdm from sklearn.metrics import roc_curve, auc import joblib import gensim from sklearn.svm import SVC from gensim.mode…

STM32小项目———感应垃圾桶

文章目录 前言一、超声波测距1.超声波简介2.超声波测距原理2.超声波测距步骤 二、舵机的控制三、硬件搭建及功能展示总结 前言 一个学习STM32的小白~ 有问题请评论区或私信指出 提示:以下是本篇文章正文内容,下面案例可供参考 一、超声波测距 1.超声波…

Studying-代码随想录训练营day16| 513找到左下角的值、112.路径总和、106从中序与后序遍历序列构造二叉树

第十六天,二叉树part03💪💪💪,编程语言:C 目录 513找到左下角的值 112.路径总和 113.路径总和II 106从中序与后序遍历序列构造二叉树 105.从前序与中序遍历序列构造二叉树 总结 513找到左下角的值…

[240621] Anthropic 发布了 Claude 3.5 Sonnet AI 助手 | Socket.IO 拒绝服务漏洞

目录 Anthropic 发布 Claude 3.5 Sonnet AI 助手Scoket.IO 拒绝服务漏洞(CVE-2024-38355) Anthropic 发布 Claude 3.5 Sonnet AI 助手 Claude 3.5 Sonnet: 更智能、更快速、更安全的 AI 助手 一、 引言 Anthropic 发布了 Claude 3.5 Sonnet&#xff0…

慢阻肺患者为何容易营养不良?朗格力教你轻松改善

#肺科营养#朗格力#班古营养#复合营养素#肺部营养#肺部健康# 慢阻肺是我国常见的、高患病率的慢性呼吸系统疾病,会对肺结构和功能产生影响,从而引起各种不良反应,其中营养不良是常见并发症之一。慢阻肺为什么会发生营养不良?营养不良又是怎么伤害慢阻肺的呢?为什么像班古精准…

鸿蒙开发:【进程模型概述】

进程模型概述 系统的进程模型如下图所示: 应用中(同一包名)的所有PageAbility、ServiceAbility、DataAbility、FormAbility运行在同一个独立进程中,即图中绿色部分的“Main Process”。 WebView拥有独立的渲染进程,即…

什么是光子带隙光纤?

长期以来,光纤通信的发展受到纤芯材料特性的限制,特别是损耗特性。二氧化硅在可见光至近红外波长范围内损耗低,与激光器工作波长相匹配,因此成为长途电信应用中光纤纤芯的首选材料。 这类光纤的纤芯是实心的,传输原理是基于全内反射(Total Internal Reflection, TIR),其…

ServBay 下一代Web开发环境

ServBay是一个集成式、图形化的本地化Web开发环境。开发者通过ServBay几分钟就能部署一个本地化的开发环境。解决了Web开发者(比如PHP、Nodejs)、测试工程师、小型团队安装和维护开发测试环境的问题,同时可以快速的进行环境的升级以及维护。S…

Docker:认识Docker Host/Container/none网络

文章目录 Docker Host网络认识Docker Host网络实际操作网络模式区别使用场景 Docker Container网络认识Docker Container网络操作实例使用场景 Docker None网络使用场景 Docker Host网络 认识Docker Host网络 Docker容器运行默认会分配独立的Network Namespace隔离子系统&…

不破不立,B站终于跳出“舒适圈”?

哔哩哔哩已经很久没有这么振奋人心的时刻了。 6月19日,哔哩哔哩当日股价涨超18%,最高达到145.6元每股,时隔11个月,再次回归高位。从时间线上看,这次的股价大涨明显与哔哩哔哩刚(以下简称“B站”&#xff0…

基于单电阻采样的电流重构

1. 单电阻采样电流重构原理 图1(a)所示是电压型三相逆变器,定义三相开 关信号为 Sa 、Sb 、Sc 。当 Sa = 1 表示A相上桥臂导 通,下桥臂关断;Sa = 0 表示相反。三相逆变器采用 SVPWM调制方式控制,有8种开关工作状态,包括 6个非零电压矢量V1 ~ V6 和2个零电压矢量V0 、V7…

约束求解器方案设计

1.约束求解介绍 给定一个几何对象(点、直线段、圆、圆弧、平面等)的集合G和一个关于集合G中几何对象之间约束(点的位置、直线段的长度、圆弧对应的圆心角角度、垂直、相切等) 的集合C,则在二元组(G,C)中根…

入门Ansible常用模块

自动化运维Devops-Ansible Ansible是新出现的自动化运维工具,基于Python 开发,集合了众多运维工具(puppet 、cfengine、chef、func、fabric)的优点,实现了批量系统配置 、批量程序部署、批量运行命令 等功能。Ansible…

58-DOS与DDOS分析(正常TCP会话与SYN Flood攻击、ICMP Flood 攻击、SNMP放大攻击等)

目录 正常 TCP 会话与 SYN Flood 攻击 1、正常的三次握手过程: 2、 SYN Flood 攻击 一、攻击windows系统: 二、攻击web网站 : 拒绝服务攻击工具-Hping3-Syn Flood 攻击 拒绝服务攻击工具--Hping3--ICMP Flood 攻击 sockstress攻击 So…

不在枯燥用第三方库简化你的编程之路

简介: Python作为一种多用途的编程语言,得益于其丰富的第三方库和框架,极大地拓展了其功能和应用领域.这些工具不仅提升了开发效率,也使得Python在各个领域展现出色的表现. 今天我们就来聊一聊Python 第三方库是由第三方开发者编写并共享的库,可用于扩展 Python 的…

Unity【入门】光源、物理、音效系统

核心系统 文章目录 核心系统1、光源系统基础1、光源组件2、光面板相关 2、物理系统之碰撞检测1、刚体 RigidBody2、碰撞器 Collider3、物理材质4、碰撞检测函数5、刚体加力 3、音效系统1、音频文件导入2、音频源和音频监听器脚本3、代码控制音频源4、麦克风输入相关 1、光源系统…

智慧办公新篇章:可视化技术引领园区管理革命

随着科技的飞速发展,办公方式也在经历着前所未有的变革。在这个信息爆炸的时代,如何高效、智能地管理办公空间,成为了每个企业和园区管理者面临的重要课题。 智慧办公园区作为未来办公的新趋势,以其高效、便捷、智能的特点&#x…

Redis 7.x 系列【2】单机部署

有道无术,术尚可求,有术无道,止于术。 本系列Redis 版本 7.2.5 源码地址:https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. Windows2. Linux 1. Windows Redis作为一个高性能的内存数据库,和Linu…