(原创)RTOS-RTX分析系列之邮箱

2017/07/24 RTX

简介

RTX的邮箱和我们生活中的邮政局的邮筒非常类似,借鉴了发邮件和取邮件的所有场景。

关键结构

先大致了解下邮箱控制块的结构体:

typedef struct OS_MCB {
  U8     cb_type;                 /* Control Block Type  控制块类型   */
  U8     state;                   /* State flag variable 状态标记,1代表有任务在等待接收邮箱的邮件,2代表有任务在等待邮箱发送邮件 */
  U8     isr_st;                  /* State flag variable for isr functions  中断中使用的状态标记*/
  struct OS_TCB *p_lnk;           /* Chain of tasks waiting for message 等待这个邮箱的任务链表 */
  U16    first;                   /* Index of the message list begin 邮箱的起始位置  */
  U16    last;                    /* Index of the message list end  邮箱的末尾位置 */
  U16    count;                   /* Actual number of stored messages  邮箱已经收到的邮件数量*/
  U16    size;                    /* Maximum number of stored messages 邮箱的总大小 */
  void   *msg[1];                 /* FIFO for Message pointers 1st element 邮件的地址数组,和邮箱的总大小匹配 */
} *P_MCB;

下面是邮箱定义的结构体:

typedef struct os_mailQ_def  {
  uint32_t                queue_sz;    ///< number of elements in the queue
  uint32_t                 item_sz;    ///< size of an item
  void                       *pool;    ///< memory array for mail
} osMailQDef_t;

为了方便,重新把格子内存的结构体贴出来:

typedef struct OS_BM {
  void *free;                     /* Pointer to first free memory block      */
  void *end;                      /* Pointer to memory block end             */
  U32  blk_size;                  /* Memory block size                       */
} *P_BM;

还有一个宏需要解释下,就是osMailQDef宏,方便用户定义邮箱:

