RT-Thread代码分析
这是源码分析, 实际使用看这个
信号看这个
看这一篇之前最好看一下我的RT-Thread对象管理以及线程管理, 时钟管理
邮箱
实际是实现是一个对环形缓存区的使用
struct rt_mailbox
{struct rt_ipc_object parent; /**< inherit from ipc_object */rt_ubase_t *msg_pool; /**< start address of message buffer 缓存区地址*/rt_uint16_t size; /**< size of message pool 大小*/rt_uint16_t entry; /**< index of messages in msg_pool记录一下数量*/rt_uint16_t in_offset; /**< input offset of the message buffer记录一下写的位置 */rt_uint16_t out_offset; /**< output offset of the message 记录一下读的位置buffer */rt_list_t suspend_sender_thread; /**< sender thread suspended on挂起的任务链表this mailbox */
};
typedef struct rt_mailbox *rt_mailbox_t;
struct rt_ipc_object
{struct rt_object parent; /**< inherit from rt_object */rt_list_t suspend_thread; /**< threads pended on this resource 记录接收的任务的队列 */
};
创建
rt_mailbox_t rt_mb_create(const char *name, rt_size_t size, rt_uint8_t flag)
{rt_mailbox_t mb;RT_DEBUG_NOT_IN_INTERRUPT;/* allocate object 这一个在对象创建的那一篇里面分析过了*/mb = (rt_mailbox_t)rt_object_allocate(RT_Object_Class_MailBox, name);if (mb == RT_NULL)return mb;/* set parent */mb->parent.parent.flag = flag;/* initialize ipc object 实际上就是初始化一个用于记录接收任务挂起的链表 */rt_ipc_object_init(&(mb->parent));/* initialize mailbox 记录一些信息 */mb->size = size;//获取一个缓存区mb->msg_pool = (rt_ubase_t *)RT_KERNEL_MALLOC(mb->size * sizeof(rt_ubase_t));if (mb->msg_pool == RT_NULL){//获取失败/* delete mailbox object */rt_object_delete(&(mb->parent.parent));return RT_NULL;}mb->entry = 0;mb->in_offset = 0;mb->out_offset = 0;/* initialize an additional list of sender suspend thread 发送者的队列 */rt_list_init(&(mb->suspend_sender_thread));return mb;
}
//实际就是记录一下信息
rt_err_t rt_mb_init(rt_mailbox_t mb,const char *name,void *msgpool,rt_size_t size,rt_uint8_t flag)
{RT_ASSERT(mb != RT_NULL);/* initialize object 具体看对象的分析那一章*/rt_object_init(&(mb->parent.parent), RT_Object_Class_MailBox, name);/* set parent flag */mb->parent.parent.flag = flag;/* initialize ipc object */rt_ipc_object_init(&(mb->parent));/* initialize mailbox */mb->msg_pool = (rt_ubase_t *)msgpool;mb->size = size;mb->entry = 0;mb->in_offset = 0;mb->out_offset = 0;/* initialize an additional list of sender suspend thread */rt_list_init(&(mb->suspend_sender_thread));return RT_EOK;
}
删除
rt_err_t rt_mb_delete(rt_mailbox_t mb)
{RT_DEBUG_NOT_IN_INTERRUPT;/* resume all suspended thread 把所有的挂起的接收线程释放, 这个同步那一章里面有 */rt_ipc_list_resume_all(&(mb->parent.suspend_thread));/* also resume all mailbox private suspended thread 发送线程释放 */rt_ipc_list_resume_all(&(mb->suspend_sender_thread));/* free mailbox pool 释放缓存内存 */RT_KERNEL_FREE(mb->msg_pool);/* delete mailbox object 控制块会在空闲任务释放 */rt_object_delete(&(mb->parent.parent));return RT_EOK;
}
发送
rt_err_t rt_mb_send(rt_mailbox_t mb, rt_ubase_t value)
{return rt_mb_send_wait(mb, value, 0);
}
rt_err_t rt_mb_send_wait(rt_mailbox_t mb,rt_ubase_t value,rt_int32_t timeout)
{struct rt_thread *thread;register rt_ubase_t temp;rt_uint32_t tick_delta;/* initialize delta tick */tick_delta = 0;/* get current thread 获取当前线程 */thread = rt_thread_self();//回调函数RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mb->parent.parent)));/* disable interrupt 临界区 */temp = rt_hw_interrupt_disable();/* for non-blocking call 看一看有没有位置 */if (mb->entry == mb->size && timeout == 0){//没有并且不等待rt_hw_interrupt_enable(temp);return -RT_EFULL;}/* mailbox is full */while (mb->entry == mb->size){//邮箱满了/* reset error number in thread */thread->error = RT_EOK;/* no waiting, return timeout */if (timeout == 0){/* enable interrupt 这一个线程等待时间到了 */rt_hw_interrupt_enable(temp);return -RT_EFULL;}//这个使用的时候需要已经开始调度了RT_DEBUG_IN_THREAD_CONTEXT;/* suspend current thread *///把这一个任务记录在挂起队列里面(具体分析看任务同步的那一篇)rt_ipc_list_suspend(&(mb->suspend_sender_thread),thread,mb->parent.parent.flag);/* has waiting time, start thread timer */if (timeout > 0){//需要等待/* get the start tick of timer 获取现在的时间 */tick_delta = rt_tick_get();RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_send_wait: start timer of thread:%s\n",thread->name));/* reset the timeout of thread timer and start it 开启一个时钟用于超时时候的唤醒*/rt_timer_control(&(thread->thread_timer),RT_TIMER_CTRL_SET_TIME,&timeout);rt_timer_start(&(thread->thread_timer));}/* enable interrupt */rt_hw_interrupt_enable(temp);/* re-schedule 启动一次调度 */rt_schedule();//这里线程被唤醒, 看一看这这时候是不是超时了/* resume from suspend state */if (thread->error != RT_EOK){/* return error */return thread->error;}/* disable interrupt 在这之前可能有一个高优先级把这一个位置又使用了, 需要再看看可不可以发送*/temp = rt_hw_interrupt_disable();/* if it's not waiting forever and then re-calculate timeout tick 跟新一下发送的时间, 再试着发送一次 */if (timeout > 0){tick_delta = rt_tick_get() - tick_delta;timeout -= tick_delta;//时间已经到了if (timeout < 0)timeout = 0;}}//可以发送信息/* set ptr 记录一下信息*/mb->msg_pool[mb->in_offset] = value;/* increase input offset 更新一下记录的指针 */++ mb->in_offset;if (mb->in_offset >= mb->size)mb->in_offset = 0;//大小大于这一个环形缓冲区, 回头部/* increase message entry */mb->entry ++;//大小加一/* resume suspended thread 看一看接收的有没有在等的 */if (!rt_list_isempty(&mb->parent.suspend_thread)){//唤醒一下第一个等待的线程以及把时钟关了(看线程同步)rt_ipc_list_resume(&(mb->parent.suspend_thread));/* enable interrupt */rt_hw_interrupt_enable(temp);//切换一下任务rt_schedule();return RT_EOK;}/* enable interrupt */rt_hw_interrupt_enable(temp);return RT_EOK;
}
接收
//基本和发送一样, 只是循环判断是为空, 以及标识符加减反过来了
rt_err_t rt_mb_recv(rt_mailbox_t mb, rt_ubase_t *value, rt_int32_t timeout)
{struct rt_thread *thread;register rt_ubase_t temp;rt_uint32_t tick_delta;/* parameter check */RT_ASSERT(mb != RT_NULL);RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox);/* initialize delta tick */tick_delta = 0;/* get current thread */thread = rt_thread_self();RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mb->parent.parent)));/* disable interrupt */temp = rt_hw_interrupt_disable();/* for non-blocking call */if (mb->entry == 0 && timeout == 0){rt_hw_interrupt_enable(temp);return -RT_ETIMEOUT;}/* mailbox is empty 看看是不是空的 */while (mb->entry == 0){//需要等待/* reset error number in thread */thread->error = RT_EOK;/* no waiting, return timeout */if (timeout == 0){//等待的时间到了, 失败/* enable interrupt */rt_hw_interrupt_enable(temp);thread->error = -RT_ETIMEOUT;return -RT_ETIMEOUT;}//必须打开调度器了RT_DEBUG_IN_THREAD_CONTEXT;//一个典型的icp挂起处理/* suspend current thread */rt_ipc_list_suspend(&(mb->parent.suspend_thread),thread,mb->parent.parent.flag);/* has waiting time, start thread timer 还有时间, 挂起*/if (timeout > 0){/* get the start tick of timer */tick_delta = rt_tick_get();RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_recv: start timer of thread:%s\n",thread->name));/* reset the timeout of thread timer and start it */rt_timer_control(&(thread->thread_timer),RT_TIMER_CTRL_SET_TIME,&timeout);rt_timer_start(&(thread->thread_timer));}/* enable interrupt */rt_hw_interrupt_enable(temp);/* re-schedule */rt_schedule();//切换, 看一看是不是出错了(超时等)/* resume from suspend state */if (thread->error != RT_EOK){/* return error */return thread->error;}/* disable interrupt 最后检测一下有没有位置 */temp = rt_hw_interrupt_disable();/* if it's not waiting forever and then re-calculate timeout tick 更新时间 */if (timeout > 0){tick_delta = rt_tick_get() - tick_delta;timeout -= tick_delta;if (timeout < 0)timeout = 0;}}/* fill ptr */*value = mb->msg_pool[mb->out_offset];/* increase output offset */++ mb->out_offset;if (mb->out_offset >= mb->size)mb->out_offset = 0;/* decrease message entry */mb->entry --;/* resume suspended thread */if (!rt_list_isempty(&(mb->suspend_sender_thread))){//释放第一个线程rt_ipc_list_resume(&(mb->suspend_sender_thread));/* enable interrupt */rt_hw_interrupt_enable(temp);RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent)));rt_schedule();return RT_EOK;}/* enable interrupt */rt_hw_interrupt_enable(temp);RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent)));return RT_EOK;
}
消息队列
实际上这是一个对链表的使用
消息链表: 使用两个指针记录消息链表的头以及尾
空闲链表: 类似栈, 使用一个指针记录空闲任务尾部
struct rt_messagequeue
{struct rt_ipc_object parent; /**< inherit from ipc_object */void *msg_pool; /**< start address of messagequeue 存放消息的缓冲区 */rt_uint16_t msg_size; /**< message size of each message记录可以容纳的每一个消息的大小*/rt_uint16_t max_msgs; /**< max number of messages 记录消息的个数*/rt_uint16_t entry; /**< index of messages in the queue 记录现在消息的个数 */void *msg_queue_head; /**< list head 链表头 */void *msg_queue_tail; /**< list tail 链表尾*/void *msg_queue_free; /**< pointer indicated the free node of queue 记录缓冲区里下一个空闲消息的链表 */rt_list_t suspend_sender_thread; /**< sender thread suspended on this message queue 发送线程的挂起的等待队列*/
};
struct rt_mq_message
{struct rt_mq_message *next;
};
一个管理信息的链表
创建
初始化以后的缓冲区
rt_mq_t rt_mq_create(const char *name,rt_size_t msg_size,rt_size_t max_msgs,rt_uint8_t flag)
{struct rt_messagequeue *mq;struct rt_mq_message *head;register rt_base_t temp;//这一个函数不应该在中断里面使用RT_DEBUG_NOT_IN_INTERRUPT;/* allocate object 获取一个邮箱的对象 */mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);if (mq == RT_NULL)return mq;/* set parent 记录一下标志 */mq->parent.parent.flag = flag;/* initialize ipc object 主要是初始化一个ipc的链表(这一部分在信号同步的的那一篇里面有) */rt_ipc_object_init(&(mq->parent));/* initialize message queue *//* get correct message size 把这一个按照四字节对齐 */mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);mq->max_msgs = max_msgs;/* allocate message pool 获取一个存信息的内存, 实际的大小是(信息大小 + 管理结构体(一个链表)) * 数量 */mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);if (mq->msg_pool == RT_NULL){//获取失败的时候rt_object_delete(&(mq->parent.parent));return RT_NULL;}/* initialize message list */mq->msg_queue_head = RT_NULL;mq->msg_queue_tail = RT_NULL;/* initialize message empty list 初始化这一个缓存里面的信息 */mq->msg_queue_free = RT_NULL;for (temp = 0; temp < mq->max_msgs; temp ++){head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +temp * (mq->msg_size + sizeof(struct rt_mq_message)));//计算一下第n个信息的位置head->next = (struct rt_mq_message *)mq->msg_queue_free;//初始化链表指向数组前一个信息mq->msg_queue_free = head;//更新一下可以使用的下一个的位置}/* the initial entry is zero */mq->entry = 0;/* initialize an additional list of sender suspend thread */rt_list_init(&(mq->suspend_sender_thread));return mq;
}
//静态, 实际就是记录一下用到的值
rt_err_t rt_mq_init(rt_mq_t mq,const char *name,void *msgpool,rt_size_t msg_size,rt_size_t pool_size,rt_uint8_t flag)
{struct rt_mq_message *head;register rt_base_t temp;/* parameter check */RT_ASSERT(mq != RT_NULL);/* initialize object */rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name);/* set parent flag */mq->parent.parent.flag = flag;/* initialize ipc object */rt_ipc_object_init(&(mq->parent));/* set message pool */mq->msg_pool = msgpool;/* get correct message size 计算一下实际可以存储的信息的个数 */mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message));/* initialize message list */mq->msg_queue_head = RT_NULL;mq->msg_queue_tail = RT_NULL;/* initialize message empty list */mq->msg_queue_free = RT_NULL;for (temp = 0; temp < mq->max_msgs; temp ++){head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +temp * (mq->msg_size + sizeof(struct rt_mq_message)));head->next = (struct rt_mq_message *)mq->msg_queue_free;mq->msg_queue_free = head;}/* the initial entry is zero */mq->entry = 0;/* initialize an additional list of sender suspend thread */rt_list_init(&(mq->suspend_sender_thread));return RT_EOK;
}
发送消息
rt_err_t rt_mq_send_wait(rt_mq_t mq,const void *buffer,rt_size_t size,rt_int32_t timeout)
{register rt_ubase_t temp;struct rt_mq_message *msg;rt_uint32_t tick_delta;struct rt_thread *thread;/* greater than one message size */if (size > mq->msg_size)//发送的消息太大了, 不能发送return -RT_ERROR;/* initialize delta tick */tick_delta = 0;/* get current thread 获取当前线程 */thread = rt_thread_self();RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));/* disable interrupt 临界区 */temp = rt_hw_interrupt_disable();/* get a free list, there must be an empty item */msg = (struct rt_mq_message *)mq->msg_queue_free;/* for non-blocking call */if (msg == RT_NULL && timeout == 0){/* enable interrupt 没有位置, 并且不等待 */rt_hw_interrupt_enable(temp);return -RT_EFULL;}/* message queue is full 没有可以用于发送的空闲缓冲区 */while ((msg = mq->msg_queue_free) == RT_NULL){/* reset error number in thread */thread->error = RT_EOK;/* no waiting, return timeout */if (timeout == 0){/* enable interrupt 时间到了 */rt_hw_interrupt_enable(temp);return -RT_EFULL;}//一个典型的ipc挂起RT_DEBUG_IN_THREAD_CONTEXT;/* suspend current thread */rt_ipc_list_suspend(&(mq->suspend_sender_thread),thread,mq->parent.parent.flag);/* has waiting time, start thread timer */if (timeout > 0){//还需要等待/* get the start tick of timer 记录现在的时间, 用于计算是不是超时 */tick_delta = rt_tick_get();RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n",thread->name));/* reset the timeout of thread timer and start it */rt_timer_control(&(thread->thread_timer),RT_TIMER_CTRL_SET_TIME,&timeout);rt_timer_start(&(thread->thread_timer));}/* enable interrupt */rt_hw_interrupt_enable(temp);/* re-schedule 任务切换*/rt_schedule();//回来了, 可能有位置或者出错(超时)/* resume from suspend state */if (thread->error != RT_EOK){//是出错回来的/* return error */return thread->error;}/* disable interrupt */temp = rt_hw_interrupt_disable();//最后更新时间以及检测一下是不是真的有位置/* if it's not waiting forever and then re-calculate timeout tick */if (timeout > 0){tick_delta = rt_tick_get() - tick_delta;timeout -= tick_delta;if (timeout < 0)timeout = 0;}}//有位置/* move free list pointer 获取一个位置, 这个msg是记录空闲位置的那一个指针 */mq->msg_queue_free = msg->next;/* enable interrupt */rt_hw_interrupt_enable(temp);/* the msg is the new tailer of list, the next shall be NULL 这是一个新加入的信息, 没有下一个*/msg->next = RT_NULL;/* copy buffer 把信息拷贝到缓冲区 */rt_memcpy(msg + 1, buffer, size);/* disable interrupt */temp = rt_hw_interrupt_disable();/* link msg to message queue 看一看链表里面有没有信息*/if (mq->msg_queue_tail != RT_NULL){//这是不第一个信息, 更新上一条信息的下一条为这个新的消息/* if the tail exists, */((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;}/* set new tail 尾部记录为这一个信息 */mq->msg_queue_tail = msg;/* if the head is empty, set head 这时候没有信息的话记录一下这一条信息为第一条 */if (mq->msg_queue_head == RT_NULL)mq->msg_queue_head = msg;/* increase message entry 数量加一 */mq->entry ++;/* resume suspended thread 看一看有没有可以释放的任务*/if (!rt_list_isempty(&mq->parent.suspend_thread)){//释放第一个任务rt_ipc_list_resume(&(mq->parent.suspend_thread));/* enable interrupt */rt_hw_interrupt_enable(temp);rt_schedule();return RT_EOK;}/* enable interrupt */rt_hw_interrupt_enable(temp);return RT_EOK;
}
接收信息
//基本一样, 主要看链表操作
rt_err_t rt_mq_recv(rt_mq_t mq,void *buffer,rt_size_t size,rt_int32_t timeout)
{struct rt_thread *thread;register rt_ubase_t temp;struct rt_mq_message *msg;rt_uint32_t tick_delta;/* parameter check */RT_ASSERT(mq != RT_NULL);RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);RT_ASSERT(buffer != RT_NULL);RT_ASSERT(size != 0);/* initialize delta tick */tick_delta = 0;/* get current thread */thread = rt_thread_self();RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));/* disable interrupt */temp = rt_hw_interrupt_disable();/* for non-blocking call */if (mq->entry == 0 && timeout == 0){//没有可以获取的信息, 并且不等待rt_hw_interrupt_enable(temp);return -RT_ETIMEOUT;}/* message queue is empty 没有信息 */while (mq->entry == 0){RT_DEBUG_IN_THREAD_CONTEXT;/* reset error number in thread */thread->error = RT_EOK;/* no waiting, return timeout 超时 */if (timeout == 0){/* enable interrupt */rt_hw_interrupt_enable(temp);thread->error = -RT_ETIMEOUT;return -RT_ETIMEOUT;}//ipc挂起/* suspend current thread */rt_ipc_list_suspend(&(mq->parent.suspend_thread),thread,mq->parent.parent.flag);/* has waiting time, start thread timer */if (timeout > 0){/* get the start tick of timer */tick_delta = rt_tick_get();RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",thread->name));/* reset the timeout of thread timer and start it */rt_timer_control(&(thread->thread_timer),RT_TIMER_CTRL_SET_TIME,&timeout);rt_timer_start(&(thread->thread_timer));}/* enable interrupt */rt_hw_interrupt_enable(temp);/* re-schedule */rt_schedule();/* recv message */if (thread->error != RT_EOK){/* return error */return thread->error;}/* disable interrupt */temp = rt_hw_interrupt_disable();/* if it's not waiting forever and then re-calculate timeout tick */if (timeout > 0){tick_delta = rt_tick_get() - tick_delta;timeout -= tick_delta;if (timeout < 0)timeout = 0;}}/* get message from queue 获取当前的第一个信息 */msg = (struct rt_mq_message *)mq->msg_queue_head;/* move message queue head 更新一下下一条消息的位置 */mq->msg_queue_head = msg->next;/* reach queue tail, set to NULL 这个里面没有消息了(头尾一样, 只有一条信息) */if (mq->msg_queue_tail == msg)mq->msg_queue_tail = RT_NULL;/* decrease message entry 数量更新 */mq->entry --;/* enable interrupt */rt_hw_interrupt_enable(temp);/* copy message 拷贝一下信息 */rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);/* disable interrupt */temp = rt_hw_interrupt_disable();/* put message to free list 把这个回归空闲队列 */msg->next = (struct rt_mq_message *)mq->msg_queue_free;mq->msg_queue_free = msg;/* resume suspended thread 释放等待线程*/if (!rt_list_isempty(&(mq->suspend_sender_thread))){rt_ipc_list_resume(&(mq->suspend_sender_thread));/* enable interrupt */rt_hw_interrupt_enable(temp);RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));rt_schedule();return RT_EOK;}/* enable interrupt */rt_hw_interrupt_enable(temp);RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));return RT_EOK;
}