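Semaphores
In NuttX, semaphores are the basis of synchronization and mutual exclusion. NuttX supports POSIX semaphores.
Semaphores are the preferred mechanism for gaining exclusive access to a resource. The sched_lock() and sched_unlock() interfaces can achieve the same thing, but they have side effects on the rest of the system: sched_lock() also keeps higher-priority tasks from running, even tasks that do not depend on the semaphore-managed resource, and this hurts the system's response time.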
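To make the locking usage concrete, here is a minimal sketch that uses a POSIX semaphore with an initial value of 1 as a lock around a shared counter. The names run_lock_demo(), lock_take(), worker(), g_lock and g_shared_total, as well as the thread and loop counts, are made up for this illustration; only the sem_*() and pthread_*() calls are standard POSIX.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

#define NWORKERS 4
#define NLOOPS   10000

static sem_t g_lock;          /* Semaphore used as a lock (hypothetical name) */
static long  g_shared_total;  /* The resource that g_lock protects */

/* Take the lock, retrying if the wait is interrupted by a signal */

static void lock_take(sem_t *sem)
{
  while (sem_wait(sem) != 0 && errno == EINTR)
    {
    }
}

/* Each worker increments the shared total inside the critical section */

static void *worker(void *arg)
{
  (void)arg;

  for (int i = 0; i < NLOOPS; i++)
    {
      lock_take(&g_lock);    /* Enter the critical section */
      g_shared_total++;
      sem_post(&g_lock);     /* Leave the critical section */
    }

  return NULL;
}

/* Run the workers and return the final value of the shared total */

long run_lock_demo(void)
{
  pthread_t tid[NWORKERS];
  int started = 0;
  int ret;

  g_shared_total = 0;

  ret = sem_init(&g_lock, 0, 1);   /* Initial value 1: unlocked */
  assert(ret == 0);
  (void)ret;

  for (int i = 0; i < NWORKERS; i++)
    {
      if (pthread_create(&tid[started], NULL, worker, NULL) == 0)
        {
          started++;
        }
    }

  for (int i = 0; i < started; i++)
    {
      pthread_join(tid[i], NULL);
    }

  sem_destroy(&g_lock);
  return g_shared_total;
}
```

Calling run_lock_demo() from main() should return NWORKERS * NLOOPS when all workers start, because the semaphore serializes the increments. Unlike sched_lock(), only threads that actually contend for g_lock are held up.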
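Priority inversion
Using semaphores correctly avoids the problems of sched_lock(), but consider the following scenario:
The low-priority Task C takes a semaphore and gains exclusive use of the protected resource;
Task C is suspended so that the high-priority Task A can run;
Task A tries to take the semaphore held by Task C and blocks until Task C gives it up;
Task C is allowed to run again, but is then preempted by some medium-priority Task B.
In this situation, the high-priority Task A cannot run until Task B (and possibly other medium-priority tasks) has finished and Task C has released the semaphore. Task A behaves as if its priority were lower than Task C's. This phenomenon is called priority inversion.
Some operating systems avoid priority inversion by raising the priority of the low-priority Task C (the term for this behavior is priority inheritance). NuttX supports this behavior when CONFIG_PRIORITY_INHERITANCE is selected. Otherwise, it is up to the designer to provide an implementation in which priority inversion cannot occur, for example:
Give the same priority to every task that needs the resource managed by a given semaphore;
Raise the priority of the low-priority task before it acquires the semaphore;
Use sched_lock() in the low-priority task.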
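The effect of priority inversion, and of boosting the semaphore holder, can be seen in a toy tick-by-tick simulation. This is only a sketch under made-up assumptions: the priorities (30/20/10), the amounts of work, and the names simulate() and struct sim_task are hypothetical, and the "scheduler" simply runs the highest-priority ready task each tick. It does not use any NuttX API.

```c
#include <assert.h>
#include <stdbool.h>

enum { TASK_A, TASK_B, TASK_C, NTASKS };

struct sim_task
{
  int  base_prio;   /* Assigned priority, larger means more urgent */
  int  prio;        /* Effective priority, may be boosted */
  int  work_left;   /* Ticks of CPU time the task still needs */
  bool blocked;     /* True while waiting for the semaphore */
};

/* Returns the tick in which Task A completes, or -1 if it never does.
 * 'inherit' selects whether the holder (Task C) inherits A's priority.
 */

int simulate(bool inherit)
{
  struct sim_task t[NTASKS] =
  {
    [TASK_A] = { 30, 30, 1, true  },  /* High: blocked on C's semaphore */
    [TASK_B] = { 20, 20, 3, false },  /* Medium: does not use the semaphore */
    [TASK_C] = { 10, 10, 2, false },  /* Low: holds it for 2 more ticks */
  };

  if (inherit)
    {
      /* The holder runs at the priority of its highest-priority waiter */

      t[TASK_C].prio = t[TASK_A].prio;
    }

  for (int tick = 1; tick <= 100; tick++)
    {
      int run = -1;

      /* Pick the ready task with the highest effective priority */

      for (int i = 0; i < NTASKS; i++)
        {
          if (!t[i].blocked && t[i].work_left > 0 &&
              (run < 0 || t[i].prio > t[run].prio))
            {
              run = i;
            }
        }

      if (run < 0)
        {
          return -1;
        }

      t[run].work_left--;

      if (run == TASK_C && t[TASK_C].work_left == 0)
        {
          /* C releases the semaphore: drop the boost and unblock A */

          t[TASK_C].prio    = t[TASK_C].base_prio;
          t[TASK_A].blocked = false;
        }

      if (run == TASK_A && t[TASK_A].work_left == 0)
        {
          return tick;
        }
    }

  return -1;
}
```

With these numbers, simulate(false) lets Task B use up its three ticks before Task C can release the semaphore, so Task A finishes in tick 6. With simulate(true), Task C runs first at the boosted priority and Task A finishes in tick 3.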
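Priority inheritance
As mentioned above, NuttX supports priority inheritance when CONFIG_PRIORITY_INHERITANCE is selected, but the mechanism is fairly involved.
First, priority inheritance in NuttX is implemented on top of POSIX semaphores. These semaphores are the most primitive waiting mechanism in NuttX, and most other ways of waiting are built on them. So once priority inheritance is implemented for POSIX semaphores, most NuttX waiting mechanisms get it as well.
CONFIG_SEM_PREALLOCHOLDERS
The complexity arises because a semaphore can have many holders of its counts. To implement priority inheritance across all holders, internal data structures must be allocated to track the holders associated with each semaphore. CONFIG_SEM_PREALLOCHOLDERS defines the maximum number of different threads that can take counts on semaphores with priority inheritance support. The setting also determines the size of the pool of preallocated structures. It can be set to 0 if priority inheritance is disabled, if semaphores are only used as mutexes (a single holder), or if no more than two threads take part in using a counting semaphore.
CONFIG_SEM_NNESTPRIO
In addition, several threads of different priorities may be waiting for a count from the same semaphore. The low-priority thread holding the semaphore has to be boosted, but every boosted priority value must also be tracked so that the original priority can be restored at the end, which complicates matters further. CONFIG_SEM_NNESTPRIO defines the size of an array, and there is one such array per active thread. Set this value to the maximum number of higher-priority threads (minus 1) that can be waiting for another thread to release a count on a semaphore.
Increased exposure to badly behaved threads
The data structures used for priority inheritance are tightly coupled to the semaphore implementation, and this has consequences. For example, what happens if a thread exits while still holding counts on a semaphore, or exits without calling sem_destroy(), or if a thread whose priority has been boosted changes its own priority? The NuttX implementation of priority inheritance tries to handle all of these corner cases, but it is quite possible that some are missed. In the worst case, memory is stranded inside the priority inheritance logic.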
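Locking semaphores vs. signaling semaphores
Semaphores (and mutexes) are used for more than one purpose.
Locking semaphores
One typical use is exclusive access to a resource, that is, protecting a critical section. A thread that needs exclusive access takes the semaphore before touching the resource, and when it is done the same thread releases its count on the semaphore. Priority inheritance applies only to this usage.
Signaling semaphores
The other use is signaling: thread A waits on the semaphore for an event to occur. When the event occurs, another thread B posts the semaphore to wake the waiting thread A. In the exclusive-access usage, the same thread takes and releases the count. In the signaling usage, one thread waits on the semaphore and a different thread posts it, which is essentially a thread synchronization mechanism. Priority inheritance should not be used in this case, otherwise some odd behavior can result.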
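The signaling usage looks quite different from the locking usage in code. The following sketch, using only standard POSIX calls, initializes the semaphore to 0 so that the waiter blocks until another thread posts it. The names run_signal_demo(), signaler(), g_event and g_payload are hypothetical, and 42 is just a placeholder value.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

static sem_t g_event;     /* Signaling semaphore, starts at 0 */
static int   g_payload;   /* Data published by the signaling thread */

/* Thread B: publish the data, then signal the waiter */

static void *signaler(void *arg)
{
  (void)arg;

  g_payload = 42;
  sem_post(&g_event);
  return NULL;
}

/* Thread A: wait for the event and return the data that was published.
 * Returns -1 if the semaphore or the thread could not be created.
 */

int run_signal_demo(void)
{
  pthread_t tid;
  int seen;

  if (sem_init(&g_event, 0, 0) != 0)   /* 0: nothing to take yet */
    {
      return -1;
    }

  /* On NuttX with priority inheritance enabled, this is the point where
   * the semaphore's protocol would be set to SEM_PRIO_NONE, since the
   * poster never "holds" this semaphore.  That call is left out here so
   * that the sketch stays portable POSIX.
   */

  if (pthread_create(&tid, NULL, signaler, NULL) != 0)
    {
      sem_destroy(&g_event);
      return -1;
    }

  /* Block until the event is signaled, retrying on EINTR */

  while (sem_wait(&g_event) != 0 && errno == EINTR)
    {
    }

  seen = g_payload;

  pthread_join(tid, NULL);
  sem_destroy(&g_event);
  return seen;
}
```

Note that the thread calling sem_post() never called sem_wait() on g_event. That is exactly the pattern for which holder tracking, and therefore priority inheritance, makes no sense.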
/* This structure contains information about the holder of a semaphore */

#ifdef CONFIG_PRIORITY_INHERITANCE
struct tcb_s; /* Forward reference */
struct semholder_s
{
#if CONFIG_SEM_PREALLOCHOLDERS > 0
  struct semholder_s *flink;     /* Implements singly linked list */
#endif
  FAR struct tcb_s *htcb;        /* Holder TCB */
  int16_t counts;                /* Number of counts owned by this holder */
};
#endif /* CONFIG_PRIORITY_INHERITANCE */

/* This is the generic semaphore structure. */

struct sem_s
{
  volatile int16_t semcount;     /* >0 -> Num counts available */
                                 /* <0 -> Num tasks waiting for semaphore */

  /* If priority inheritance is enabled, then we have to keep track of which
   * tasks hold references to the semaphore.
   */

#ifdef CONFIG_PRIORITY_INHERITANCE
  uint8_t flags;                 /* See PRIOINHERIT_FLAGS_* definitions */
# if CONFIG_SEM_PREALLOCHOLDERS > 0
  FAR struct semholder_s *hhead; /* List of holders of semaphore counts */
# else
  struct semholder_s holder;     /* Single holder */
# endif
#endif
};
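The main data structures fall into two parts:
struct sem_s: describes a generic semaphore. It contains the semaphore count and, when priority inheritance is enabled, the struct semholder_s member;
struct semholder_s: describes one holder of the semaphore. It refers to a TCB and records the number of counts that the task holds on the semaphore. Because several tasks may hold counts on the same semaphore, this structure forms a singly linked list when CONFIG_SEM_PREALLOCHOLDERS is greater than 0.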
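int sem_init(sem_t *sem, int pshared, unsigned int value)
Initializes the unnamed semaphore sem. pshared is not used, and value is the initial value of the semaphore. After initialization, the semaphore can be used with sem_wait(), sem_post(), sem_trywait() and the other interfaces.
int sem_destroy(sem_t *sem)
Destroys the unnamed semaphore sem. Only a semaphore created with sem_init() may be destroyed with sem_destroy(). Calling sem_destroy() on a named semaphore is undefined, and so is using a semaphore after it has been destroyed.
sem_t *sem_open(const char *name, int oflag, ...)
Establishes a connection between a task and a named semaphore. After calling sem_open() with the semaphore name, the task can refer to the semaphore through the address returned by the call.
int sem_close(sem_t *sem)
A task calls this interface when it has finished using the named semaphore. sem_close() releases any system resources that were allocated for the task's use of the named semaphore. If the semaphore has not been removed with sem_unlink(), sem_close() has no effect on the semaphore itself. Once the semaphore has been fully unlinked, however, it vanishes when the last task closes it. Care must be taken not to delete a semaphore that another task has already locked.
int sem_unlink(const char *name)
Removes the semaphore named by the name argument. If one or more tasks are using the semaphore when sem_unlink() is called, destruction of the semaphore is deferred until every reference has been closed with sem_close().
int sem_wait(sem_t *sem)
Attempts to lock the semaphore sem. If the semaphore is already locked, the calling task does not return until it acquires the lock or the call is interrupted by a signal.
int sem_timedwait(sem_t *sem, const struct timespec *abstime)
Like sem_wait(), except that if no other thread releases the semaphore with sem_post(), the wait is terminated when the specified absolute time passes.
int sem_trywait(sem_t *sem)
Locks the semaphore only if it is currently unlocked. In either case the call returns without blocking.
int sem_post(sem_t *sem)
A task calls sem_post() when it has finished with a semaphore; the function unlocks the semaphore. If the resulting semaphore value is positive, no tasks were blocked waiting for the semaphore and the value is simply incremented. If the resulting value is zero, one of the tasks blocked on the semaphore is allowed to return successfully from its sem_wait() call. Note: sem_post() may be called from an interrupt handler.
int sem_getvalue(sem_t *sem, int *sval)
Retrieves the value of the semaphore. When the semaphore is locked, the value is either 0 or a negative number whose absolute value is the number of tasks waiting for the semaphore.
int sem_getprotocol(FAR sem_t *sem, FAR int *protocol)
Returns the semaphore's protocol attribute. Possible values: SEM_PRIO_NONE, SEM_PRIO_INHERIT, SEM_PRIO_PROTECT.
int sem_setprotocol(FAR sem_t *sem, int protocol)
Sets the semaphore's protocol attribute. Possible values: SEM_PRIO_NONE, SEM_PRIO_INHERIT, SEM_PRIO_PROTECT. SEM_PRIO_INHERIT is supported only when CONFIG_PRIORITY_INHERITANCE is selected, and SEM_PRIO_PROTECT is not currently supported.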
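As a quick tour of the unnamed-semaphore calls described above, the sketch below walks one semaphore through sem_init(), sem_getvalue(), sem_trywait(), sem_post() and sem_destroy() from a single thread. The function name run_api_tour() and its step-number return convention are invented for this example; the behavior checked at each step is what standard POSIX specifies.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Returns 0 if every step behaved as expected, otherwise the number of
 * the first step that did not.
 */

int run_api_tour(void)
{
  sem_t sem;
  int value = -1;

  /* Step 1: initialize with two counts available */

  if (sem_init(&sem, 0, 2) != 0)
    {
      return 1;
    }

  /* Step 2: the value reflects the initial count */

  if (sem_getvalue(&sem, &value) != 0 || value != 2)
    {
      return 2;
    }

  /* Steps 3 and 4: take both counts without blocking */

  if (sem_trywait(&sem) != 0)
    {
      return 3;
    }

  if (sem_trywait(&sem) != 0)
    {
      return 4;
    }

  /* Step 5: a third attempt must fail at once with EAGAIN, not block */

  errno = 0;
  if (sem_trywait(&sem) != -1 || errno != EAGAIN)
    {
      return 5;
    }

  /* Steps 6 and 7: give one count back and observe it */

  if (sem_post(&sem) != 0)
    {
      return 6;
    }

  if (sem_getvalue(&sem, &value) != 0 || value != 1)
    {
      return 7;
    }

  /* Step 8: destroy the semaphore; it must not be used afterwards */

  if (sem_destroy(&sem) != 0)
    {
      return 8;
    }

  return 0;
}
```

Because nothing ever blocks here, the value reported by sem_getvalue() never goes negative; negative values only appear while tasks are waiting.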
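A diagram helps here:
[Figure: how semaphores work]
The figure above shows the overall semaphore framework. The related structures are:
struct sem_s: maintains the semaphore count. The count is decremented each time a task takes or waits for the semaphore and incremented each time the semaphore is released, so a negative value gives the number of waiting tasks. It also maintains the holder list, which links together all the tasks that hold counts on this semaphore.
g_freeholder: a global list in which all holder structures are statically preallocated. When a task becomes a holder of a semaphore, a holder structure is taken from this list; when the task no longer holds any count, the holder structure is returned to the list.
g_waitingforsemaphore: a global task queue. When a task calls sem_wait() and cannot get the semaphore, it is added to g_waitingforsemaphore and gives up the CPU. When a task calls sem_post() to release the semaphore, the g_waitingforsemaphore queue is searched for a task blocked on that semaphore, and if there is one it is woken up.
struct semholder_s: a semaphore holder. It mainly contains a pointer to a struct tcb_s, identifying the task that holds the semaphore, and a counts field that records how many counts that task holds on the semaphore. Separately, struct tcb_s has a waitsem field that points to the semaphore the task is waiting for.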
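The interplay between a preallocated pool of holder entries and a semaphore's holder list can be modeled in a few lines. This is a toy model, not NuttX code: struct toy_holder, struct toy_sem, the toy_*() functions and NHOLDERS are all hypothetical, a plain integer stands in for the TCB pointer, and releasing the last count frees the entry immediately, which simplifies what the real implementation does.

```c
#include <assert.h>
#include <stddef.h>

#define NHOLDERS 4   /* Stands in for CONFIG_SEM_PREALLOCHOLDERS */

struct toy_holder
{
  struct toy_holder *flink;   /* Next entry in a singly linked list */
  int task_id;                /* Stands in for the holder's TCB pointer */
  int counts;                 /* Counts this task holds on the semaphore */
};

struct toy_sem
{
  struct toy_holder *hhead;   /* Holders of this semaphore */
};

static struct toy_holder  g_pool[NHOLDERS];  /* Preallocated entries */
static struct toy_holder *g_free;            /* Free list built from g_pool */

/* Put every preallocated entry on the free list */

void toy_pool_init(void)
{
  g_free = NULL;
  for (int i = 0; i < NHOLDERS; i++)
    {
      g_pool[i].flink = g_free;
      g_free = &g_pool[i];
    }
}

/* Number of entries currently on the free list */

int toy_free_count(void)
{
  int n = 0;
  for (struct toy_holder *h = g_free; h != NULL; h = h->flink)
    {
      n++;
    }

  return n;
}

/* Record that task_id took one count.  Returns 0, or -1 if the task is a
 * new holder and the pool is exhausted.
 */

int toy_add_holder(struct toy_sem *sem, int task_id)
{
  struct toy_holder *h = sem->hhead;

  while (h != NULL && h->task_id != task_id)
    {
      h = h->flink;
    }

  if (h == NULL)
    {
      if (g_free == NULL)
        {
          return -1;
        }

      h          = g_free;        /* Allocate from the pool */
      g_free     = h->flink;
      h->task_id = task_id;
      h->counts  = 0;
      h->flink   = sem->hhead;    /* Link into the holder list */
      sem->hhead = h;
    }

  h->counts++;
  return 0;
}

/* Record that task_id gave one count back.  If the task is not a holder
 * (the signaling case), nothing happens.
 */

void toy_release_holder(struct toy_sem *sem, int task_id)
{
  struct toy_holder **pp = &sem->hhead;

  while (*pp != NULL && (*pp)->task_id != task_id)
    {
      pp = &(*pp)->flink;
    }

  struct toy_holder *h = *pp;
  if (h != NULL && --h->counts == 0)
    {
      *pp      = h->flink;        /* Unlink from the holder list */
      h->flink = g_free;          /* Return the entry to the pool */
      g_free   = h;
    }
}
```

A task that takes the same semaphore twice uses a single entry with counts equal to 2, so the pool only has to be as large as the number of distinct holders.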
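Now let's look at a few key functions:
sem_wait()
sem_wait() mainly does the following:
Checks whether it is running in interrupt context. sem_wait() may trigger a context switch and put the caller to sleep, so it must not be called from an interrupt handler;
If the semaphore is available, decrements the count and adds the calling task to the semaphore's holder list;
If the semaphore is not available, decrements the count and sets waitsem in the calling task's TCB to this semaphore. If priority inheritance is enabled, it boosts the priority of any holder of the semaphore whose priority is lower than the calling task's. Finally, the calling task is added to the semaphore wait queue g_waitingforsemaphore.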
/****************************************************************************
 * Name: sem_wait
 *
 * Description:
 *   This function attempts to lock the semaphore referenced by 'sem'.  If
 *   the semaphore value is (<=) zero, then the calling task will not return
 *   until it successfully acquires the lock.
 *
 * Parameters:
 *   sem - Semaphore descriptor.
 *
 * Return Value:
 *   0 (OK), or -1 (ERROR) if unsuccessful
 *   If this function returns -1 (ERROR), then the cause of the failure will
 *   be reported in 'errno' as:
 *   - EINVAL:  Invalid attempt to get the semaphore
 *   - EINTR:   The wait was interrupted by the receipt of a signal.
 *
 * Assumptions:
 *
 ****************************************************************************/

int sem_wait(FAR sem_t *sem)
{
  FAR struct tcb_s *rtcb = this_task();
  irqstate_t flags;
  int ret = ERROR;

  /* This API should not be called from interrupt handlers */

  DEBUGASSERT(sem != NULL && up_interrupt_context() == false);

  /* The following operations must be performed with interrupts
   * disabled because sem_post() may be called from an interrupt
   * handler.
   */

  flags = enter_critical_section();

  /* sem_wait() is a cancellation point */

  if (enter_cancellation_point())
    {
#ifdef CONFIG_CANCELLATION_POINTS
      /* If there is a pending cancellation, then do not perform
       * the wait.  Exit now with ECANCELED.
       */

      set_errno(ECANCELED);
      leave_cancellation_point();
      leave_critical_section(flags);
      return ERROR;
#endif
    }

  /* Make sure we were supplied with a valid semaphore. */

  if (sem != NULL)
    {
      /* Check if the lock is available */

      if (sem->semcount > 0)
        {
          /* It is, let the task take the semaphore. */

          sem->semcount--;
          sem_addholder(sem);
          rtcb->waitsem = NULL;
          ret = OK;
        }

      /* The semaphore is NOT available, We will have to block the
       * current thread of execution.
       */

      else
        {
          /* First, verify that the task is not already waiting on a
           * semaphore
           */

          ASSERT(rtcb->waitsem == NULL);

          /* Handle the POSIX semaphore (but don't set the owner yet) */

          sem->semcount--;

          /* Save the waited on semaphore in the TCB */

          rtcb->waitsem = sem;

          /* If priority inheritance is enabled, then check the priority of
           * the holder of the semaphore.
           */

#ifdef CONFIG_PRIORITY_INHERITANCE
          /* Disable context switching.  The following operations must be
           * atomic with regard to the scheduler.
           */

          sched_lock();

          /* Boost the priority of any threads holding a count on the
           * semaphore.
           */

          sem_boostpriority(sem);
#endif
          /* Add the TCB to the prioritized semaphore wait queue */

          set_errno(0);
          up_block_task(rtcb, TSTATE_WAIT_SEM);

          /* When we resume at this point, either (1) the semaphore has been
           * assigned to this thread of execution, or (2) the semaphore wait
           * has been interrupted by a signal or a timeout.  We can detect
           * these latter cases by examining the errno value.
           *
           * In the event that the semaphore wait was interrupted by a
           * signal or a timeout, certain semaphore clean-up operations have
           * already been performed (see sem_waitirq.c).  Specifically:
           *
           * - sem_canceled() was called to restore the priority of all
           *   threads that hold a reference to the semaphore,
           * - The semaphore count was decremented, and
           * - tcb->waitsem was nullified.
           *
           * It is necessary to do these things in sem_waitirq.c because a
           * long time may elapse between the time that the signal was
           * issued and this thread is awakened and this leaves a door open
           * to several race conditions.
           */

          if (get_errno() != EINTR && get_errno() != ETIMEDOUT)
            {
              /* Not awakened by a signal or a timeout...
               *
               * NOTE that in this case sem_addholder() was called by logic
               * in sem_wait() before this thread was restarted.
               */

              ret = OK;
            }

#ifdef CONFIG_PRIORITY_INHERITANCE
          sched_unlock();
#endif
        }
    }
  else
    {
      set_errno(EINVAL);
    }

  leave_cancellation_point();
  leave_critical_section(flags);
  return ret;
}
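sem_post()
sem_post() mainly does the following:
Calls sem_releaseholder() to decrement by 1 the number of counts that the calling task holds on the semaphore;
Increments the semaphore count;
If the semaphore count is still less than or equal to 0, some task must be sleeping while it waits for this semaphore. Those tasks are in the g_waitingforsemaphore queue. The queue is ordered by priority, so it is scanned for the first (highest-priority) task that is waiting for this semaphore; that task is added to the semaphore's holder list and made ready to run;
Calls sem_restorebaseprio() to restore the previous priority (if any priority was adjusted). This function also checks the number of counts the task holds on the semaphore and frees the holder entry when it reaches 0.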
/****************************************************************************
 * Name: sem_post
 *
 * Description:
 *   When a task has finished with a semaphore, it will call sem_post().
 *   This function unlocks the semaphore referenced by sem by performing the
 *   semaphore unlock operation on that semaphore.
 *
 *   If the semaphore value resulting from this operation is positive, then
 *   no tasks were blocked waiting for the semaphore to become unlocked; the
 *   semaphore is simply incremented.
 *
 *   If the value of the semaphore resulting from this operation is zero,
 *   then one of the tasks blocked waiting for the semaphore shall be
 *   allowed to return successfully from its call to sem_wait().
 *
 * Parameters:
 *   sem - Semaphore descriptor
 *
 * Return Value:
 *   0 (OK) or -1 (ERROR) if unsuccessful
 *
 * Assumptions:
 *   This function may be called from an interrupt handler.
 *
 ****************************************************************************/

int sem_post(FAR sem_t *sem)
{
  FAR struct tcb_s *stcb = NULL;
  irqstate_t flags;
  int ret = ERROR;

  /* Make sure we were supplied with a valid semaphore. */

  if (sem)
    {
      /* The following operations must be performed with interrupts
       * disabled because sem_post() may be called from an interrupt
       * handler.
       */

      flags = enter_critical_section();

      /* Perform the semaphore unlock operation, releasing this task as a
       * holder then also incrementing the count on the semaphore.
       *
       * NOTE:  When semaphores are used for signaling purposes, the holder
       * of the semaphore may not be this thread!  In this case,
       * sem_releaseholder() will do nothing.
       *
       * In the case of a mutex this could be simply resolved since there is
       * only one holder but for the case of counting semaphores, there may
       * be many holders and if the holder is not this thread, then it is
       * not possible to know which thread/holder should be released.
       *
       * For this reason, it is recommended that priority inheritance be
       * disabled via sem_setprotocol(SEM_PRIO_NONE) when the semaphore is
       * initialized if the semaphore is to be used for signaling purposes.
       */

      ASSERT(sem->semcount < SEM_VALUE_MAX);
      sem_releaseholder(sem);
      sem->semcount++;

#ifdef CONFIG_PRIORITY_INHERITANCE
      /* Don't let any unblocked tasks run until we complete any priority
       * restoration steps.  Interrupts are disabled, but we do not want
       * the head of the ready-to-run list to be modified yet.
       *
       * NOTE: If this sched_lock is called from an interrupt handler, it
       * will do nothing.
       */

      sched_lock();
#endif
      /* If the result of the semaphore unlock is non-positive, then
       * there must be some task waiting for the semaphore.
       */

      if (sem->semcount <= 0)
        {
          /* Check if there are any tasks in the waiting for semaphore
           * task list that are waiting for this semaphore.  This is a
           * prioritized list so the first one we encounter is the one
           * that we want.
           */

          for (stcb = (FAR struct tcb_s *)g_waitingforsemaphore.head;
               (stcb && stcb->waitsem != sem);
               stcb = stcb->flink);

          if (stcb != NULL)
            {
              /* The task will be the new holder of the semaphore when
               * it is awakened.
               */

              sem_addholder_tcb(stcb, sem);

              /* It is, let the task take the semaphore */

              stcb->waitsem = NULL;

              /* Restart the waiting task. */

              up_unblock_task(stcb);
            }
        }

      /* Check if we need to drop the priority of any threads holding
       * this semaphore.  The priority could have been boosted while they
       * held the semaphore.
       */

#ifdef CONFIG_PRIORITY_INHERITANCE
      sem_restorebaseprio(stcb, sem);
      sched_unlock();
#endif
      ret = OK;

      /* Interrupts may now be enabled. */

      leave_critical_section(flags);
    }
  else
    {
      set_errno(EINVAL);
    }

  return ret;
}
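The way the two listings treat semcount can be condensed into a tiny model. This sketch keeps only the counter arithmetic: toy_wait() and toy_post() are hypothetical helpers that report whether the real sem_wait() would have blocked the caller and whether the real sem_post() would have gone looking for a waiter. No task is actually blocked or woken.

```c
#include <assert.h>

/* Model of sem_wait(): returns 1 if the caller would block, else 0.
 * The count is decremented in both cases, so a negative count is minus
 * the number of waiting tasks.
 */

int toy_wait(int *semcount)
{
  int would_block = (*semcount <= 0);

  (*semcount)--;
  return would_block;
}

/* Model of sem_post(): returns 1 if a waiting task would be woken.
 * The count is incremented first; a result that is still <= 0 means
 * that at least one task was waiting.
 */

int toy_post(int *semcount)
{
  (*semcount)++;
  return (*semcount <= 0);
}
```

Starting from a count of 1, one toy_wait() succeeds and leaves 0, and two more leave the count at -2, meaning two waiters. The next two toy_post() calls each hand the semaphore to a waiter, and only the third one makes the count positive again.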
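sem_timedwait()
sem_timedwait() works much like sem_wait(). The difference between them is similar to the difference between mq_receive() and mq_timedreceive() for message queues: the code creates a watchdog timer, and if the timer expires before the semaphore has been obtained, the sem_timeout() callback is invoked. That callback cancels the task's wait and makes the task runnable again.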
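From the caller's side, the main thing to get right is that abstime is an absolute CLOCK_REALTIME time, not a duration. The sketch below builds a deadline a given number of milliseconds in the future and reports what sem_timedwait() returned. The helpers deadline_after_ms() and timed_wait_errno() are hypothetical names for this example; the calls they make are standard POSIX.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <time.h>

/* Build an absolute CLOCK_REALTIME deadline 'ms' milliseconds from now */

static struct timespec deadline_after_ms(long ms)
{
  struct timespec ts;

  clock_gettime(CLOCK_REALTIME, &ts);
  ts.tv_sec  += ms / 1000;
  ts.tv_nsec += (ms % 1000) * 1000000L;

  /* Keep tv_nsec below one second, otherwise the call fails with EINVAL */

  if (ts.tv_nsec >= 1000000000L)
    {
      ts.tv_sec++;
      ts.tv_nsec -= 1000000000L;
    }

  return ts;
}

/* Create a semaphore with 'initial' counts that nobody ever posts and
 * wait on it for at most 'ms' milliseconds.  Returns 0 if the semaphore
 * was taken, otherwise the errno value from the failing call.
 */

int timed_wait_errno(unsigned int initial, long ms)
{
  sem_t sem;
  struct timespec abstime;
  int err = 0;

  if (sem_init(&sem, 0, initial) != 0)
    {
      return errno;
    }

  abstime = deadline_after_ms(ms);

  while (sem_timedwait(&sem, &abstime) != 0)
    {
      if (errno != EINTR)   /* Retry only if interrupted by a signal */
        {
          err = errno;
          break;
        }
    }

  sem_destroy(&sem);
  return err;
}
```

With an initial value of 0, the wait can only end by timing out, so the function is expected to return ETIMEDOUT. With an initial value of 1, the semaphore is taken immediately and the deadline never matters.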
/****************************************************************************
 * Name: sem_timedwait
 *
 * Description:
 *   This function will lock the semaphore referenced by sem as in the
 *   sem_wait() function.  However, if the semaphore cannot be locked without
 *   waiting for another process or thread to unlock the semaphore by
 *   performing a sem_post() function, this wait will be terminated when the
 *   specified timeout expires.
 *
 *   The timeout will expire when the absolute time specified by abstime
 *   passes, as measured by the clock on which timeouts are based (that is,
 *   when the value of that clock equals or exceeds abstime), or if the
 *   absolute time specified by abstime has already been passed at the
 *   time of the call.
 *
 * Parameters:
 *   sem     - Semaphore object
 *   abstime - The absolute time to wait until a timeout is declared.
 *
 * Return Value:
 *   Zero (OK) is returned on success.  On failure, -1 (ERROR) is returned
 *   and the errno is set appropriately:
 *
 *   EINVAL    The sem argument does not refer to a valid semaphore.  Or the
 *             thread would have blocked, and the abstime parameter specified
 *             a nanoseconds field value less than zero or greater than or
 *             equal to 1000 million.
 *   ETIMEDOUT The semaphore could not be locked before the specified timeout
 *             expired.
 *   EDEADLK   A deadlock condition was detected.
 *   EINTR     A signal interrupted this function.
 *
 ****************************************************************************/

int sem_timedwait(FAR sem_t *sem, FAR const struct timespec *abstime)
{
  FAR struct tcb_s *rtcb = this_task();
  irqstate_t flags;
  int ticks;
  int errcode;
  int ret = ERROR;

  DEBUGASSERT(up_interrupt_context() == false && rtcb->waitdog == NULL);

  /* sem_timedwait() is a cancellation point */

  (void)enter_cancellation_point();

  /* Verify the input parameters and, in case of an error, set
   * errno appropriately.
   */

#ifdef CONFIG_DEBUG_FEATURES
  if (!abstime || !sem)
    {
      errcode = EINVAL;
      goto errout;
    }
#endif

  /* Create a watchdog.  We will not actually need this watchdog
   * unless the semaphore is unavailable, but we will reserve it up
   * front before we enter the following critical section.
   */

  rtcb->waitdog = wd_create();
  if (!rtcb->waitdog)
    {
      errcode = ENOMEM;
      goto errout;
    }

  /* We will disable interrupts until we have completed the semaphore
   * wait.  We need to do this (as opposed to just disabling pre-emption)
   * because there could be interrupt handlers that are asynchronously
   * posting semaphores and to prevent race conditions with watchdog
   * timeout.  This is not too bad because interrupts will be re-
   * enabled while we are blocked waiting for the semaphore.
   */

  flags = enter_critical_section();

  /* Try to take the semaphore without waiting. */

  ret = sem_trywait(sem);
  if (ret == OK)
    {
      /* We got it! */

      goto success_with_irqdisabled;
    }

  /* We will have to wait for the semaphore.  Make sure that we were provided
   * with a valid timeout.
   */

  if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
    {
      errcode = EINVAL;
      goto errout_with_irqdisabled;
    }

  /* Convert the timespec to clock ticks.  We must have interrupts
   * disabled here so that this time stays valid until the wait begins.
   */

  errcode = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);

  /* If the time has already expired return immediately. */

  if (errcode == OK && ticks <= 0)
    {
      errcode = ETIMEDOUT;
      goto errout_with_irqdisabled;
    }

  /* Handle any time-related errors */

  if (errcode != OK)
    {
      goto errout_with_irqdisabled;
    }

  /* Start the watchdog */

  (void)wd_start(rtcb->waitdog, ticks, (wdentry_t)sem_timeout, 1, getpid());

  /* Now perform the blocking wait */

  ret = sem_wait(sem);
  if (ret < 0)
    {
      /* sem_wait() failed.  Save the errno value */

      errcode = get_errno();
    }

  /* Stop the watchdog timer */

  wd_cancel(rtcb->waitdog);

  if (errcode != OK)
    {
      goto errout_with_irqdisabled;
    }

  /* We can now restore interrupts and delete the watchdog */

  /* Success exits */

success_with_irqdisabled:
  leave_critical_section(flags);
  wd_delete(rtcb->waitdog);
  rtcb->waitdog = NULL;
  leave_cancellation_point();
  return OK;

  /* Error exits */

errout_with_irqdisabled:
  leave_critical_section(flags);
  wd_delete(rtcb->waitdog);
  rtcb->waitdog = NULL;

errout:
  set_errno(errcode);
  leave_cancellation_point();
  return ERROR;
}
Summary
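To sum up, semaphores in NuttX serve both synchronization and mutual exclusion. A task that cannot get a semaphore is added to the semaphore wait queue and blocks. When the semaphore is released, that queue is searched and the waiting task is scheduled to run again. Where priority inversion is a concern, priority inheritance is one way to deal with it.
Author: Loyen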