post发送byte数组_KAFKA消息发送

消息发送的整体架构

3de5d3fe08705f4255949fb2ac6b8215.png

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。

Kafka是通过broker中未确认的消息数来判断broker的负载的.未确认的消息数越多则负载越高.Sender线程通过InFlightRequests来缓存已经发出去但还没有收到响应的请求,具体形式为Map.

4e97c753a971473778de45efc0865cce.png

消息有序性

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。

如果将acks参数配置为非零值,并且max.in.flight.requests.per.connection 参数配置为大于1的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection配置为1,而不是把 acks 配置为0,不过这样也会影响整体的吞吐。

max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数(也就是客户端与 Node 之间的连接)。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序

消息发送的三种模式

  • 发后即忘(fire-and-forget,不保证消息到达broker,会丢消息)

  • 同步(sync,同步发送,一条发完才发送下一条,每次都会返回Future值或抛异常,如果是可重试的异常,那么如果配置了retries参数则可自动重试)

  • 异步(async,会有一个回调函数来通知消息的处理结果是成功还是异常)

同步代码

try {    Future<RecordMetadata> future = producer.send(record);    //阻塞获取结果,然后才能下一条发送    RecordMetadata metadata = future.get();    System.out.println(metadata.topic() + "-" +metadata.partition() + ":" + metadata.offset());} catch (ExecutionException | InterruptedException e) {    //常见的可重试异常有:NetworkException、LeaderNotAvailableException、    //UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。    //对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。    //不可重试异常如LeaderNotAvailableException ,RecordTooLargeException则是直接抛异常}

异步代码

public class KafkaAsyncSender{    private static final Logger logger   = LoggerFactory.getLogger(KafkaAsyncSender.class);      //KafkaProducer 而言,它是线程安全的  private Producer producer;    @Autowired    private UdpSerializer udpSerializer;  @Value("${kafka_connect_string}")  private String kafkaConnectString;    private Cache<String, Integer> cache;    private KafkaTopicPartitionMapper mapper;     @PostConstruct    public void init() {        Properties props = new Properties();        props.put("metadata.broker.list", kafkaConnectString.trim());        props.put("bootstrap.servers", kafkaConnectString.trim());        props.put("producer.type", "async");//消息发送类型同步(sync)还是异步(async将本地buffer)        props.put("compression.codec", "none");//消息的压缩格式,默认为none不压缩,gzip, snappy, lz4        生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应        props.put("request.required.acks", "1");         //发送失败后重试的次数,允许重试        //如果 max.in.flight.requests.per.connection 设置不为1,可能会导致乱序        props.put("message.send.max.retries", 3);//失败重试次数        props.put("retry.backoff.ms", 100);//重试间隔        props.put("queue.buffering.max.ms", 10);//缓存数据的最大时间间隔        props.put("batch.num.messages", 1000);//缓存数据的最大条数        //限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB,需注意broker端的message.max.bytes        props.put("max.request.size", 1024 * 1024);        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");        this.producer = new KafkaProducer<String, String>(props);        this.mapper = new KafkaTopicPartitionMapper(this.producer);        this.cache = CacheBuilder.newBuilder().refreshAfterWrite(1, TimeUnit.SECONDS)            .build(this.mapper);            }    //将被下面的handle类调用  public boolean sendMsg(final String topic, Object body, Callback callback) {         //直接发送bytes数组     if(body instanceof byte[]){       ProducerRecord <String,byte[]>record = new ProducerRecord<String,byte[]>(topic,(byte[])body);       producer.send(record);                 }else if(body instanceof BinlogEventInfo){                //对象类型的消息            final BinlogEventInfo binlogEventInfo = (BinlogEventInfo)body;            Integer num = null;               //获取发送到kafka的key,这里是使用guava缓存了key            String cacheKey = genCacheKey(topic, binlogEventInfo);        num = this.cache.getIfPresent(cacheKey);        if(num == null){          try {            num = this.mapper.load(cacheKey);          } catch (Exception e) {            logger.error("load kafka partition cache exception :", e);          }          if(num == null){            this.cache.put(cacheKey, Integer.MIN_VALUE);          } else {            this.cache.put(cacheKey, num);          }        }                       //构造发送的消息体,注意序列化是使用Byte序列化,没使用默认的String            ProducerRecord <String,byte[]>record = null;            if(num == Integer.MIN_VALUE || num == null){                     //获取key失败,不使用key的构造发送发送数据              if(logger.isDebugEnabled()){                logger.debug("get partition fail , send to {}, info {}" , topic, JsonUtils.toJson((binlogEventInfo)));                }              record = new ProducerRecord<String,byte[]>(topic,udpSerializer.serialize(binlogEventInfo));              } else {                     //根据key指定到哪一个分区的发送              if(logger.isDebugEnabled()){                logger.debug("send to {}, partition {},  info {}" , topic, num,  JsonUtils.toJson((binlogEventInfo)));                }              //这里有三个可能影响到分区数的因素 : 1.直接指定分区数 2,直接指定key 3.无任何指定                     //在直接指定了分区数的情况下,那么将直接发送往此分区                     //若分区数未指定,但key指定了,那么因为计算的hash值一样,那么相同的key也会发送到一样的分区                     //若都未指定,则直接轮询分区来发送消息              record = new ProducerRecord<String,byte[]>(topic, num , null ,udpSerializer.serialize(binlogEventInfo));            }                    producer.send(record, callback);          }    return true;  }  private String genCacheKey(String topic, BinlogEventInfo binlogEventInfo) {    return topic + "-" + binlogEventInfo.getHost() + "-" + binlogEventInfo.getSchemaName() + "-" + binlogEventInfo.getTableName();  }}

Mapper的查询分区

public class KafkaTopicPartitionMapper extends CacheLoader<String, Integer>{  private Producer producer;    public KafkaTopicPartitionMapper(Producer producer){    this.producer = producer;  }  @Override  //格式   topic-host-database-table  public Integer load(String key) throws Exception {    try{      String[] arr = key.split("-");      int hash = this.hash(key);      List list = this.producer.partitionsFor(arr[0]);      int psize = list.size();      if(psize == 0){        return null;      } else {                  //根据哈希值 % 分区数        return parlist.get(Math.abs(hash % psize));      }      } catch(Exception e){      return null;    }  }      //计算hash值  private  int hash(Object key) {        int h;        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);    }  }

异步的消息回调

//kafka消息管理类,发送消息以及回调处理public class KafkaQueueChannelHandler extends AsyncQueueChannelHandler{  private KafkaAsyncSender sender;    public KafkaQueueChannelHandler(){    super("kafka");    this.sender = SpringContextUtil.getBean(KafkaAsyncSender.class);  }    public KafkaQueueChannelHandler(String identity) {    super(identity);    this.sender = SpringContextUtil.getBean(KafkaAsyncSender.class);  }  @Override  public void sendMessage(BinlogEventInfo info, DeliverInfo deliverInfo) {         //发送主题消息的时候设置回调    if(!sender.sendMsg(deliverInfo.getSendTopic(), info, new KafkaCallback(info, deliverInfo))) {      this.stopDeliverAndNotify(info,Constants.SENDTYPE_KAFKA,"",deliverInfo.getSendTopic());    }  }  @Override  public void sendMessageInner(BinlogEventInfo info, DeliverInfo deliverInfo) {    this.sender.sendMsg(deliverInfo.getSendTopic(), info, new KafkaCallback(info, deliverInfo));  }  private class KafkaCallback implements Callback{    private BinlogEventInfo info;    private DeliverInfo deliverInfo;    public KafkaCallback(BinlogEventInfo info, DeliverInfo deliverInfo) {      this.info = info;      this.deliverInfo = deliverInfo;    }        //异步回调方法    @Override    public void onCompletion(RecordMetadata metadata, Exception exception) {            //onCompletion() 方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而 exception 为 null;            //消息发送异常时,metadata 为 null 而 exception 不为 null。      try{        if(metadata!=null){          logger.info("kafka回调信息:topic=【{}】,partition=【{}】,offset=【{}】,发送内容=【{}】,exception=【{}】,",metadata.topic(),metadata.partition(),              metadata.offset(),JSON.toJSONString(info),exception);        }else{          logger.info("kafka回调信息:发送内容=【{}】,exception=【{}】,",JSON.toJSONString(info),exception);        }        if(exception!=null){          stopDeliverAndNotify( info,Constants.SENDTYPE_KAFKA,"", this.deliverInfo.getSendTopic());        } else {                      //将位置信息保存到内存,异步更新到数据库          updatePosition(info, this.deliverInfo);        }      }catch(Exception e){        logger.error("KafkaQueueChannelHandler kafka回调处理失败:发送内容=【{}】,exception=【{}】",JSON.toJSONString(info),e);        stopDeliverAndNotify( info,Constants.SENDTYPE_KAFKA,"", this.deliverInfo.getSendTopic());      }    }      }    }

生产者拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3个方法:

//KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。public ProducerRecordonSend(ProducerRecord record);//KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement() 方法,//优先于用户设定的 Callback 之前执行。public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();

添加生产者拦截器

//此参数默认值为""properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,        ProducerInterceptorPrefix.class.getName());

实现

//接口的这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String>{    @Override    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {        //更改消息内容        String modifiedValue = "prefix1-" + record.value();        return new ProducerRecord<>(record.topic(),                 record.partition(), record.timestamp(),                record.key(), modifiedValue, record.headers());    }    @Override    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}    @Override    public void close() {}    @Override    public void configure(Map<String, ?> map) {}}

参考链接

https://juejin.im/book/5c7d467e5188251b9156fdc0/section/5c7d5391f265da2db7183fe5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/486103.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

未来的趋势发展 802.11v网络协议解析

目前的无线网络中&#xff0c;一个基站通常与拥有最强信号的接入点联系在一起。但是&#xff0c;这个接入点也许过载了。在802.11v标准中&#xff0c;包括了一个指令&#xff0c;接入点能够使用这个指令要求一个基站报告它支持的无线电信道、传输的功率范围、数据速率和支持的身…

从特斯拉到爱因斯坦,物理学家为何钟情于猫

牛顿和猫洞的故事&#xff0c;图片来自3milliondogs.com来源&#xff1a;数学中国尽管我们看到了大量的猫被用作实验对象。面对科学发展过程中这黑暗一面&#xff0c;我有必要强调一下&#xff0c;纵观历史&#xff0c;许多物理学家与他们的猫的关系要友好得多&#xff0c;猫成…

win10远程桌面连接凭据怎么设置_想在家办公,只需打开win10远程桌面连接就可以了,还犹豫什么...

远程桌面连接是一种使用户能够坐在一台计算机前连接到其他位置的“远程计算机”的技术。例如&#xff0c;用户可以从家庭计算机连接到工作计算机&#xff0c;并访问所有程序、文件和网络资源&#xff0c;就好像坐在工作计算机前一样。另外&#xff0c;用户可以让程序在工作计算…

Java的加载与执行

Java的运行包含两个特别重要的阶段&#xff1a;编译阶段&#xff0c;运行阶段。 编译阶段&#xff1a;编译阶段的主要任务是检查Java源程序是否符合Java语法。 符合Java语法则能够生成正常的字节码文件&#xff08;xxxx.class&#xff09; 不符合Java语法规则则无法生成字节码…

费曼:所有的科学知识都是不确定的

编辑 ∑Gemini来源:设计与哲学一、不存在决定什么是好概念的权威 观察是一个概念是否含有真理的判官&#xff0c;但这个概念从何而来的呢&#xff1f;科学的快速进步和发展要求人类发明出一些东西用以检验。在中世纪&#xff0c;人们认为只要多做观察&#xff0c;观察结果本身就…

Java--PATH环境变量

JDK安装成功后&#xff0c;对源程序首先需要进行的就是编译。 在DOS环境下输入javac&#xff0c;会出现以下提示&#xff1a;javac 不是内部或外部命令&#xff0c;也不是可运行的程序 或批处理文件。 出现这个问题的原因是&#xff1a;Windows操作系统无法找到javac命令文件…

RISC-V会被卡吗?那么你觉得C语言会不会被卡? | 包云岗

来源&#xff1a; 风云之声本文是对于知乎问题《从技术的角度来看&#xff0c;RISC-V 能对芯片发展、科技自主起到哪些作用&#xff1f;》的回答&#xff08;https://www.zhihu.com/question/425542531/answer/1607862976&#xff09;。一、关于RISC-V的一些解读看到很多回答都…

easyui 调用dialog中的方法_SolidWorks中标准件库的创建及调用方法

在使用SolidWorks进行产品设计时&#xff0c;常用的标准件&#xff08;如螺栓、螺母、垫圈等&#xff09;通常可以在安装了SolidWorksToolbox插件后调出使用&#xff0c;而许多标准件在Toolbox并不存在&#xff0c;不能从插件中直接调用。在用到这些零件时&#xff0c;设计人员…

【测试设计】基于正交法的测试用例设计工具--PICT

前言 我们都知道成对组合覆盖是一种非常有效的测试用例设计方法&#xff0c;但是实际工作过程中当成对组合量太大&#xff0c;我们往往很难做到有效的用例覆盖。 PICT是微软公司出品的一款成对组合命令行生成工具,它很好的解决了上述的难题。使用它我们可以有效地按照两两测试的…

Alpha fold: 人工智能在蛋白质结构预测上跑赢人类的启示

来自孙卫涛科学网博客2020年12月&#xff0c;Alpha Fold2在CASP14上 获得了惊人的进步&#xff0c;其蛋白质结构预测的能力已经达到了与实验方法相媲美的程度&#xff0c;国内外该领域的专家学者都为之惊叹&#xff0c;同时也都感受到巨大的压力&#xff0c;人工智能首次把一个…

卷积神经网络爬虫实现新闻在线分类系统

卷积神经网络&&爬虫实现网易新闻自动爬取并分类 项目地址 采用THUCnews全部数据集进行训练&#xff0c;效果如下。 详细实现见./text_classification 部署步骤如下&#xff1a; 运行环境 服务器&#xff1a;Ubuntu 16.04 数据库&#xff1a;Mysql 5.6 python&#xf…

搞基础理论研究有什么用?

来源&#xff1a;数学中国人类文明的诞生是一个奇迹&#xff0c;构筑在现代科学技术基础之上的现代人类文明的诞生更是奇迹中的奇迹。这个奇迹中的奇迹的根基是现代技术及其广泛应用&#xff0c;而现代技术的根基则是现代科学&#xff0c;科学的根基是以数学为主要工具的基础科…

由内而外:大脑是如何形成感官记忆的

大数据文摘出品来源&#xff1a;sciencedaily编译&#xff1a;张大笔茹通常&#xff0c;大脑会对我们感官收集的信息进行编码。为了感知环境并与之进行建设性的互动&#xff0c;这些感官信号需要在以往的经验和当前目标的背景下进行解释。最新一期的《科学》杂志上&#xff0c;…

oracle clob截取_Oracle数据库设计规范建议

Oracle-数据库设计规范建议来源于项目资料目的本规范的主要目的是希望规范数据库设计&#xff0c;尽量提前避免由于数据库设计不当而产生的麻烦&#xff1b;同时好的规范&#xff0c;在执行的时候可以培养出好的习惯&#xff0c;好的习惯是软件质量的很好的保证。数据库设计是指…

揭秘美国空军如何用AI技术提升“战斗力”

以AI技术为基础&#xff0c;美国空军正努力将自身转化为更强调协作性的组织。来源丨Forbes作者丨Kathleen Walch编译丨科技行者通过增加数据规模与相关素养提升&#xff0c;美国空军各部门及人员&#xff0c;将建立起更强的决策、战略、任务执行以及网络安全保障效率与能力。以…

数字孪生:如何撑起一个万亿市场的产业变革?

来源&#xff1a; 脑极体 今天我们介绍一个在产业界如火如荼&#xff0c;但大众还非常陌生的概念&#xff1a;数字孪生&#xff08;Digital Twin&#xff09;。在解释这一晦涩难懂的概念前&#xff0c;我首先想到了一个人&#xff0c;前苏联著名的昆虫学家、数学家和哲学家——…

二、python框架相关知识体系

Django框架 1、django框架、flask框架和Tornado框架的区别&#xff1f; django框架&#xff0c;内置组件多&#xff0c;自身功能强大&#xff0c;是一个大而全的框架&#xff0c;ORM、Admin、中间件、Form、ModelFrom、信号、缓存、csrf等flask框架&#xff0c;内置组件少&…

Android跟web哪个好,比系统自带的WebView更好用 | AgentWeb

名称AgentWeb语言Android平台GitHub作者Justson在混合化开发大行其道的今天&#xff0c;安卓开发经常会用到WebView&#xff0c;用于加载网页。系统自带的WebView性能和流畅度都一般&#xff0c;今天给大家推荐一款第三方WebView&#xff0c;性能比系统自带的要好&#xff0c;功…

神经科学如何影响人工智能?看DeepMind在NeurIPS2020最新《神经科学人工智能》报告,126页ppt...

来源&#xff1a;专知Jane Wang是DeepMind神经科学团队的一名研究科学家&#xff0c;研究元强化学习和受神经科学启发的人工智能代理。她的背景是物理、复杂系统、计算和认知神经科学。Kevin Miller是DeepMind神经科学团队的研究科学家&#xff0c;也是伦敦大学学院的博士后。他…

科普长文揭秘生命为何会具有主观能动性

来源&#xff1a;混沌巡洋舰动物的免疫系统依赖于被称为巨噬细胞的白细胞吞噬并吞噬入侵者。这些细胞有着坚定的决心和热情: 在显微镜下&#xff0c;你可以看到一个像球状的巨噬细胞在玻璃片上追逐一个细菌&#xff0c;它的猎物试图通过红细胞的障碍物逃跑时&#xff0c;而在它…