C++无锁队列的原理与实现

目录

1.无锁队列原理

1.1.队列操作模型

1.2.无锁队列简介

1.3.CAS操作

2.无锁队列方案

2.1.boost方案

2.2.ConcurrentQueue

2.3.Disruptor

3.无锁队列实现

3.1.环形缓冲区

3.2.单生产者单消费者

3.3.多生产者单消费者

3.4.RingBuffer实现

3.5.LockFreeQueue实现

4.kfifo内核队列

4.1.kfifo内核队列简介

4.2.kfifo内核队列实现

4.3.kfifo设计要点


1.无锁队列原理

1.1.队列操作模型

队列是一种非常重要的数据结构,其特性是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信间经常采用队列做缓存,缓解数据处理压力。根据操作队列的场景分为:单生产者——单消费者、多生产者——单消费者、单生产者——多消费者、多生产者——多消费者四大模型。根据队列中数据分为:队列中的数据是定长的、队列中的数据是变长的。

(1)单生产者——单消费者

(2)多生产者——单消费者

(3)单生产者——多消费者

(4)多生产者——多消费者

(5)数据定长队列

(6)数据变长队列

1.2.无锁队列简介

生产环境中广泛使用生产者和消费者模型,要求生产者在生产的同时,消费者可以进行消费,通常使用互斥锁保证数据同步。但线程互斥锁的开销仍然比较大,因此在要求高性能、低延时场景中,推荐使用无锁队列。

1.3.CAS操作

CAS即Compare and Swap,是所有CPU指令都支持CAS的原子操作(X86中CMPXCHG汇编指令),用于实现实现各种无锁(lock free)数据结构。

CAS操作的C语言实现如下:

bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
{if (*memory_location == expected_value){*memory_location = new_value;return true;}return false;
}

CAS用于检查一个内存位置是否包含预期值,如果包含,则把新值复赋值到内存位置。成功返回true,失败返回false。

(1)GGC对CAS支持

GCC4.1+版本中支持CAS原子操作。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

(2)Windows对CAS支持

Windows中使用Windows API支持CAS。

LONG InterlockedCompareExchange(LONG volatile *Destination,LONG          ExChange,LONG          Comperand
);

(3)C++11对CAS支持

   C++11 STL中atomic函数支持CAS并可以跨平台。

template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

其它原子操作如下:

Fetch-And-Add:一般用来对变量做+1的原子操作

Test-and-set:写值到某个内存位置并传回其旧值

2.无锁队列方案

2.1.boost方案

boost提供了三种无锁方案,分别适用不同使用场景。

boost::lockfree::queue是支持多个生产者和多个消费者线程的无锁队列。

boost::lockfree::stack是支持多个生产者和多个消费者线程的无锁栈。

boost::lockfree::spsc_queue是仅支持单个生产者和单个消费者线程的无锁队列,比boost::lockfree::queue性能更好。

Boost无锁数据结构的API通过轻量级原子锁实现lock-free,不是真正意义的无锁。

Boost提供的queue可以设置初始容量,添加新元素时如果容量不够,则总容量自动增长;但对于无锁数据结构,添加新元素时如果容量不够,总容量不会自动增长。

2.2.ConcurrentQueue

ConcurrentQueue是基于C++实现的工业级无锁队列方案。

GitHub:https://github.com/cameron314/concurrentqueue

ReaderWriterQueue是基于C++实现的单生产者单消费者场景的无锁队列方案。

GitHub:https://github.com/cameron314/readerwriterqueue

2.3.Disruptor

Disruptor是英国外汇交易公司LMAX基于JAVA开发的一个高性能队列。

GitHub:https://github.com/LMAX-Exchange/disruptor

3.无锁队列实现

3.1.环形缓冲区

RingBuffer是生产者和消费者模型中常用的数据结构,生产者将数据追加到数组尾端,当达到数组的尾部时,生产者绕回到数组的头部;消费者从数组头端取走数据,当到达数组的尾部时,消费者绕回到数组头部。

