KAFKA 同步和异步消息的发送(开发实战)

文章目录

          • 一、消费者监听
            • 1. 启动zk
            • 2. 启动kafka
            • 3. 创建主题
            • 4. 消费者监听消息
          • 二、生产者工程
            • 2.1. 依赖
            • 2.2. 生产者代码(同步)
            • 2.3. 生产者代码(异步)
            • 2.4. 发送消息
            • 2.5. 消费者监听消息
            • 2.6. 结果返回

一、消费者监听
1. 启动zk
zkServer.sh start# 监听运行状态
zkServer.sh status
2. 启动kafka
# 后台启动kafka
kafka-server-start.sh -daemon /app/kafka_2.12-2.8.0/config/server.properties
3. 创建主题
# 创建一个主题名称为topic_1  该主题分区1个分区 ,该分区有1个副本
kafka-topics.sh --zookeeper localhost:2181/mykafka --create --topic topic_1 --partitions 1 --replication-factor 1
4. 消费者监听消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1

在这里插入图片描述

二、生产者工程
2.1. 依赖
         <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.1</version></dependency>
2.2. 生产者代码(同步)
package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<String, Object>();//指定初始化连接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.102:9092");//指定key序列化类configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//构造生产者对象 指定发送的key和value的类型 配置的参数列表(必填参数+辅助参数)KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);//用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));//构造record封装发送消息主体ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", //指定发送主题0,//指定发送分区0,//指定发送key"hello gblfy 0",//指定发送消息主题headers//用于设置用户自定义的消息头字段);//消息的同步确认 调用send方法发送消息final Future<RecordMetadata> future = producer.send(record);//调用get方法接收消息final RecordMetadata metadata = future.get();System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());//关闭生产者producer.close();}
}
2.3. 生产者代码(异步)
package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<String, Object>();//指定初始化连接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.102:9092");//指定key序列化类configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//构造生产者对象 指定发送的key和value的类型 配置的参数列表(必填参数+辅助参数)KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);//用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));//构造record封装发送消息主体ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", //指定发送主题0,//指定发送分区0,//指定发送key"hello gblfy 0",//指定发送消息主题headers//用于设置用户自定义的消息头字段);//消息的同步确认 调用send方法发送消息final Future<RecordMetadata> future = producer.send(record);//调用get方法接收消息// final RecordMetadata metadata = future.get();// System.out.println("消息的主题:" + metadata.topic());// System.out.println("消息的分区:" + metadata.partition());// System.out.println("消息的偏移量:" + metadata.offset());//消息的异步确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());} else {System.out.println("异常消息");}}});//关闭生产者producer.close();}
}
2.4. 发送消息

消息有同步发送和异步发送二种

在这里插入图片描述

2.5. 消费者监听消息

在这里插入图片描述

2.6. 结果返回

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何通过自动增加索引,实现数据库查询耗时降低50%?

作者 | 利开园责编 | Carol封图 | CSDN 下载自视觉中国很多开发者都遇到类似这样的经历&#xff1a;一个产品功能开发测试都正常&#xff0c;发布上线后也正常&#xff0c;但是过一段后&#xff0c;如果有个活动或流量一大程序就突然卡了&#xff0c;也有可能流量正常也没搞活动…

重磅下载 | 核心系统100%上云,揭秘双11背后的云原生实践

2019 双11&#xff0c;订单创新峰值达到 54.4 万笔/秒&#xff0c;单日数据处理量达到 970PB&#xff0c;面对世界级的流量洪峰&#xff0c;今年的阿里交出了一份亮眼的云原生技术成绩单&#xff0c;并实现了100% 核心应用以云原生的方式上云&#xff1a; 双11 基础设施 100% …

./mysqld: error while loading shared libraries: libnuma.so.1: cannot open shared object file: No suc

./mysqld: error while loading shared libraries: libnuma.so.1: cannot open shared object file: No such file or directory解决方案&#xff1a; yum -y install numactl

MongoDB与阿里云达成战略合作,最新数据库独家上线阿里云!

11月26日&#xff0c;开源数据库厂商MongoDB与阿里云在北京达成战略合作&#xff0c;作为合作的第一步&#xff0c;最新版MongoDB 4.2数据库产品正式上线阿里云平台。 目前阿里云成为全球唯一可提供最新版MongoDB服务的云厂商&#xff0c;双方合作打通了企业在云上使用最新版开…

程序员:我受够了!不想再在小厂里干Java了!

你是否熟悉这样的情形&#xff1a;每天10点到公司&#xff0c;打开电脑&#xff1a;10个小时的增删改查&#xff0c;搬砖写代码的一天就这样开始了。刚毕业时候的你踌躇满志&#xff0c;按照自己的原定计划&#xff0c;这时候应该混到了阿里P6。可现在在小厂苦苦挣扎&#xff0…

AnalyticDB for MySQL技术架构解析

企业数据需求不断变化&#xff0c;近年来变化趋势日益明显&#xff0c;从数据的3V特性看&#xff1a;体积&#xff0c;速度和变化&#xff1b;Big Data强调数据量&#xff0c;PB级以上&#xff0c;是静态数据。而Fast Data在数据量的基础上&#xff0c;意味着速度和和变化&…

双十一|又快又稳!闲鱼实时事件规则计算驱动平台

闲鱼双十一金鳞抽奖玩法 相信今年在11月7日-11月11日期间使用过闲鱼的用户&#xff0c;可能已经被如下图所示的幸运海星“砸”到过了。只要用户进入到指定的几个页面&#xff0c;或者在某些指定的页面有点击行为&#xff0c;就会触发到这样一个幸运之星。这就是今年闲鱼双十一…

