简介
消息队列是一种应用间的通讯方式,消息发送后可以立即放回,由消息系统来确保消息的可靠传递。消息发布者只需要将消息发布到消息队列中,而不需要管谁来取。消息使用者只管从消息队列中取消息而不管谁发布的。这样发布者和使用者都不同知道对方的存在。
消息队列普遍使用在生产者和消费者模型中。
- 优点
- 应用解耦: 应用之间不用那么多的同步调用,发消息到消息队列就行,消费者可以自己消费,消费生产者不用管了,降低应用之间的耦合。
- 降低延时: 应用之间用同步调用,需要等待对方响应,等待时间比较长,用消息之后,发送消息到消息队列就行,应用就可以返回了,对客户来讲降低了应用延时。
- 削峰填谷:请求比较多的时候,应用处理不过来,会丢弃请求;请求比较少时,应用不饱和。
请求比较多时,把请求放到消息队列,消费者按特定处理速度来处理,请求少时,也让应用有事情可以做;能做到忙时不丢请求,闲时不闲置应用资源。
主流的消息队列:Kafka、ActiveMQ、RabbitMQ、RocketMQ
下面使用C++实现一个简单的消息队列。
具体实现
- 消息队列类
里面主要包含一个数组和一个队列,都保存消息。
应用从数组中拿消息处理,当数组满的时候,消息保存到队列中,当数组中消息处理完,从队列中取消息,再处理。
数组实现的是一个环形数组,记录下消息写游标和读游标,超过数组大小,对数组大小取余。
//消息长度和消息
typedef std::pair<size_t, char*> msgPair;
#define MsgQueueSize 102400class zMsgQueue
{//消息对,first表示是否存放消息typedef std::pair<bool, msgPair> msgQueue;
public:zMsgQueue();~zMsgQueue();void* msgMalloc(const size_t len);void msgFree(void* p);//获得一个消息msgPair* get();//放入一个消息bool put(const void* msg, size_t msgLen);//将队列中的消息放到消息数组中bool putMsgQueue2Arr();//删除一个消息void erase();bool empty();bool msgArrEmpty();
private:void clear();// 保存正在处理的消息msgQueue msgArr_[MsgQueueSize];// 保存等待处理的消息std::queue<msgPair> msgQueue_;//消息写游标size_t queueWrite_;//消息读游标size_t queueRead_;
};
实现:
zMsgQueue::zMsgQueue()
{bzero(msgArr_, sizeof(msgArr_));queueWrite_ = 0;queueRead_ = 0;
}zMsgQueue::~zMsgQueue()
{clear();
}void* zMsgQueue::msgMalloc(const size_t len)
{char* p = (char*)malloc(len + 1);return (void*)(p + 1);
}void zMsgQueue::msgFree(void* p)
{free((char*)p - 1);
}//获得一个消息
msgPair* zMsgQueue::get()
{if(queueRead_ >= MsgQueueSize)return NULL;if(msgArrEmpty())putMsgQueue2Arr();msgPair* ret = NULL;if(msgArr_[queueRead_].first)ret = &msgArr_[queueRead_].second;return ret;
}//放入一个消息
bool zMsgQueue::put(const void* msg, size_t msgLen)
{char* buf = (char*)msgMalloc(msgLen);if(buf){bcopy(msg, buf, msgLen);//先将队列中的消息放到数组中//数组中还有位置直接放到数组中//没有位置放到队列中if(!putMsgQueue2Arr() && !msgArr_[queueWrite_].first){msgArr_[queueWrite_].first = true;msgArr_[queueWrite_].second.first = msgLen;msgArr_[queueWrite_].second.second = buf;queueWrite_++;queueWrite_ %= MsgQueueSize;}else {msgQueue_.push(std::make_pair(msgLen, buf));}return true;} return false;
}//将队列中的消息放到消息数组中
bool zMsgQueue::putMsgQueue2Arr()
{bool isLeft = false;while(!msgQueue_.empty()){if(!msgArr_[queueWrite_].first){msgArr_[queueWrite_].first = true;msgArr_[queueWrite_].second = msgQueue_.front();queueWrite_++;queueWrite_ %= MsgQueueSize;msgQueue_.pop();}else {isLeft = true;break;}}return isLeft;
}//删除一个消息
void zMsgQueue::erase()
{if(!msgArr_[queueRead_].first)return;msgFree(msgArr_[queueRead_].second.second);msgArr_[queueRead_].second.second = NULL;msgArr_[queueRead_].second.first = 0;msgArr_[queueRead_].first = false;queueRead_++;queueRead_ %= MsgQueueSize;
}void zMsgQueue::clear()
{//队列中还有消息while(putMsgQueue2Arr()){//数组中还有消息while(get()){erase();}}//数组中还有消息while(get()){erase();}
}bool zMsgQueue::empty()
{if(putMsgQueue2Arr()) return false;return msgArrEmpty();
}bool zMsgQueue::msgArrEmpty()
{if(queueRead_ == queueWrite_ && !msgArr_[queueRead_].first){return true;}return false;
}
- 消息队列的封装
对消息队列的封装主要是为了对消息进行解析和处理。
消息解析和处理函数定义成了虚函数,当需要使用消息队列并处理消息时,只需要继承消息队列,然后重写虚函数,进行对应处理即可。
类中还使用到了读写锁,当多线程的情况下,消息队列是一个临界资源,线程共享,需要进行上锁。单线程的情况下不需要加锁。
//T表示使用的消息队列
//msgT表示消息的类型,有的需要消息头,消息正文等,需要解析,这里是直接使用
template<class T=zMsgQueue, class msgT=char>
class messageQueue : public rwLocker
{
public:messageQueue(){}~messageQueue(){}bool putMsg(const msgT* msg, const size_t msgLen){rwLocker::wlock();msgQueue_.put(msg, msgLen);rwLocker::unlock();return true;}//解析消息,处理消息virtual bool msgParse(const msgT* msg, const size_t msgLen) = 0;//获取消息,解析消息,处理消息bool doCmd(){rwLocker::wlock();msgPair* msg = msgQueue_.get();while(msg){msgParse(msg->second, msg->first); msgQueue_.erase();msg = msgQueue_.get();}rwLocker::unlock();return true;}bool empty(){return msgQueue_.empty();}private:T msgQueue_;
};
- 读写锁的封装
读写锁:可以多个线程进行读,只能一个线程进行写。写时独享资源,读时共享资源。写锁的优先级高。 - 为什么读写锁需要读锁?
为了防止其他线程请求写锁。一个线程请求了读锁,其他线程在请求写锁会阻塞,但是请求读锁不会阻塞。一个线程请求了写锁,其他线程请求读锁和写锁都会阻塞。
#include <pthread.h>class rwLock
{
public:rwLock(){pthread_rwlock_init(&rwlc_, NULL);}~rwLock(){pthread_rwlock_destroy(&rwlc_);}void rlock(){pthread_rwlock_rdlock(&rwlc_);}void wlock(){pthread_rwlock_wrlock(&rwlc_);}void unlock(){pthread_rwlock_unlock(&rwlc_);}private:pthread_rwlock_t rwlc_;
};class rwLocker
{
public:void rlock(){rwlc_.rlock();}void wlock(){rwlc_.wlock();}void unlock(){rwlc_.unlock();}private:rwLock rwlc_;
};
- Makefile:
# ini1=main.cpp
# in2=messageQueue.cpp
out=main
cc=g++
std=-std=c++11 -lpthread#$(out):$(in1) $(in2)
$(out): main.cpp messageQueue.cpp rwlock.h$(cc) $^ -o $@ $(std).PHONY:clean
clean:rm -rf $(out)
- 代码测试
实现一个类继承消息队列,重写消息处理函数。
定义对象,调用doCmd函数即可。
#include "messageQueue.h"class test : public messageQueue<>
{bool msgParse(const char* msg, const size_t msgLen){std::cout << msgLen << ":" << msg << std::endl;return true;}
};int main()
{//模拟客户端发送消息char buf[256] = "hello world!";test t;//消息队列放消息t.putMsg(buf, strlen(buf));//处理消息t.doCmd();return 0;
}