#define osMailQDef(name, queue_sz, type) \
uint32_t os_mailQ_q_##name[4+(queue_sz)] = { 0 }; \//这里有个加4,是因为OS_MCB在msg字段前,其它字段占用了4个32位单元,实际大小是超出OS_MCB结构体长度的,也算是用了一个技巧。
uint32_t os_mailQ_m_##name[3+((sizeof(type)+3)/4)*(queue_sz)]; \//参考之前```RTOS-RTX分析系列之格子内存```,为了使用格子内存管理,这里把OS_BM占用的3个32位单元也包括了,所以这里加3。
void *   os_mailQ_p_##name[2] = { (os_mailQ_q_##name), os_mailQ_m_##name }; \//放在一起扔给pool管理。
const osMailQDef_t os_mailQ_def_##name =  \
{ (queue_sz), sizeof(type), (os_mailQ_p_##name) }

关键实现

定义邮箱

举个栗子,邮箱长度是16,每个邮件的大小是用户自定义的,这里是sizeof(MAILQUEUE_OBJ_t),也就是33个字节:

#define MAILQUEUE_OBJECTS      16                               // number of Message Queue Objects

typedef struct {                                                // object data type
  uint8_t Buf[32];
  uint8_t Idx;
} MAILQUEUE_OBJ_t;

osMailQDef(MailQueue, MAILQUEUE_OBJECTS, MAILQUEUE_OBJ_t);     // mail queue object

按照上面的讲解,邮箱在定义的时候,就已经把所有的空间都分配了,即使用不到16个邮件空间,这里也占用了16个邮件空间大小的ram,因此,在资源有限的芯片中,邮箱的大小应该按需调节。

rt_mbx_init

空间分配好后,就要初始化邮箱控制结构体:

void rt_mbx_init (OS_ID mailbox, U16 mbx_size) {
  /* Initialize a mailbox */
  P_MCB p_MCB = mailbox;//把上面申请的os_mailQ_q_##name[4+(queue_sz)]空间强制转换成邮箱结构体。

  p_MCB->cb_type = MCB;//控制块类型为邮箱。
  p_MCB->state   = 0;
  p_MCB->isr_st  = 0;
  p_MCB->p_lnk   = NULL;
  p_MCB->first   = 0;
  p_MCB->last    = 0;
  p_MCB->count   = 0;
  p_MCB->size    = (mbx_size + sizeof(void *) - sizeof(struct OS_MCB)) / //这里实际上是总大小减去OS_MCB结构体除了*msg字段的总大小,因为在传入的时候是包括这些空间的。
						     (U32)sizeof (void *);
}

rt_mbx_send

和实际生活中很类似,发送一个邮件,需要指定一个邮箱,以及设置超时时间。

OS_RESULT rt_mbx_send (OS_ID mailbox, void *p_msg, U16 timeout) {
  /* Send message to a mailbox */
  P_MCB p_MCB = mailbox;
  P_TCB p_TCB;

  if ((p_MCB->p_lnk != NULL) && (p_MCB->state == 1)) {//如果有任务正在等待这个邮箱的邮件,那么直接给这个任务,并不需要放入邮箱中。
    /* A task is waiting for message */
    p_TCB = rt_get_first ((P_XCB)p_MCB);//找出等待优先级最高的任务
#ifdef __CMSIS_RTOS
    rt_ret_val2(p_TCB, 0x10/*osEventMessage*/, (U32)p_msg);//直接把邮件返回给这个任务
#else
    *p_TCB->msg = p_msg;
    rt_ret_val (p_TCB, OS_R_MBX);
#endif
    rt_rmv_dly (p_TCB);//延时任务链表删除这个等待的任务
    rt_dispatch (p_TCB);//主动申请一次调度
  }
  else {//如果没有任务在等待这个邮件,那这封邮件就必须放入邮箱中了,不然没有地方保存就丢弃了。
    /* Store message in mailbox queue */
    if (p_MCB->count == p_MCB->size) {//如果邮箱恰好满了,放不下
      /* No free message entry, wait for one. If message queue is full, */
      /* then no task is waiting for message. The 'p_MCB->p_lnk' list   */
      /* pointer can now be reused for send message waits task list.    */
      if (timeout == 0) {//如果发送邮件这个任务又不想等待,那就只好丢弃这个邮件了,毕竟放不下又不愿意等,那就罢了,直接返回OS_R_TMO。
	return (OS_R_TMO);
      }
      //如果愿意等待一会
      if (p_MCB->p_lnk != NULL) {//当前已经有任务也在等待这个邮箱,那我只好去排队了
	rt_put_prio ((P_XCB)p_MCB, os_tsk.run);//排队
      }
      else {//当前没有任务在等待这个邮箱,我就是排头兵。
	p_MCB->p_lnk = os_tsk.run;
	os_tsk.run->p_lnk  = NULL;
	os_tsk.run->p_rlnk = (P_TCB)p_MCB;
	/* Task is waiting to send a message */      
	p_MCB->state = 2;//标记下这个邮箱的状态为等待发送。
      }
      os_tsk.run->msg = p_msg;//既然邮箱满了,那就任务自己的控制块先保存下这封邮件,免得丢失了。
      rt_block (timeout, WAIT_MBX);//任务只能保存一封邮件,因此赶紧睡眠阻塞自己,免得再发邮件,没地方存,当然如果在指定时间到了继续发邮箱还是满的,这封邮件就得丢失了,保存的将是最新的邮件。
      return (OS_R_TMO);
    }
    /* Yes, there is a free entry in a mailbox. */
    //邮箱没有满,还可以放邮件
    p_MCB->msg[p_MCB->first] = p_msg;//放置邮件
    rt_inc (&p_MCB->count);//标记邮件数量
    if (++p_MCB->first == p_MCB->size) {//循环buff
      p_MCB->first = 0;
    }
  }
  return (OS_R_OK);
}

rt_mbx_wait

等待邮箱的邮件,可以设置超时时间:

OS_RESULT rt_mbx_wait (OS_ID mailbox, void **message, U16 timeout) {
  /* Receive a message; possibly wait for it */
  P_MCB p_MCB = mailbox;
  P_TCB p_TCB;

  /* If a message is available in the fifo buffer */
  /* remove it from the fifo buffer and return. */
  if (p_MCB->count) {//如果当前邮箱里有邮件
    *message = p_MCB->msg[p_MCB->last];//那就获取最先放置的邮件,毕竟按顺序来嘛
    if (++p_MCB->last == p_MCB->size) {//循环buff
      p_MCB->last = 0;
    }
    if ((p_MCB->p_lnk != NULL) && (p_MCB->state == 2)) {//如果恰好邮箱是满的,而且又个任务很焦急的等待发送一封邮件
      /* A task is waiting to send message */
      p_TCB = rt_get_first ((P_XCB)p_MCB);//那就取出优先级最高的任务
#ifdef __CMSIS_RTOS
      rt_ret_val(p_TCB, 0/*osOK*/);//可以让这个焦急的任务安心的回去工作了
#else
      rt_ret_val(p_TCB, OS_R_OK);
#endif
      p_MCB->msg[p_MCB->first] = p_TCB->msg;//把焦急的任务需要发送的邮件拿出来放进邮箱
      if (++p_MCB->first == p_MCB->size) {//循环buff
	p_MCB->first = 0;
      }
      rt_rmv_dly (p_TCB);
      rt_dispatch (p_TCB);//因为邮件一失一得,这里并改变把邮件总数。
    }
    else {
      rt_dec (&p_MCB->count);//取出了一封邮件,总数得减1。
    }
    return (OS_R_OK);//返回正常
  }
  /* No message available: wait for one */
  //当前邮箱是空的
  if (timeout == 0) {//不想等待,那就直接返回
    return (OS_R_TMO);
  }
  if (p_MCB->p_lnk != NULL) {//想等待就排队
    rt_put_prio ((P_XCB)p_MCB, os_tsk.run);
  }
  else {//想等待,刚好只有我排队
    p_MCB->p_lnk = os_tsk.run;
    os_tsk.run->p_lnk = NULL;
    os_tsk.run->p_rlnk = (P_TCB)p_MCB;
    /* Task is waiting to receive a message */      
    p_MCB->state = 1;//改变邮箱的状态,表示此时此刻有任务一直在等待邮箱的邮件。
  }
  rt_block(timeout, WAIT_MBX);//等待邮箱得到邮件
#ifndef __CMSIS_RTOS
  os_tsk.run->msg = message;
#endif
  return (OS_R_TMO);
}

rt_mbx_check

这个接口是为了检测邮箱是否有邮件:

OS_RESULT rt_mbx_check (OS_ID mailbox) {
  /* Check for free space in a mailbox. Returns the number of messages     */
  /* that can be stored to a mailbox. It returns 0 when mailbox is full.   */
  P_MCB p_MCB = mailbox;

  return (p_MCB->size - p_MCB->count);
}

isr_mbx_send

这个接口无非即使延迟执行rt_mbx_send:

void isr_mbx_send (OS_ID mailbox, void *p_msg) {
  /* Same function as "os_mbx_send", but to be called by ISRs. */
  P_MCB p_MCB = mailbox;

  rt_psq_enq (p_MCB, (U32)p_msg);
  rt_psh_req ();
}

isr_mbx_receive

同上,不作解释,延迟执行:

OS_RESULT isr_mbx_receive (OS_ID mailbox, void **message) {
  /* Receive a message in the interrupt function. The interrupt function   */
  /* should not wait for a message since this would block the rtx os.      */
  P_MCB p_MCB = mailbox;

  if (p_MCB->count) {
    /* A message is available in the fifo buffer. */
    *message = p_MCB->msg[p_MCB->last];
    if (p_MCB->state == 2) {
      /* A task is locked waiting to send message */
      rt_psq_enq (p_MCB, 0);
      rt_psh_req ();
    }
    rt_dec (&p_MCB->count);
    if (++p_MCB->last == p_MCB->size) {
      p_MCB->last = 0;
    }
    return (OS_R_MBX);
  }
  return (OS_R_OK);
}

知识共享许可协议
本作品采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可。

站内搜索

    撩我备注-博客

    joinee

    目录结构