【Kafka框架代码实践 一】简易的生产者消费者模式

用多线程来简易的实现一个消息队列的生产者消费者模式:10个线程生成数字消息投递到消息队列,10个线程做数字消息的消费者,消费生产者投递的消息。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class ProducerConsumerDemo {private static final int QUEUE_CAPACITY = 50;private static final int PRODUCER_COUNT = 10;private static final int CONSUMER_COUNT = 10;public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);// 创建生产者线程for (int i = 0; i < PRODUCER_COUNT; i++) {new Thread(new Producer(queue), "Producer-" + i).start();}// 创建消费者线程for (int i = 0; i < CONSUMER_COUNT; i++) {new Thread(new Consumer(queue), "Consumer-" + i).start();}}
}class Producer implements Runnable {private final BlockingQueue<Integer> queue;private static int count = 0;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {int number = produce();queue.put(number);System.out.println(Thread.currentThread().getName() + " produced: " + number);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private int produce() {return count++;}
}class Consumer implements Runnable {private final BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {int number = queue.take();consume(number);System.out.println(Thread.currentThread().getName() + " consumed: " + number);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void consume(int number) {// 消费数字的逻辑}
}

BlockingQueue 是 Java 中一个用于线程间通信的接口,它具有以下几个重要特性:

  1. 阻塞操作

    • BlockingQueue 提供了阻塞的 puttake 操作,这意味着如果队列满了,调用 put 方法的线程会被阻塞,直到队列有空闲位置;如果队列空了,调用 take 方法的线程会被阻塞,直到队列中有新元素被加入。
    • 这使得 BlockingQueue 特别适用于生产者-消费者模式。
  2. 线程安全

    • BlockingQueue 本质上是线程安全的,所有的插入、移除和检查操作都是原子的,并且使用了内部锁和其他同步机制来保证线程安全。
  3. 不同的实现

    • BlockingQueue 接口有多种实现,每种实现具有不同的特性和用途:
      • ArrayBlockingQueue:一个有界的阻塞队列,内部实现是数组。
      • LinkedBlockingQueue:一个可选有界的阻塞队列,内部实现是链表。
      • PriorityBlockingQueue:一个无界的阻塞优先队列,内部元素按优先级排序。
      • DelayQueue:一个无界的阻塞队列,其中元素只有在其延迟期满时才能被取走。
      • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作,反之亦然。
      • LinkedTransferQueue:一个无界的 TransferQueue,它在没有消费者时不会积压生产者的数据。
  4. 容量控制

    • BlockingQueue 可以有界,也可以无界。有界队列在创建时可以指定容量,控制队列中的元素数量,以避免过多的内存消耗。
    • putoffer 操作可以被用来添加元素到队列,put 操作在队列满时会阻塞,而 offer 操作则会在队列满时返回 false
  5. 无等待队列操作

    • BlockingQueue 还提供了一些无等待的队列操作,例如 polloffer,它们不会阻塞当前线程,而是在特定条件不满足时立即返回。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class BlockingQueueExample {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // 有界队列,容量为5// 生产者线程Thread producer = new Thread(() -> {try {for (int i = 0; i < 10; i++) {queue.put(i); // 如果队列满了,会阻塞System.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {while (true) {Integer item = queue.take(); // 如果队列空了,会阻塞System.out.println("Consumed: " + item);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();producer.join();consumer.join();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/45881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue 前端项目调用后端接口记录

axios中不同的类型的请求附带数据使用的关键字 请求类型关键字示例GETparamsaxios({ method: get, url: example.com, params: { key: value } })POSTdataaxios({ method: post, url: example.com, data: { key: value } })PUTdataaxios({ method: put, url: example.com, dat…

MICCAI 2024 每日一篇论文 纯纯直读 CUTS:用于多粒度无监督医学图像分割的深度学习和拓扑框架

MICCAI 2024 CUTS: A Deep Learning and Topological Framework for Multigranular Unsupervised Medical Image Segmentation CUTS: 用于多粒度无监督医学图像分割的深度学习和拓扑框架 作者 陈璐1*、Matthew Amodio1*、梁博伦.沈2、冯高3、阿曼阿维斯塔4、Sanjay Aneja3,5…

Linux 命令集

修改主机名/关机/重启 1&#xff09;hostnamectl 命令 $ hostnamectl # 查看操作系统信息&#xff08;内核、操作系统发行版本、主机名等&#xff09; $ hostnamectl set-hostname redhatu8 # 修改主机名2&#xff09;shutdown 关机 $ shutdown -h now # 马上关机3&#…

C++9 构造析构

class Int { private:int value; public:Int(int x 0) :value(x){cout << value << endl;}~Int(){cout << value << endl;}void SetValue(int x )value x;}}; 构造函数可以使用&#xff1a;进行初始化&#xff0c;而初始化只能进行一次&#xff0c;…

[RuoYi-Vue] - 1. 项目搭建

文章目录 &#x1f42c;初始化后端项目拉取RuoYi-Vue代码Maven构建导入数据库ry-vue修改配置信息启动Redis启动项目 &#x1f30c;初始化前端项目拉取RuoYi-Vue3代码项目运行成功页面 &#x1f42c;初始化后端项目 拉取RuoYi-Vue代码 若依/RuoYi-Vue 代码地址 Maven构建 导入数…

7.2 AQS原理

AQS 原理 概述 全称是 AbstractQueuedSynchronizer&#xff0c;是阻塞式锁和相关的同步器工具的框架。 特点&#xff1a; 用 state 属性来表示资源的状态&#xff08;分独占模式和共享模式&#xff09;&#xff0c;子类需要定义如何维护这个状态&#xff0c;控制如何获取锁和…

Postman 中的 API 安全性测试:最佳实践与技巧

在当今快速发展的数字化世界中&#xff0c;API&#xff08;应用程序编程接口&#xff09;已成为软件系统之间通信的桥梁。然而&#xff0c;随着API使用的增加&#xff0c;安全风险也随之上升。本文将详细介绍如何在 Postman 中进行 API 的安全性测试&#xff0c;帮助开发者和测…

Mybatis SQL注解使用场景

MyBatis 提供了几种常用的注解&#xff0c;主要用于简化 XML 映射文件的编写&#xff0c;使得 SQL 查询和操作可以直接在 Java 接口中定义。下面列出了主要的注解以及它们在被调用时的写法示例&#xff1a; 1. Select Select 注解用于执行查询操作&#xff0c;并将查询结果映…

LruCache、Glide和SmartRefreshLayout使用总结

&#xff08;一&#xff09;Android智能下拉刷新框架-SmartRefreshLayout https://github.com/scwang90/SmartRefreshLayout?tabreadme-ov-file &#xff08;二&#xff09;LruCache使用 使用它可以进行图片的内存缓存 public class ImageLoaderUtil {private LruCache<St…

three.js官方案例(animation / skinning / ik)webgl_animation_skinning_ik.html学习记录

目录 1 WebGLCubeRenderTarget 2 TransformControls 3 CCDIKSolver 4 CCDIKHelper 4 全部脚本 1 WebGLCubeRenderTarget 球体亮 //WebGLCubeRenderTarget(size : Number, options : Object) //size - the size, in pixels. Default is 1. //options - (可选)一个保存…

软件设计之Java入门视频(15)

软件设计之Java入门视频(15) 视频教程来自B站尚硅谷&#xff1a; 尚硅谷Java入门视频教程&#xff0c;宋红康java基础视频 相关文件资料&#xff08;百度网盘&#xff09; 提取密码&#xff1a;8op3 idea 下载可以关注 软件管家 公众号 学习内容&#xff1a; 该视频共分为1-7…

Java中的CountDownLatch详解

Java中的CountDownLatch详解 大家好&#xff0c;我是微赚淘客系统3.0的小编&#xff0c;是个冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 一、什么是CountDownLatch&#xff1f; CountDownLatch 是Java并发包中的一个工具类&#xff0c;用于实现线程间的等待…

Linux桌面溯源

X窗口系统(X Window System) Linux起源于X窗口系统&#xff08;X Window System&#xff09;&#xff0c;亦即常说的X11&#xff0c;因其版本止于11之故。 X窗口系统&#xff08;X Window System&#xff0c;也常称为X11或X&#xff09;是一种以位图方式显示的软件窗口系统。…

zabbix 7.0 SNMP Hex数据预处理新功能

一、简介 zabbix7.0新特性是监控项新增支持SNMP Hex数据预处理。其中内置了对snmp请求结果Hex转换处理&#xff0c;不再需要使用繁琐的方式&#xff0c;如javascript脚本、替换、修整等方式处理将监控项取值做可视化处理&#xff0c;大福提升SNMP采集获取到Hex数据的处理效率。…

windows安全加固

window安全加固-账号与口令管理 1、账号分配检查 名称 账号分配检查&#xff0c;避免共享账号与无用账号存在 实施目的 根据系统的要求&#xff0c;设定不同的账户和账户组&#xff0c;管理员用户&#xff0c;数据库用户&#xff0c;审计用户&#xff0c;来宾用户等&#xf…

浅析 VO、DTO、DO、PO 的概念

文章目录 I 浅析 VO、DTO、DO、PO1.1 概念1.2 模型1.3 VO与DTO的区别I 浅析 VO、DTO、DO、PO 1.1 概念 VO(View Object) 视图对象,用于展示层,它的作用是把某个指定页面(或组件)的所有数据封装起来。DTO(Data Transfer Object): 数据传输对象,这个概念来源于J2EE的设…

C++《日期》实现

C《日期》实现 头文件实现文件 头文件 在该文件中是为了声明函数和定义类成员 using namespace std; class Date {friend ostream& operator<<(ostream& out, const Date& d);//友元friend istream& operator>>(istream& cin, Date& d);//…

利率上升,利率债价格下跌但信用债价格上涨的理论分析

利率上升&#xff0c;利率债价格下跌但信用债价格上涨的理论分析 在利率上升的环境下&#xff0c;通常会观察到利率债价格下跌&#xff0c;而有时信用债价格却可能上涨。以下是对这种现象的详细理论分析。 利率债和信用债的价格机制 利率债价格下跌 利率债定义&#xff1a;…

flask+mysql入门案例

在 Flask 中集成 MySQL 数据库进行用户管理是一个常见的项目需求。下面将提供一个基础的步骤和代码示例来理解如何从零开始搭建这样一个系统。 1. 环境准备 首先确保你已经安装了 Python 和必要的包。你需要安装 Flask 和用于连接 MySQL 的包 Flask-SQLAlchemy 或者 Flask-My…

【C++】优先级队列(底层代码解释)

一. 定义 优先级队列是一个容器适配器&#xff0c;他可以根据不同的需求采用不同的容器来实现这个数据结构&#xff0c;优先级队列采用了堆的数据结构&#xff0c;默认使用vector作为容器&#xff0c;且采用大堆的结构进行存储数据。 &#xff08;1&#xff09;在第一个构造函数…