基于单例模式懒汉实现方式的线程池
一、LockGuard.hpp
# pragma once
# include <iostream>
# include <pthread.h>
/// Thin, non-owning wrapper around a raw pthread mutex handle.
///
/// The wrapper only forwards lock/unlock requests to the handle it was given;
/// it never initialises or destroys the underlying mutex. When no handle is
/// attached (default construction or nullptr) both operations do nothing.
class Mutex
{
public:
    /// @param lock_p  mutex to operate on, or nullptr for an inert wrapper.
    Mutex(pthread_mutex_t* lock_p = nullptr)
        : _handle(lock_p)
    {
    }

    ~Mutex()
    {
    }

    /// Locks the attached mutex; a no-op when there is no handle.
    void lock()
    {
        if (_handle == nullptr)
        {
            return;
        }
        pthread_mutex_lock(_handle);
    }

    /// Unlocks the attached mutex; a no-op when there is no handle.
    void unlock()
    {
        if (_handle == nullptr)
        {
            return;
        }
        pthread_mutex_unlock(_handle);
    }

private:
    pthread_mutex_t* _handle; // borrowed, may be nullptr
};
/// Scope-bound lock: acquires the given pthread mutex in the constructor and
/// releases it in the destructor.
///
/// The guard is deliberately non-copyable. A copy would wrap the same mutex
/// handle and unlock it a second time when destroyed, which POSIX leaves
/// undefined for a default mutex.
class LockGuard
{
public:
    /// @param mutex  mutex to hold for the lifetime of the guard. It is handed
    ///               to Mutex, which ignores a null handle, so passing nullptr
    ///               yields a guard that does nothing.
    LockGuard(pthread_mutex_t* mutex)
        : _mutex(mutex)
    {
        _mutex.lock();
    }

    /// Releases the mutex acquired by the constructor.
    ~LockGuard()
    {
        _mutex.unlock();
    }

    // One guard == one lock/unlock pair; forbid duplicating that responsibility.
    LockGuard(const LockGuard&) = delete;
    LockGuard& operator=(const LockGuard&) = delete;

private:
    Mutex _mutex;
};
二、Task.hpp
# pragma once
# include <iostream>
# include <functional>
# include <string>
/// A deferred binary arithmetic job: two integer operands, an operator
/// character, and the callable that evaluates them.
class Task
{
    typedef std::function<int(int, int, char)> func_t;

public:
    /// Creates a placeholder task ("0 + 0" with no callback).
    ///
    /// The operands are initialised explicitly so that toTaskString() on a
    /// default-constructed task never reads indeterminate values. Invoking
    /// the placeholder still throws std::bad_function_call because there is
    /// no callback to run.
    Task()
        : _x(0), _y(0), _op('+')
    {
    }

    /// @param x     left operand
    /// @param y     right operand
    /// @param op    operator character passed through to @p func
    /// @param func  callable invoked as func(x, y, op) when the task runs
    Task(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callBack(func)
    {
    }

    /// Runs the callback and returns "<x> <op> <y>=<result>".
    /// @throws std::bad_function_call if the task has no callback.
    std::string operator()() const
    {
        const int result = _callBack(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d %c %d=%d", _x, _op, _y, result);
        return buffer;
    }

    /// Returns the not-yet-evaluated form "<x> <op> <y>=?".
    std::string toTaskString() const
    {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "%d %c %d=?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callBack;
};

// Operator characters handled by myMath().
const std::string oper = "+-*/%";
/// Evaluates `x op y` for the operators '+', '-', '*', '/' and '%'.
///
/// Declared inline because this definition lives in a header: without it,
/// including the header from more than one translation unit produces
/// multiple-definition errors at link time.
///
/// @param x   left operand
/// @param y   right operand
/// @param op  operator character
/// @return the computed value; -1 (with a message on std::cerr) when dividing
///         or taking the remainder by zero; 0 (with a message on std::cout)
///         when @p op is not one of the supported operators.
inline int myMath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
        if (y == 0)
        {
            std::cerr << "div zero error" << std::endl;
            result = -1;
        }
        else
        {
            result = x / y;
        }
        break;
    case '%':
        if (y == 0)
        {
            std::cerr << "mod zero" << std::endl;
            result = -1;
        }
        else
        {
            result = x % y;
        }
        break;
    default:
        std::cout << "运算符输入有误" << std::endl;
        break;
    }
    return result;
}
三、Thread.hpp
# pragma once
# include <iostream>
# include <pthread.h>
# include <string>
# include <functional>
# include <cassert>
namespace ThreadNs
{
    /// Signature of the routine a Thread executes.
    typedef std::function<void*(void*)> func_t;

    /// Size of the scratch buffer used to format thread names.
    const int num = 1024;

    /// Small wrapper around a single pthread.
    ///
    /// Each object gets a name of the form "thread-N" at construction, where N
    /// counts constructed Thread objects starting at 1. The counter is a plain
    /// int and is not synchronised, so Thread objects are expected to be
    /// constructed from one thread at a time.
    class Thread
    {
    private:
        /// Trampoline handed to pthread_create; @p args is the Thread itself.
        static void* start_routine(void* args)
        {
            Thread* _this = static_cast<Thread*>(args);
            return _this->callback();
        }

        /// Invokes the stored routine with the stored argument.
        void* callback()
        {
            return _func(_args);
        }

