令牌桶实例
令牌桶三要素
cps 每秒钟传输字节数
burst 令牌桶内最多能传输的字节数,token的最大值
token 令牌的个数
之前是一个令牌(token)对应一个字节,现在将一个token变为一个cps,cps是解码速率,每攒到一个令牌,就token+=cps
如果需要不同的速率,使用不同的令牌桶,将令牌桶存储在一个数组中。
代码
mytbf.h
#ifndef MYTBF__H_
#define MYTBF__H_#define MYTBF_MAX 1024
typedef void mytbf_t;mytbf_t *mytbf_init(int cps,int burst);int mytbf_fetchtoken(mytbf_t * ,int);int mytbf_returntoken(mytbf_t * ,int );int mytbf_destroy(mytbf_t *);#endif
mytbf.c
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <signal.h>#include "mytbf.h"typedef void (*sighandler_t)(int);//将令牌桶的数据结构存在数组中,由于static修饰,所以未初始化前job[i]是NULL
static struct mytbf_st * job[MYTBF_MAX];
static int inited = 0;
#define MIN(A,B) (A < B ? A : B)
static sighandler_t alrm_handler_save;/*每个token代表一个字节,cps代表解码速率,burst应该是cps的倍数,token=token+cps*/
//这是令牌桶的数据结构,这个数据结构存在数组中
struct mytbf_st
{int cps; //每秒钟传输的字节数int burst; //令牌桶中令牌最大数量int token; //令牌的个数int pos; //记录令牌桶在数组的位置下标
};//信号捕捉函数
static void alrm_handler(int s)
{alarm(1);//为数组中的令牌桶中的令牌做累计for(int i = 0;i < MYTBF_MAX; i++){if(job[i] != NULL){job[i]->token += job[i]->cps;//令牌的数量不能超过令牌的最大数量burstif(job[i]->token > job[i]->burst)job[i]->token = job[i]->burst;}}
}
//关闭时钟发送信号,恢复
static void module_unload(void)
{int i;//恢复SIGALRM到之前的功能signal(SIGALRM,alrm_handler_save);//取消时钟发送信号alarm(0);//释放令牌桶for(i = 0;i < MYTBF_MAX;i++){free(job[i]);}}
//第一次发时钟信号的函数,这个函数只执行一次
static void module_load(void)
{//signal的返回值是注册新的行为(alrm_handler)之前的行为alrm_handler_save = signal(SIGALRM,alrm_handler);alarm(1);//注册钩子函数,这个不是函数调用,而是当调用exit的时候才会调用atexit(module_unload);
}//查找数组中空位置下标
static int get_free_pos(void)
{int i = 0;for(i = 0;i < MYTBF_MAX; i++){if(job[i] == NULL)return i;}return -1;
}mytbf_t *mytbf_init(int cps,int burst)
{int pos = 0;struct mytbf_st *me;//在数组中找到空位下标pospos = get_free_pos();if(pos < 0){return NULL;}if( !inited ){module_load();inited = 1;}me = malloc(sizeof(*me));if(me == NULL){return NULL;}//初始化令牌桶结构体成员me->token = 0;me->cps = cps;me->burst = burst;me->pos = pos;//将令牌桶放到数组中job[pos] = me;return me;
}//取令牌
int mytbf_fetchtoken(mytbf_t *ptr ,int size)
{int n;struct mytbf_st *me = ptr;if(size <= 0)return -1;//查看令牌桶中有没有令牌while(me->token <= 0)pause();//当要取的令牌数大于最大令牌数量,给最大令牌数量n = MIN(me->token,size);me->token -=n;return n;}//归还令牌
int mytbf_returntoken(mytbf_t *ptr ,int size)
{struct mytbf_st *me = ptr;if(size <=0 )return -1;me->token +=size;//判断令牌桶中的令牌是否大于令牌的最大数量burstif(me->token > me->burst)me->token = me->burst;return size;
}//销毁令牌桶
int mytbf_destroy(mytbf_t *ptr)
{ //因为mytbf_t 是void类型,转换下struct mytbf_st *me = ptr;job[me->pos] = NULL;free(me);return 0;
}
main.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include "mytbf.h"#define CPS 10
#define BUFSIZE 1024
#define BURST 100int main(int argc,char**argv)
{int sfd,dfd = 1;int size,errnor;char buf[BUFSIZE];int len,ret,pos;mytbf_t * tbf;if(argc < 2){fprintf(stderr,"Usage ....\n");exit(1);}//初始化令牌桶tbf = mytbf_init(CPS,BURST);//打开要读取的文件do{sfd = open(argv[1],O_RDONLY);if(sfd < 0){if(errno != EINTR){perror("open()");exit(1);}}}while(sfd < 0);while(1){ //从令牌桶中取出BUFSIZE个令牌size = mytbf_fetchtoken(tbf,BUFSIZE);if(size < 0){fprintf(stderr,"mytbf_fetchtoken is error\n");exit(1);}while((len = read(sfd,buf,size)) < 0){if(errno == EINTR)continue;perror("read()");break;}if(len == 0)break;//判断令牌是否用完,因为一个token是一个字节,所以size-len是剩余的字节,也是tokenif(size - len > 0)mytbf_returntoken(tbf,size-len);pos = 0;//使用循环写,写够len个字节的内容while(len > 0){ret = write(dfd,buf+pos,len);if(ret < 0){ //假错,继续写if(ret == EINTR)continue;perror("write()");exit(1);}pos += ret;len -= ret;}}mytbf_destroy(tbf);exit(0);
}
makefile
all:mytbf
mytbf:main.c mytbf.cgcc $^ -o $@
clean:rm *.o mytbf
多线程版本
mytbf.h
#ifndef MYTBF_H__
#define MYTBF_H__//流量控制的实现
#define MYTBF_MAX 1024
typedef void mytbf_t;mytbf_t *mytbf_init(int cps,int burst);int mytbf_fetchtoken(mytbf_t *,int);int mytbf_returntoken(mytbf_t *,int);int mytbf_destory(mytbf_t *);#endif
mytbf.c
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/select.h>
#include "mytbf.h"struct mytbf_st
{int cps;int burst;int pos;int token;pthread_mutex_t mut; //用于操作令牌时使用的互斥锁pthread_cond_t cond;
};//定时函数,定时sec秒时间
static void select_time(int sec)
{struct timeval tm;tm.tv_sec = sec;tm.tv_usec = 0;//调用select()函数后,程序将阻塞在这里,直到超过指定的等待时间或者有信号中断这个等待。select(0, NULL, NULL, NULL, &tm);
}
//静态初始化用于操作数组的互斥锁
pthread_mutex_t job_mut = PTHREAD_MUTEX_INITIALIZER;
static struct mytbf_st *job[MYTBF_MAX];
static pthread_t tid;
pthread_once_t once = PTHREAD_ONCE_INIT;//这个线程用来将数组中令牌桶中的令牌不断累加
void *thr_alrm(void *args)
{int i;while (1){ //操作令牌桶数组要先加锁pthread_mutex_lock(&job_mut);for(i = 0; i < MYTBF_MAX; i++){if(job[i] != NULL){//操作令牌桶中的令牌时也要加锁pthread_mutex_lock(&job[i]->mut);job[i]->token += job[i]->cps;if (job[i]->token > job[i]->burst)job[i]->token = job[i]->burst;//令牌有了,通知阻塞在等待令牌的线程pthread_cond_broadcast(&job[i]->cond);pthread_mutex_unlock(&job[i]->mut);}}pthread_mutex_unlock(&job_mut);//定时1s时间select_time(1);}}void mode_unload()
{//不会立即终止tid线程,而是发送一个取消请求给tid线程。pthread_cancel(tid);pthread_join(tid, NULL);int i;for(i = 0; i < MYTBF_MAX; i++){free(job[i]);}
}void mode_load()
{int err;//创建一个线程按照定时时间给令牌桶累加err = pthread_create(&tid, NULL, thr_alrm, NULL);if (err < 0){fprintf(stderr, "pthread_create():%s", strerror(errno));exit(1);}atexit(mode_unload);
}//查找数组中空位置的下标
static int get_free_pos()
{int i;for(i = 0; i < MYTBF_MAX; i++){if(job[i] == NULL)return i;}return -1;
}mytbf_t *mytbf_init(int cps,int burst)
{int pos;struct mytbf_st *me = malloc(sizeof(struct mytbf_st));if (me == NULL){return NULL;}//使mod_load这个函数只会执行一次pthread_once(&once, mode_load);me->cps = cps;me->burst = burst;me->token = 0;pthread_mutex_init(&me->mut, NULL);pthread_cond_init(&me->cond, NULL);//操作令牌桶数组前先加锁pthread_mutex_lock(&job_mut);pos = get_free_pos();if (pos < 0){free(me);pthread_mutex_unlock(&job_mut);return NULL;}me->pos = pos;job[me->pos] = me;pthread_mutex_unlock(&job_mut);return me;
}static int mymin(int a, int b)
{return a < b ? a : b;
}int mytbf_fetchtoken(mytbf_t *me,int size)
{struct mytbf_st *ptr = me;//操作令牌桶中的令牌先加锁pthread_mutex_lock(&ptr->mut);if (ptr->token <= 0)//没有令牌就先阻塞在条件变量上pthread_cond_wait(&ptr->cond, &ptr->mut);int n;n = mymin(ptr->token, size);ptr->token -= n;pthread_mutex_unlock(&ptr->mut);return n;
}int mytbf_returntoken(mytbf_t *me,int n)
{struct mytbf_st *ptr = me;//操作令牌桶中的令牌先加锁pthread_mutex_lock(&ptr->mut);ptr->token += n;if (ptr->token >= ptr->burst)ptr->token = ptr->burst;//通知阻塞在条件变量上的线程pthread_cond_broadcast(&ptr->cond);pthread_mutex_unlock(&ptr->mut);return 0;
}int mytbf_destory(mytbf_t *me)
{struct mytbf_st *ptr = me;pthread_mutex_lock(&job_mut);job[ptr->pos] = NULL;pthread_mutex_unlock(&job_mut);pthread_mutex_destroy(&ptr->mut);pthread_cond_destroy(&ptr->cond);free(me);return 0;
}
main.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include "mytbf.h"#define CPS 10
#define BUFSIZE 1024
#define BURST 100int main(int argc, char **argv)
{int fd;mytbf_t *tbf;if(argc < 2){fprintf(stderr,"Usage:\n");exit(1);}//初始化令牌桶中的令牌数量为BURST,速率为CPStbf = mytbf_init(CPS, BURST);if(tbf == NULL){fprintf(stdout, "mytbfInit() false!\n");exit(1);}do{ fd = open(argv[1],O_RDONLY);if (fd < 0){if (errno != EINTR){perror("open()");exit(1);}}}while(fd < 0);size_t len = 0;size_t pos = 0;size_t ret = 0;int size;int count = 0;char buf[BUFSIZE];while (1){//取BUFSIZE个令牌size = mytbf_fetchtoken(tbf, BUFSIZE);if (size < 0){fprintf(stderr, "mytbfFetchToken():%s\n", strerror(-size));exit(1);}while ((len = read(fd, buf, size)) < 0){if (errno == EINTR)continue;perror("read()");break;}//len为0说明读到文件末尾if (len == 0)break;//判断令牌是否使用完,归还剩余的令牌,因为1个令牌代表1个字节if (size - len > 0)mytbf_returntoken(tbf, size - len);pos = 0;while (len > 0){ret = write(1, buf + pos, len);if (ret < 0){perror("write()");exit(1);}pos += ret;len -= ret;}}close(fd);mytbf_destory(tbf);exit(0);
}
makefile
all:mytbf
mytbf:main.c mytbf.cgcc $^ -o $@ -pthread
clean:rm *.o mytbf