如果只有一个生产者和一个消费者,环形缓冲区可以无锁访问,环形缓冲区的写入index只允许生产者访问并修改,只要生产者在更新index前将新的值保存到缓冲区中,则消费者将始终看到一致的数据结构;读取index也只允许消费者访问并修改,消费者只要在取走数据后更新读index,则生产者将始终看到一致的数据结构。

空队列时,front与rear相等;当有元素进队,则rear后移;有元素出队,则front后移。

空队列时,rear等于front;满队列时,队列尾部空一个位置,因此判断循环队列满时使用(rear-front+maxn)%maxn。

入队操作:

data[rear] = x;rear = (rear+1)%maxn;

出队操作:

x = data[front];front = (front+1)%maxn;

3.2.单生产者单消费者

对于单生产者和单消费者场景,由于read_index和write_index都只会有一个线程写,因此不需要加锁也不需要原子操作,直接修改即可,但读写数据时需要考虑遇到数组尾部的情况。

线程对write_index和read_index的读写操作如下:

(1)写操作。先判断队列时否为满,如果队列未满,则先写数据,写完数据后再修改write_index。

(2)读操作。先判断队列是否为空,如果队列不为空,则先读数据,读完再修改read_index。

3.3.多生产者单消费者

多生产者和单消费者场景中,由于多个生产者都会修改write_index,所以在不加锁的情况下必须使用原子操作。

3.4.RingBuffer实现

RingBuffer.hpp文件:

#pragma oncetemplate <class T>
class RingBuffer
{
public:RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0){m_data = new T[size];}~RingBuffer(){delete [] m_data;m_data = NULL;}inline bool isEmpty() const{return m_front == m_rear;}inline bool isFull() const{return m_front == (m_rear + 1) % m_size;}bool push(const T& value){if(isFull()){return false;}m_data[m_rear] = value;m_rear = (m_rear + 1) % m_size;return true;}bool push(const T* value){if(isFull()){return false;}m_data[m_rear] = *value;m_rear = (m_rear + 1) % m_size;return true;}inline bool pop(T& value){if(isEmpty()){return false;}value = m_data[m_front];m_front = (m_front + 1) % m_size;return true;}inline unsigned int front()const{return m_front;}inline unsigned int rear()const{return m_rear;}inline unsigned int size()const {return m_size;}
private:unsigned int m_size;// 队列长度int m_front;// 队列头部索引int m_rear;// 队列尾部索引T* m_data;// 数据缓冲区
};

RingBufferTest.cpp测试代码:

