架构师系列- 消息中间件(15)-kafka业务实战

7.1 顺序性场景

7.1.1 场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

已下单 → 已支付 → 已确认

不允许错乱!

7.1.2 顺序级别

1)全局有序:

串行化。每条经过kafka的消息必须严格保障有序性。

这就要求kafka单通道,每个groupid下单消费者

极大的影响性能,现实业务下几乎没必要

2)局部有序:

业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

7.1.3 实现方案

1)发送端:

指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka开多个分区还有何意义?

所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程

在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

参考下图:thread处理后,会将data变成 2-1-3

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

再开启多个线程,每个队列分配一个线程处理。提升吞吐量

 

 

7.1.4 代码验证

1)新建一个sort队列,2个分区

2)启动order项目

源码参考:

SortedProducer(顺序性发送端)

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

 3)通过swagger请求

 

7.2 海量同步场景

假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

7.2.1 常规架构

在下单完成mysql后,通过程序代码打印,直接进入kafka

或者logback和kafka集成,通过log输送

优点:

更符合常规的思维。将数据送给想要的部门

缺点:

耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

7.2.2 解耦合

借助canal,监听订单表的数据变化,不再影响主业务。

 

7.2.3 部署实现

1)mysql部署注意,需要打开binlog,8.0 默认处于开启状态#启动mysql8
docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d daocloud.io/mysql:8.0
连上mysql,执行以下sql,添加canal用户CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
创建订单表CREATE TABLE `orders` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
);2)canal部署#canal.properties
#附带资料里有,放到服务器 /opt/kafka/data/canal/ 目录下
#修改servers为你的kafka的机器地址
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.10.30:10903,192.168.10.30:10904
#docker-compose.yml
#附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
#修改mysql的链接信息的链接信息
#然后在当前目录下执行 docker-compose up -d
version: '2'
services:canal:image: canal/canal-servercontainer_name: canalrestart: alwaysports:- "10908:11111"environment:#mysql的链接信息canal.instance.master.address: 192.168.10.30:3306canal.instance.dbUsername: canalcanal.instance.dbPassword: canal#投放到kafka的哪个主题?要提前准备好!canal.mq.topic: canalvolumes:- "/opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"3)数据通道验证进入kafka容器,用上面3.2.4里的命令行方式监听canal队列./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal在mysql上创建orders表,增删数据试一下mysql> insert into orders (name) values ('张三');
Query OK, 1 row affected (0.03 sec)在kafka控制台,可以看到同步的消息{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}数据通道已打通,还缺少的是druid作为消费端来接收消息4)druid部署#druid.yml
#在附带资料里有
#随便找个目录,执行
docker-compose -f druid.yml up -d5)验证配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

7.3 kafka监控

7.3.1 eagle简介

Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

7.3.2 部署

推荐docker-compose启动

将配备的资料中 eagle.yml , 拷贝到服务器任意目录

修改对应的ip地址为你服务器的地址

 

#注意ip地址:192.168.10.30,全部换成你自己服务器的version: '3'
services:zookeeper:image: zookeeper:3.4.13kafka-1:container_name: kafka-1image: wurstmeister/kafka:2.12-2.2.2ports:- 10903:9092- 10913:10913environment:KAFKA_BROKER_ID: 1 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10903 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10913"JMX_PORT: 10913volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper           kafka-2:container_name: kafka-2image: wurstmeister/kafka:2.12-2.2.2ports:- 10904:9092- 10914:10914environment:KAFKA_BROKER_ID: 2 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10904 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10914"JMX_PORT: 10914volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper eagle:image: gui66497/kafka_eaglecontainer_name: kerestart: alwaysdepends_on:- kafka-1- kafka-2ports:- "10907:8048"environment:ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 使用说明

访问 : http://192.168.10.30:10907/ke/

默认用户名密码: admin/ 123456

如果要删除topic等操作,需要管理token: keadmin

 

与km到底选哪个呢?

  • 界面美观程度和监控曲线优于km,有登录权限控制
  • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/829498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Swift中TableView的下拉刷新和上拉加载

在Swift中,可以使用UIRefreshControl来实现TableView的下拉刷新。首先,需要在ViewDidLoad方法中创建一个UIRefreshControl的实例,并添加到TableView的subview中。然后,设置refreshControl的action为TableView的下拉刷新函数&#…

windows SDK编程 --- 消息之键盘消息(4)

前置知识 一、 键盘消息 在Windows操作系统中,键盘消息是用来通知应用程序有关键盘输入事件的一种机制。当用户在键盘上进行操作,比如按键或释放键时,Windows会生成相应的消息并发送给处理这些输入的应用程序。这些消息对于开发图形用户界面…

ElasticSearch教程入门到精通——第二部分(基于ELK技术栈elasticsearch 7.x+8.x新特性)

ElasticSearch教程入门到精通——第二部分(基于ELK技术栈elasticsearch 7.x8.x新特性) 1. JavaAPI-环境准备1.1 新建Maven工程——添加依赖1.2 HelloElasticsearch 2. 索引2.1 索引——创建2.2 索引——查询2.3 索引——删除 3. 文档3.1 文档——重构3.2…

react,Chart

一、基础图:https://ant-design-charts.antgroup.com/ Ant Design Charts 1. 首先要下载ant-design/charts,然后在页面中添加如下柱状图代码: import React from react; import { Column } from ant-design/chartsconst DemoColumn: React.FC () …