“编程能力差的程序员,90%会输在这点上”谷歌AI专家:其实都是瞎努力

最近几年&#xff0c;我看过市面上很多 Python和人工智能的教程和书籍&#xff0c;它们大都这样讲&#xff1a;先从 Python 人工智能的历史讲起开始&#xff0c;再介绍的基本语法规则&#xff0c;Python 的 list, dict, tuple 等数据结构&#xff0c;最后学习机器学习、深度学习…

阿里科学家再获世界级荣誉,平头哥首席科学家谢源当选AAASFellow

11月27日&#xff0c;美国科学促进会&#xff08;AAAS&#xff09;公布了2019年度会士&#xff08;Fellow&#xff09;增选结果&#xff0c;阿里巴巴平头哥首席科学家、达摩院高级研究员谢源当选&#xff0c;这也是信息、计算和通信领域新当选的24名Fellow之一&#xff0c;一同…

开放下载!从RCNN到SSD,这应该是最全的一份目标检测算法盘点

导读&#xff1a;从简单的图像分类到3D姿势识别&#xff0c;计算机视觉从来不缺乏有趣的问题和挑战。通过肉眼我们可以检测出一张宠物照中的猫和狗&#xff0c;可以识别出梵高作品《星夜》中的星星和月亮&#xff0c;那如何通过算法赋予机器“看”的智能&#xff0c;就是我们接…

全网最详细TCP参数讲解,再也不用担心没有面试机会了......

作者 | 小林coding责编 | 王晓曼封图 | CSDN 下载自视觉中国前言TCP 性能的提升不仅考察 TCP 的理论知识&#xff0c;还考察了对于操作系统提供的内核参数的理解与应用。TCP 协议是由操作系统实现&#xff0c;所以操作系统提供了不少调节 TCP 的参数。Linux TCP 参数如何正确有…

图片的缩放与拖拽

这个图片的缩放的流畅度还是很好的&#xff0c;需要引入touch.js,好像是百度团队那边写的 <script src"./js/touch.min.js" type"text/javascript"></script> $(function() { //放大缩小var scaleVal 1;var initialScale scaleVal || …

为了帮助卖家成交,闲鱼工程师做了些什么?

引言 闲鱼是一个C2C平台&#xff0c;提高卖家活跃度不仅有利于成交的提升&#xff0c;对于用户增长也有积极意义。而其中的关键点就在于其成交的效率。而个人卖家由于其专业程度不如专业卖家&#xff0c;成交效率往往并不高。我们希望可以实现两个提升&#xff1a; 能帮助卖家…

TOP互联网公司都在用,为什么SRE比传统运维更抢手?

阿里妹导读&#xff1a;双11的完美收官&#xff0c;2684亿的销售奇迹及顺滑极致的客户体验让双11背后的技术再次被推到风头浪尖。而双11技术热点话题&#xff0c;不得不提集团核心系统100%上云这一技术创举。 作为集团上云的底座产品&#xff0c;ECS承担了集团上云基础设施的重…

***error*** (zip#Browse) unzip not available on your system

文章目录1. 修改jar配置文件2. 现象3. 解决方法1. 修改jar配置文件 vim xxx.jar2. 现象 用不同用户打开&#xff0c;效果是不一样的&#xff0c;下图分别是 root账号、普通用户打开的 root账号显示异常还不明显&#xff0c;切换成普通用户后发现就很明显了&#xff0c;原来…

帅爆了!3个月0基础转型头条数据分析师,他做对了什么?

年初的黑天鹅打乱了我的求职阵脚&#xff0c;专业不对口&#xff0c;无实习经验&#xff0c;在求职路上的竞争优势几乎为0&#xff0c;然而&#xff0c;开启自救模式后&#xff0c;我顺利成为了头条数据分析师&#xff0c;下面我就讲讲人生是怎么开挂的。随着人工智能普及&…

淘宝如何保障业务稳定性——诺亚(Noah)自适应流控

作者|哲良、八风、泽彬 出品|阿里巴巴新零售淘系技术部 诺亚(Noah) 自适应流控解决方案 基于自动控制算法&#xff0c;解决了人工限流配置疏漏或过时的痛点&#xff0c;大幅提升应用抵抗流量冲击的能力。在刚过去的双11中&#xff0c;诺亚(Noah)保障了大量业务应用系统&#x…

倒计时1天 | 张钹院士领衔,AI开发者大会20大论坛全攻略!

2020年7月3—4日&#xff0c;由 CSDN 主办的第三届 AI 开发者大会&#xff08;AI ProCon 2020&#xff09;&#xff08;大会官网&#xff1a;https://aiprocon.csdn.net/&#xff09;将以线上直播的形式与大家相见。本次大会历时2天&#xff0c;一次性设立6大主题、20大精彩分论…

使用html5+的plus调起相机拍照,使用canvas压缩图片,转成base64传到后台

html代码&#xff1a; <div class"form-com door"><label for"">门头照&#xff1a;</label><a href"javascript:void(0);" onclick"getImage(1)"><img id"img1" class"img" src&quo…

1亿人点赞的晚会,如何做技术沉淀?

阿里妹导读&#xff1a;今年是双11的第11年&#xff0c;猫晚的第5年。今年的天猫双11狂欢夜(简称“猫晚”)有超200个国家和地区通过优酷APP观看猫晚直播。5144万人通过猫晚公益直播间观看明星卖农货&#xff0c;网友在淘宝直播间点赞1亿次&#xff0c;海外艺人参与的节目超过了…