        /// Hands out 1, 2, 3, ... for thread names.
        ///
        /// Kept as a function-local static rather than a static data member
        /// defined in this header: the latter is a multiple-definition error
        /// as soon as the header is included from two translation units.
        static int nextThreadNumber()
        {
            static int counter = 1;
            return counter++;
        }

    public:
        Thread()
            : _args(nullptr), _tid(), _started(false)
        {
            char nameBuffer[num];
            snprintf(nameBuffer, sizeof(nameBuffer), "thread-%d", nextThreadNumber());
            _name = nameBuffer;
        }

        /// Launches the thread.
        /// @param func  routine to run on the new thread
        /// @param args  opaque argument forwarded to @p func
        void start(func_t func, void* args = nullptr)
        {
            _func = func;
            _args = args;
            int n = pthread_create(&_tid, nullptr, start_routine, this);
            assert(n == 0);
            // Only a successfully created thread may be joined later.
            _started = (n == 0);
        }

        /// Waits for the thread to finish. Does nothing if the thread was
        /// never started (or has already been joined), instead of passing an
        /// uninitialised id to pthread_join.
        void join()
        {
            if (!_started)
            {
                return;
            }
            int n = pthread_join(_tid, nullptr);
            assert(n == 0);
            (void)n;
            _started = false;
        }

        /// Returns the "thread-N" name assigned at construction.
        std::string threadName()
        {
            return _name;
        }

        ~Thread()
        {
        }

    private:
        std::string _name;
        func_t _func;
        void* _args;
        pthread_t _tid;
        bool _started; // true between a successful start() and join()
    };
}
四、ThreadPool.hpp
# pragma once
# include <vector>
# include <queue>
# include <pthread.h>
# include <unistd.h>
# include <mutex>
# include "Thread.hpp"
# include "LockGuard.hpp"
using namespace ThreadNs;
// Default number of worker threads a ThreadPool creates.
const int gnum = 5;

// Forward declaration so ThreadData can point back at its pool.
template <class T>
class ThreadPool;

/// Context handed to each worker thread: the pool it serves and the
/// worker's display name. The pool pointer is borrowed, not owned.
template <class T>
struct ThreadData
{
    ThreadData(ThreadPool<T>* owner, const std::string& label)
        : _threadPool(owner)
        , _name(label)
    {
    }

    ThreadPool<T>* _threadPool;
    std::string _name;
};
template < class T >
class ThreadPool
{
private : static void * handlerTask ( void * args) { ThreadData< T> * td= static_cast < ThreadData< T> * > ( args) ; while ( 1 ) { T t; { LockGuard lockGuard ( td-> _threadPool-> mutex ( ) ) ; while ( td-> _threadPool-> IsQueueEmpty ( ) ) { td-> _threadPool-> ThreadWait ( ) ; } t= td-> _threadPool-> Pop ( ) ; } std:: cout<< td-> _name<< "已获取任务:" << t. toTaskString ( ) << "处理结果是:" << t ( ) << std:: endl; } delete td; return nullptr ; } ThreadPool ( const int & num= gnum) : _num ( num) { pthread_mutex_init ( & _mutex, nullptr ) ; pthread_cond_init ( & _cond, nullptr ) ; for ( int i= 0 ; i< _num; ++ i) { _threads. push_back ( new Thread ( ) ) ; } } ThreadPool ( const ThreadPool< T> & ) = delete ; ThreadPool< T> & operator = ( const ThreadPool< T> & ) = delete ; public : void LockQueue ( ) { pthread_mutex_lock ( & _mutex) ; } void UnLockQueue ( ) { pthread_mutex_unlock ( & _mutex) ; } bool IsQueueEmpty ( ) { return _taskQueue. empty ( ) ; } void ThreadWait ( ) { pthread_cond_wait ( & _cond, & _mutex) ; } T Pop ( ) { T t= _taskQueue. front ( ) ; _taskQueue. pop ( ) ; return t; } pthread_mutex_t* mutex ( ) { return & _mutex; }
public : void run ( ) { for ( const auto & t: _threads) { ThreadData< T> * td= new ThreadData < T> ( this , t-> threadName ( ) ) ; t-> start ( handlerTask, ( void * ) td) ; std:: cout<< t-> threadName ( ) << "start..." << std:: endl; } } void push ( const T& in) { LockGuard lockGuard ( & _mutex) ; _taskQueue. push ( in) ; pthread_cond_signal ( & _cond) ; std:: cout<< "任务发送成功" << std:: endl; } ~ ThreadPool ( ) { pthread_mutex_destroy ( & _mutex) ; pthread_cond_destroy ( & _cond) ; for ( const auto & t: _threads) { delete t; } } static ThreadPool< T> * getInstance ( ) { if ( nullptr == tp) { _singletonLock. lock ( ) ; if ( nullptr == tp) { tp= new ThreadPool < T> ( ) ; } _singletonLock. unlock ( ) ; } return tp; }
private : int _num; std:: vector< Thread* > _threads; std:: queue< T> _taskQueue; pthread_mutex_t _mutex; pthread_cond_t _cond; static ThreadPool< T> * tp; static std:: mutex _singletonLock; } ;
template < class T >
ThreadPool< T> * ThreadPool< T> :: tp= nullptr ; template < class T >
std:: mutex ThreadPool< T> :: _singletonLock;