Grafana- bug- User sync failed - User already exists

Grafana security release: New versions of Grafana with a critical security fix for CVE-2023-3128 Vardan Torosyan • 22 Jun, 2023 • 4 min Today we are releasing Grafana 10.0.1, 9.5.5, 9.4.13, 9.3.16, 9.2.20, and 8.5.27. Alongside other bug fixes, these pa…

百度智能云千帆 ModelBuilder 技术实践系列:通过 SDK 快速构建并发布垂域模型

​百度智能云千帆大模型平台(百度智能云千帆大模型平台 ModelBuilder)作为面向企业开发者的一站式大模型开发平台,自上线以来受到了广大开发者、企业的关注。至今已经上线收纳了超过 70 种预置模型服务,用户可以快速的调用&#x…

深入理解冯诺依曼体系结构

文章目录 冯诺依曼体系结构概念冯诺依曼体系结构的优势冯诺依曼体系结构的现实体现 冯诺依曼体系结构概念 冯诺依曼体系结构也称普林斯顿结构,是现代计算机发展的基础。它的主要特点是“程序存储,共享数据,顺序执行”,即程序指令和…

代码随想录算法训练营Day10 | 232.用栈实现队列、225. 用队列实现栈

232.用栈实现队列 题目:请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(push、pop、peek、empty): 实现 MyQueue 类: void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除…

【leetcode面试经典150题】75. 二叉树展开为链表(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主,题解使用C语言。(若有使用其他语言的同学也可了解题解思路,本质上语法内容一致&…

修改Docker容器内文件的三种方式

说明:本文介绍修改Docker容器内文件的三种方式 方式一:直接修改 敲下面的命令,进入Docker容器,如mysql docker exec -it mysql /bin/bash修改mysql的配置文件,/etc/my.cnf vim /etc/my.cnf如下,如果vim…

基于单片机的煤气泄漏报警系统设计

摘要:本文设计了一种基于单片机控制的煤气泄漏检测报警系统,该系统以AT89S52单片机为核心,通过气敏电阻传感器MQ-7将采样到的 一氧化碳气体转换为电信号经处理后送给单片机,单片机对获取的信号进行分析,控制声光报警系统进行声光报警。该系统可以对室内空 气中的CO含量是否…

【Hadoop】-Apache Hive使用语法与概念原理[15]

一、数据库操作 创建数据库 create database if not exists myhive; 使用数据库 use myhive; 查看数据库详细信息 desc database myhive; 数据库本质上就是在HDFS之上的文件夹。 默认数据库的存放路径是HDFS的:/user/hive/warehouse内 创建数据库并指定hdfs…

attempt to compare nil with number -- 黑马点评出现问题

问题情况 : 主要问题 : 调用lua执行redis时,有一个值会接受nil(因为redis中没有该数据)或者数值,当该值为nil时执行报错,因为会用到将该值与其他数字比较,故报错attempt to compare nil with number 当然…

生成完美口型同步的 AI 代言人视频(及其实现原理详解)

目录 什么是Heygen? Heygen注册 Video Translation(视频翻译 完美口型同步) 实现原理详解 视频翻译部分 完美口型同步部分 什么是Heygen? Heygen是一款在线工具,可帮助您生成具有完美口型同步的 AI 代言人视频。 Heygen注册 https:…

关于springboot内置tomcat最大请求数配置的一些问题

前言 springboot内置了tomcat。那么一个springboot web应用,最大的请求链接数是多少呢?很早以前就知道这个是有个配置,需要的时候,百度一下即可。但,事实并非如此,有几个问题我想大多数人还真不知道。比如…

前端学习<四>JavaScript——54-原型链

常见概念 构造函数 构造函数-扩展 原型规则和示例 原型链 instanceof 构造函数 任何一个函数都可以被 new,new 了之后,就成了构造方法。 如下: function Foo(name, age) {this.name name;this.age age;//retrun this; //默认有这…

NEFU计算机图形学实验四

编写二次插值样条曲线生成函数,然后利用该函数根据自己设计的型值点绘制出相应的曲线图形。 // erView.cpp : implementation of the CErView class //#include "stdafx.h" #include "er.h"#include "erDoc.h" #include "erVie…

大型语言模型高效推理综述

论文地址:2404.14294.pdf (arxiv.org) 大型语言模型(LLMs)由于在各种任务中的卓越表现而受到广泛关注。然而,LLM推理的大量计算和内存需求给资源受限的部署场景带来了挑战。该领域的努力已经朝着开发旨在提高LLM推理效率的技术方…

C语言递归刷题(一)

目录 走台阶题目思路代码 西格玛题目思路代码 用函数实现数的阶乘题目思路代码 digit题目思路代码 Hermite多项式题目思路代码 排列数题目思路代码 逆序输出题目思路代码 结语 走台阶 题目 描述 小乐乐上课需要走n阶台阶,因为他腿比较长,所以每次可以选…

常见的SSH功能

SSH(Secure Shell)是一种加密的网络传输协议,用于在不安全的网络中为网络服务提供安全的传输环境。SSH最初是由芬兰的一家公司开发的,现在已经成为互联网上最常用的远程登录工具之一。SSH提供了许多强大的功能,让我们能…