#include <stdio.h>
#include <thread>
#include <unistd.h>
#include <sys/time.h>
#include "RingBuffer.hpp"class Test
{
public:Test(int id = 0, int value = 0){this->id = id;this->value = value;sprintf(data, "id = %d, value = %d\n", this->id, this->value);}void display(){printf("%s", data);}
private:int id;int value;char data[128];
};double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -(begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}RingBuffer<Test> queue(1 << 12);2u000#define N (10 * (1 << 20))void produce()
{struct timeval begin, end;gettimeofday(&begin, NULL);unsigned int i = 0;while(i < N){if(queue.push(Test(i % 1024, i))){i++;}}gettimeofday(&end, NULL);double tm = getdetlatimeofday(&begin, &end);printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}void consume()
{sleep(1);Test test;struct timeval begin, end;gettimeofday(&begin, NULL);unsigned int i = 0;while(i < N){if(queue.pop(test)){// test.display();i++;}}gettimeofday(&end, NULL);double tm = getdetlatimeofday(&begin, &end);printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}int main(int argc, char const *argv[])
{std::thread producer1(produce);std::thread consumer(consume);producer1.join();consumer.join();return 0;
}

编译:

g++ --std=c++11  RingBufferTest.cpp -o test -pthread

单生产者单消费者场景下,消息吞吐量为350万条/秒左右。

3.5.LockFreeQueue实现

LockFreeQueue.hpp:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/mman.h>#define SHM_NAME_LEN 128
#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define IS_POT(x) ((x) && !((x) & ((x)-1)))
#define MEMORY_BARRIER __sync_synchronize()template <class T>
class LockFreeQueue
{
protected:typedef struct{int m_lock;inline void spinlock_init(){m_lock = 0;}inline void spinlock_lock(){while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}}inline void spinlock_unlock(){__sync_lock_release(&m_lock);}} spinlock_t;public:// size:队列大小// name:共享内存key的路径名称,默认为NULL,使用数组作为底层缓冲区。LockFreeQueue(unsigned int size, const char* name = NULL){memset(shm_name, 0, sizeof(shm_name));createQueue(name, size);}~LockFreeQueue(){if(shm_name[0] == 0){delete [] m_buffer;m_buffer = NULL;}else{if (munmap(m_buffer, m_size * sizeof(T)) == -1) {perror("munmap");}if (shm_unlink(shm_name) == -1) {perror("shm_unlink");}}}bool isFull()const{
#ifdef USE_POTreturn m_head == (m_tail + 1) & (m_size - 1);
#elsereturn m_head == (m_tail + 1) % m_size;
#endif}bool isEmpty()const{return m_head == m_tail;}unsigned int front()const{return m_head;}unsigned int tail()const{return m_tail;}bool push(const T& value){
#ifdef USE_LOCKm_spinLock.spinlock_lock();
#endifif(isFull()){
#ifdef USE_LOCKm_spinLock.spinlock_unlock();
#endifreturn false;}memcpy(m_buffer + m_tail, &value, sizeof(T));
#ifdef USE_MBMEMORY_BARRIER;
#endif#ifdef USE_POTm_tail = (m_tail + 1) & (m_size - 1);
#elsem_tail = (m_tail + 1) % m_size;
#endif#ifdef USE_LOCKm_spinLock.spinlock_unlock();
#endifreturn true;}bool pop(T& value){
#ifdef USE_LOCKm_spinLock.spinlock_lock();
#endifif (isEmpty()){
#ifdef USE_LOCKm_spinLock.spinlock_unlock();
#endifreturn false;}memcpy(&value, m_buffer + m_head, sizeof(T));
#ifdef USE_MBMEMORY_BARRIER;
#endif#ifdef USE_POTm_head = (m_head + 1) & (m_size - 1);
#elsem_head = (m_head + 1) % m_size;
#endif#ifdef USE_LOCKm_spinLock.spinlock_unlock();
#endifreturn true;}protected:virtual void createQueue(const char* name, unsigned int size){
#ifdef USE_POTif (!IS_POT(size)){size = roundup_pow_of_two(size);}
#endifm_size = size;m_head = m_tail = 0;if(name == NULL){m_buffer = new T[m_size];}else{int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);if (shm_fd < 0){perror("shm_open");}if (ftruncate(shm_fd, m_size * sizeof(T)) < 0){perror("ftruncate");close(shm_fd);}void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);if (addr == MAP_FAILED){perror("mmap");close(shm_fd);}if (close(shm_fd) == -1){perror("close");exit(1);}m_buffer = static_cast<T*>(addr);memcpy(shm_name, name, SHM_NAME_LEN - 1);}
#ifdef USE_LOCKspinlock_init(m_lock);
#endif}inline unsigned int roundup_pow_of_two(size_t size){size |= size >> 1;size |= size >> 2;size |= size >> 4;size |= size >> 8;size |= size >> 16;size |= size >> 32;return size + 1;}
protected:char shm_name[SHM_NAME_LEN];volatile unsigned int m_head;volatile unsigned int m_tail;unsigned int m_size;
#ifdef USE_LOCKspinlock_t m_spinLock;
#endifT* m_buffer;
};

#define USE_LOCK

开启spinlock锁,多生产者多消费者场景

#define USE_MB

开启Memory Barrier

#define USE_POT

开启队列大小的2的幂对齐

LockFreeQueueTest.cpp测试文件:

#include "LockFreeQueue.hpp"
#include <thread>//#define USE_LOCKclass Test
{
public:Test(int id = 0, int value = 0){this->id = id;this->value = value;sprintf(data, "id = %d, value = %d\n", this->id, this->value);}void display(){printf("%s", data);}
private:int id;int value;char data[128];
};double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -(begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}LockFreeQueue<Test> queue(1 << 10, "/shm");#define N ((1 << 20))void produce()
{struct timeval begin, end;gettimeofday(&begin, NULL);unsigned int i = 0;while(i < N){if(queue.push(Test(i >> 10, i)))i++;}gettimeofday(&end, NULL);double tm = getdetlatimeofday(&begin, &end);printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}void consume()
{Test test;struct timeval begin, end;gettimeofday(&begin, NULL);unsigned int i = 0;while(i < N){if(queue.pop(test)){//test.display();i++;}}gettimeofday(&end, NULL);double tm = getdetlatimeofday(&begin, &end);printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}int main(int argc, char const *argv[])
{std::thread producer1(produce);//std::thread producer2(produce);std::thread consumer(consume);producer1.join();//producer2.join();consumer.join();return 0;
}

多线程场景下,需要定义USE_LOCK宏,开启锁保护。

编译:

g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread

4.kfifo内核队列

4.1.kfifo内核队列简介

kfifo是Linux内核的一个FIFO数据结构,采用环形循环队列的数据结构来实现,提供一个无边界的字节流服务,并且使用并行无锁编程技术,即单生产者单消费者场景下两个线程可以并发操作,不需要任何加锁行为就可以保证kfifo线程安全。

4.2.kfifo内核队列实现

kfifo数据结构定义如下:

struct kfifo
{unsigned char *buffer;unsigned int size;unsigned int in;unsigned int out;spinlock_t *lock;
};// 创建队列
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{struct kfifo *fifo;// 判断是否为2的幂BUG_ON(!is_power_of_2(size));fifo = kmalloc(sizeof(struct kfifo), gfp_mask);if (!fifo)return ERR_PTR(-ENOMEM);fifo->buffer = buffer;fifo->size = size;fifo->in = fifo->out = 0;fifo->lock = lock;return fifo;
}// 分配空间
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{unsigned char *buffer;struct kfifo *ret;// 判断是否为2的幂if (!is_power_of_2(size)){BUG_ON(size > 0x80000000);// 向上扩展成2的幂size = roundup_pow_of_two(size);}buffer = kmalloc(size, gfp_mask);if (!buffer)return ERR_PTR(-ENOMEM);ret = kfifo_init(buffer, size, gfp_mask, lock);if (IS_ERR(ret))kfree(buffer);return ret;
}void kfifo_free(struct kfifo *fifo)
{kfree(fifo->buffer);kfree(fifo);
}// 入队操作
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{unsigned long flags;unsigned int ret;spin_lock_irqsave(fifo->lock, flags);ret = __kfifo_put(fifo, buffer, len);spin_unlock_irqrestore(fifo->lock, flags);return ret;
}// 出队操作
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{unsigned long flags;unsigned int ret;spin_lock_irqsave(fifo->lock, flags);ret = __kfifo_get(fifo, buffer, len);//当fifo->in == fifo->out时,buufer为空if (fifo->in == fifo->out)fifo->in = fifo->out = 0;spin_unlock_irqrestore(fifo->lock, flags);return ret;
}// 入队操作
unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{unsigned int l;//buffer中空的长度len = min(len, fifo->size - fifo->in + fifo->out);// 内存屏障:smp_mb(),smp_rmb(), smp_wmb()来保证对方观察到的内存操作顺序smp_mb();// 将数据追加到队列尾部l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);memcpy(fifo->buffer, buffer + l, len - l);smp_wmb();//每次累加,到达最大值后溢出,自动转为0fifo->in += len;return len;
}// 出队操作
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{unsigned int l;//有数据的缓冲区的长度len = min(len, fifo->in - fifo->out);smp_rmb();l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);memcpy(buffer + l, fifo->buffer, len - l);smp_mb();fifo->out += len; //每次累加,到达最大值后溢出,自动转为0return len;
}static inline void __kfifo_reset(struct kfifo *fifo)
{fifo->in = fifo->out = 0;
}static inline void kfifo_reset(struct kfifo *fifo)
{unsigned long flags;spin_lock_irqsave(fifo->lock, flags);__kfifo_reset(fifo);spin_unlock_irqrestore(fifo->lock, flags);
}static inline unsigned int __kfifo_len(struct kfifo *fifo)
{return fifo->in - fifo->out;
}static inline unsigned int kfifo_len(struct kfifo *fifo)
{unsigned long flags;unsigned int ret;spin_lock_irqsave(fifo->lock, flags);ret = __kfifo_len(fifo);spin_unlock_irqrestore(fifo->lock, flags);return ret;
}

4.3.kfifo设计要点

(1)保证buffer size为2的幂

kfifo->size值在调用者传递参数size的基础上向2的幂扩展,目的是使kfifo->size取模运算可以转化为位与运算(提高运行效率)。kfifo->in % kfifo->size转化为 kfifo->in & (kfifo->size – 1)

保证size是2的幂可以通过位运算的方式求余,在频繁操作队列的情况下可以大大提高效率。

(2)使用spin_lock_irqsave与spin_unlock_irqrestore 实现同步。

Linux内核中有spin_lock、spin_lock_irq和spin_lock_irqsave保证同步。

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{preempt_disable();spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{local_irq_disable();preempt_disable();spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

spin_lock比spin_lock_irq速度快,但并不是线程安全的。spin_lock_irq增加调用local_irq_disable函数,即禁止本地中断,是线程安全的,既禁止本地中断,又禁止内核抢占。

spin_lock_irqsave是基于spin_lock_irq实现的一个辅助接口,在进入和离开临界区后,不会改变中断的开启、关闭状态。

如果自旋锁在中断处理函数中被用到,在获取自旋锁前需要关闭本地中断,spin_lock_irqsave实现如下:

A、保存本地中断状态;

B、关闭本地中断;

C、获取自旋锁。

解锁时通过 spin_unlock_irqrestore完成释放锁、恢复本地中断到原来状态等工作。

(3)线性代码结构

代码中没有任何if-else分支来判断是否有足够的空间存放数据,kfifo每次入队或出队只是简单的 +len 判断剩余空间,并没有对kfifo->size 进行取模运算,所以kfifo->in和kfifo->out总是一直增大,直到unsigned in超过最大值时绕回到0这一起始端,但始终满足:kfifo->in - kfifo->out <= kfifo->size。

(4)使用Memory Barrier

mb():适用于多处理器和单处理器的内存屏障。

rmb():适用于多处理器和单处理器的读内存屏障。

wmb():适用于多处理器和单处理器的写内存屏障。

smp_mb():适用于多处理器的内存屏障。

smp_rmb():适用于多处理器的读内存屏障。

smp_wmb():适用于多处理器的写内存屏障。

Memory Barrier使用场景如下:

A、实现同步原语(synchronization primitives)

B、实现无锁数据结构(lock-free data structures)

C、驱动程序

程序在运行时内存实际访问顺序和程序代码编写的访问顺序不一定一致,即内存乱序访问。内存乱序访问行为出现是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:

A、编译时,编译器优化导致内存乱序访问(指令重排)。

B、运行时,多CPU间交互引起内存乱序访问。

Memory Barrier能够让CPU或编译器在内存访问上有序。Memory barrier前的内存访问操作必定先于其后的完成。Memory Barrier包括两类:

A、编译器Memory Barrier。

B、CPU Memory Barrier。

通常,编译器和CPU引起内存乱序访问不会带来问题,但如果程序逻辑的正确性依赖于内存访问顺序,内存乱序访问会带来逻辑上的错误。

在编译时,编译器对代码做出优化时可能改变实际执行指令的顺序(如GCC的O2或O3都会改变实际执行指令的顺序)。

在运行时,CPU虽然会乱序执行指令,但在单个CPU上,硬件能够保证程序执行时所有的内存访问操作都是按程序代码编写的顺序执行的,Memory Barrier没有必要使用(不考虑编译器优化)。为了更快执行指令,CPU采取流水线的执行方式,编译器在编译代码时为了使指令更适合CPU的流水线执行方式以及多CPU执行,原本指令就会出现乱序的情况。在乱序执行时,CPU真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/635173.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实现仿ChatGPT光标跟随效果

先看效果 实现效果 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>光标闪烁效果</title>…

网络安全需要对网络风险有独特的理解

迷失在翻译中&#xff1a;网络风险解释的脱节现实 在古印度的一个经典故事中&#xff0c;几个蒙住眼睛的人接近一头大象&#xff0c;每个人检查不同的部位。有人触摸树干&#xff0c;认为它像一条蛇。另一个摸到了一条腿&#xff0c;认为它是一棵树。还有一个拿着象牙的人&…

Java中打印图案最常用的25个图案程序

Java是公认的最流行的编程语言&#xff0c;因为它的简单性和多功能性。还可以使用它开发各种应用程序&#xff0c;包括Web、移动和桌面应用程序。此外&#xff0c;Java为开发人员提供了强大的工具来轻松高效地创建复杂的程序。Java最有前途的特性之一是它能够创建可以以特定格式…

《向量数据库指南》——为什么说向量数据库是更适合AI体质的“硬盘”

其“AI原生”的体质,具体表现在几个方面: 1.更高的效率。 AI算法,要从图像、音频和文本等海量的非结构化数据中学习,提取出以向量为表示形式的“特征”,以便模型能够理解和处理。因此,向量数据库比传统基于索引的数据库有明显优势。 2.更低的成本。 大模型要从一种新…

【stm32】hal库学习笔记-GPIO按键控制LED和蜂鸣器(超详细!)

【stm32】hal库学习笔记-GPIO按键控制LED和蜂鸣器 注&#xff1a;本学习笔记基于stm32f4系列 使用的开发板为正点原子stmf407ZGT6探索者开发板 GPIO引脚使用时&#xff0c;可输入或输出数字信号 例如: 检测按键输入信号&#xff08;Read_Pin&#xff09;输出信号&#xff08;W…

flink operator 拉取阿里云私有镜像(其他私有类似)

创建 k8s secret kubectl --namespace flink create secret docker-registry aliyun-docker-registry --docker-serverregistry.cn-shenzhen.aliyuncs.com --docker-usernameops_acr1060896234 --docker-passwordpasswd --docker-emailDOCKER_EMAIL注意命名空间指定你使用的 我…

Linux:多线程

目录 1.线程的概念 1.1线程的理解 1.2进程的理解 1.3线程如何看待进程内部的资源? 1.4进程 VS 线程 2.线程的控制 2.1线程的创建 2.2线程的等待 2.3线程的终止 2.4线程ID 2.5线程的分离 3.线程的互斥与同步 3.1相关概念 3.2互斥锁 3.2.1概念理解 3.2.2操作理解…

分类预测 | Matlab实现WOA(海象)-XGboost分类【24年新算法】基于海象优化算法(WOA)优化XGBoost的数据分类预测

分类预测 | Matlab实现WOA(海象)-XGboost分类【24年新算法】基于海象优化算法(WOA)优化XGBoost的数据分类预测 目录 分类预测 | Matlab实现WOA(海象)-XGboost分类【24年新算法】基于海象优化算法(WOA)优化XGBoost的数据分类预测分类效果基本描述程序设计参考资料 分类效果 基本…

模型的召回率(Recall)

召回率&#xff08;Recall&#xff09;&#xff0c;也称为灵敏度&#xff08;Sensitivity&#xff09;或真正例率&#xff08;True Positive Rate&#xff09;&#xff0c;是用于评估二分类模型性能的指标之一。召回率衡量了模型正确识别正例的能力&#xff0c;即在所有实际正例…

ctfshow php特性(web89-web101)

目录 web89 web90 web91 web92 web93 web94 web95 web96 web97 web98 web99 web100 web101 php特性(php基础知识) web89 <?php include("flag.php"); highlight_file(_FILE_);if(isset($_GET[num])){$num$_GET[num];if(preg_match("/[0-9]/&…

Docker项目部署()

1.创建文件夹tools mkdir tools 配置阿里云 Docker Yum 源 : yum install - y yum - utils device - mapper - persistent - data lvm2 yum - config - manager -- add - repo http://mirrors.aliyun.com/docker- ce/linux/centos/docker - ce.repo 更新 yum 缓存 yum makec…

Kafka-消费者-KafkaConsumer分析-PartitionAssignor

Leader消费者在收到JoinGroupResponse后&#xff0c;会按照其中指定的分区分配策略进行分区分配&#xff0c;每个分区分配策略就是一个PartitionAssignor接口的实现。图是PartitionAssignor的继承结构及其中的组件。 PartitionAssignor接口中定义了Assignment和Subscription两个…

三国游戏(寒假每日一题+贪心、枚举)

题目 小蓝正在玩一款游戏。 游戏中魏蜀吴三个国家各自拥有一定数量的士兵 X,Y,Z&#xff08;一开始可以认为都为 0&#xff09;。 游戏有 n 个可能会发生的事件&#xff0c;每个事件之间相互独立且最多只会发生一次&#xff0c;当第 i个事件发生时会分别让 X,Y,Z 增加 Ai,Bi…

什么是低代码(Low-Code)?低代码平台的适用人群

低代码平台是一种革命性的工具&#xff0c;它让非专业的开发人员也能轻松创建应用程序。通过直观的可视化界面和拖放功能&#xff0c;开发人员能够轻松地构建和部署应用程序&#xff0c;无需专业的编程知识。低代码平台的出现&#xff0c;降低了应用程序开发的门槛&#xff0c;…

100天精通鸿蒙从入门到跳槽——第8天:TypeScript 知识储备:泛型

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通Golang》…

火速收藏!2024 新年微信红包封面领取全攻略

2024“龙”重登场&#xff01;今年有哪些令人期待的红包封面&#xff1f; 前方大批精美红包封面来袭&#xff0c;全新品牌氛围红包封面上线&#xff0c;支持品牌定制特色氛围元素&#xff0c;沉浸感受浓浓年味儿&#xff0c;收获满满惊喜&#xff01; 新年开好运&#xff0c;微…

C# .NET读取Excel文件并将数据导出到DataTable、数据库及文本

Excel文件是存储表格数据的普遍格式&#xff0c;因此能够高效地读取和提取信息对于我们来说至关重要。C#语言借助.NET Framework和各种库的广泛功能&#xff0c;能够进行高效的数据操作。利用C#读取Excel文件并将数据写入数据库和DataTable&#xff0c;或者将数据用于其他目的&…

终于懂了!医师资格证和医师执业证有啥区别

医师资格证和医师执业证的区别&#xff08;总结篇&#xff09; 1、发证单位不一样: 《医师资格证》是由国家卫生部统一发放的。 《医师执业证书》是你获得了医师资格证书后申请由当地卫生局发的。 2、意义不一样: 《医师资格证》属于医疗技术方面的认可&#xff0c;证明持证人具…

【Linux】信号量基于环形队列的生产消费模型

信号量 信号量的本质是一个计数器&#xff0c;可以用来衡量临界资源中资源数量多少 信号量的PV操作 P操作&#xff1a;申请信号量称为P操作&#xff0c;P操作的本质就是让计数器减1。 V操作&#xff1a;释放信号量称为V操作&#xff0c;V操作的本质就是让计数器加1 POSIX信号量…

[C#]winform部署官方yolov8-obb旋转框检测的onnx模型

【官方框架地址】 https://github.com/ultralytics/ultralytics 【算法介绍】 Yolov8-obb&#xff08;You Only Look Once version 8 with Oriented Bounding Boxes&#xff09;是一种先进的对象检测算法&#xff0c;它在传统的Yolov3和Yolov4基础上进行了优化&#xff0c;加…