Nuttx提供工作队列机制。工作队列是一个存放线程的队列,它对于将任务负载减荷到不同的线程上下文中,以便于延迟执行,或者串行执行很有帮助。
工作队列分类
有三种不同类型的工作队列,每一类都有不同的属性和用途。
高优先级内核工作队列
高优先级内核工作队列
专用的高优先级工作队列用于中断处理函数中的延迟处理,在有些驱动中可能需要这样一个工作队列,如果没有必要的话,也可以安全的禁掉。高优先级的线程也可以充当资源回收器--从中断处理函数中完成内存的延迟释放。如果高优先级工作线程被disable了的话,清理工作有两种方式来完成:1)如果使能了低优先级的工作线程,在该线程中完成;2)如果低优先级线程没有使能,则IDLE线程来完成(如果内存回收优先级比较高,可能不太合适)。设备驱动底半部
高优先级工作线程可以用于设备驱动程序的底半部,因此它必须运行在一个非常高,并且固定的优先级,与中断处理程序本身的优先级竞争。通常,高优先级工作队列应该是系统中最高优先级的线程。默认的优先级为224。线程池
工作队列可以被配置成支持多个低优先级线程,这本质上是一个线程池,为队列工作提供多线程服务,这打破了“队列”的严格序列化(因此,工作队列也不再是一种队列)。
当在I/O操作,暂停等待输入时,多个工作线程是需要的,如果只有一个工作线程的话,那么整个工作队列处理就会停止。这对于异步I/O、AIO是必要的。与低优先级内核工作队列比较
对于不太关键、较低优先级、面向应用程序的工作线程支持,考虑使用较低优先级的工作队列。较低优先级的工作队列以较低的优先级运行,但是它有一个额外的优点,那就是支持优先级继承(如果CONFIG_PRIORITY_INHERITANCE=y选中的话):低优先级的工作线程可以被调整优先级。配置选项
CONFIG_SCHED_HPWORK:使能高优先级工作队列
CONFIG_SCHED_HPNTHREADS:高优先级工作队列线程池中的线程数量,默认是1.
CONFIG_SCHED_HPWORKPRIORITY:高优先级工作线程的执行优先级,默认是224.
CONFIG_SCHED_HPWORKSTACKSIZE:工作线程的栈空间大小,默认是2048字节通用配置选项
这个选项通用于所有的工作队列:
CONFIG_SIG_SIGWORK:用于唤醒工作线程的信号值,默认使用17.
低优先级内核工作队列
低优先级内核工作队列
低优先级工作队列更适合于具备扩展性的,面向应用程序处理的场景,比如文件系统清理、内存垃圾回收、异步I/O操作等。与高优先内核工作队列比较
低优先级内核工作队列,由于优先级会低一些,因此不适合用作驱动程序的底半部。除此之外,它与高优先级内核工作队列非常相似,上文中关于高优先级工作队列的大部分讨论同样适用。但是低优先级内核工作队列,有一个重要的特点就是优先级继承,这个让它更适合于某些任务。优先级继承
低优先级内核工作线程支持优先级继承(需要选择CONFIG_PRIORITY_INHERITANCE=y),可以根据实际情况调整优先级。优先级继承不是自动完成的,低优先级工作线程总是运行在一个固定的优先级上。可以通过调用lpwork_bootstpriority()接口来提升优先级(通常在调度这个任务之前调用),在任务完成之后可以通过lpwork_restorepriority()接口来恢复优先级(一般在任务完成时的work handler中调用)。目前,只有Nuttx异步I/O逻辑使用了这个动态优先级特性。配置选项
CONFIG_SCHED_LPWORK:使能低优先级工作队列
CONFIG_SCHED_LPNTHREADS:低优先级工作队列中线程数量,默认值为1
CONFIG_SCHED_LPWORKPRIORITY:低优先级工作线程中最小的执行优先级,队列中每个线程都以这个优先级的值开始运行。如果优先级继承使能了的话,优先级会在这个基础上往上提升,默认50.
CONFIG_SCHED_LPWORKPRIOMAX:低优先级线程中最大的执行优先级。运行的优先级不能超过这个值,默认176.
CONFIG_SCHED_LPWORKSTACKSIZE:低优先级工作线程的栈大小,默认2048Byte。
用户模式工作队列
工作队列访问权限
低优先级和高优先级工作线程,都是内核线程。在Nuttx flat build模式下编译时,应用程序是可以访问和使用的。但是,在Nuttx protected/kernel build模式下编译时,内核模式下的代码是独立的,用户模式是没法访问的。工作模式工作队列
用户模式工作队列接口与内核模式工作队列接口相同,用户模式工作队列的功能等效于高优先级工作队列,不同之处在于,它的实现不依赖于内核内部提供的资源。配置选项
CONFIG_LIB_USRWORK:使能用户模式工作队列
CONFIG_LIB_USRWORKPRIORITY:用户模式下工作线程的执行优先级,默认为100.
CONFIG_LIB_USRWORKSTACKSIZE:用户模式下工作线程的栈大小,默认2048.
数据结构
数据结构分为两部分,一部分是用户使用的结构,另一部分是内核实现用到的结构:
用户数据结构
/* Defines the work callback */typedef void (*worker_t)(FAR void *arg);/* Defines one entry in the work queue. The user only needs this structure
* in order to declare instances of the work structure. Handling of all
* fields is performed by the work APIs
*/struct work_s{
struct dq_entry_s dq; /* Implements a doubly linked list */
worker_t worker; /* Work callback */
FAR void *arg; /* Callback argument */
systime_t qtime; /* Time work queued */
systime_t delay; /* Delay until work performed */};struct work_s结构只需要用来声明实例即可,该数据结构中的内部成员,全部由相应的API接口来操作,其中qtime表示的是该任务入队的时间,而delay表示的是需要延迟多长时间去执行,如果delay值为0,表明立刻执行。
内核实现数据结构
/* This represents one worker */struct kworker_s{
pid_t pid; /* The task ID of the worker thread */
volatile bool busy; /* True: Worker is not available */};/* This structure defines the state of one kernel-mode work queue */struct kwork_wqueue_s{
systime_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */
struct kworker_s worker[1]; /* Describes a worker thread */};/* This structure defines the state of one high-priority work queue. This
* structure must be cast-compatible with kwork_wqueue_s.
*/#ifdef CONFIG_SCHED_HPWORKstruct hp_wqueue_s{
systime_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */
struct kworker_s worker[1]; /* Describes the single high priority worker */};#endif/* This structure defines the state of one high-priority work queue. This
* structure must be cast compatible with kwork_wqueue_s
*/#ifdef CONFIG_SCHED_LPWORKstruct lp_wqueue_s{
systime_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */
/* Describes each thread in the low priority queue's thread pool */
struct kworker_s worker[CONFIG_SCHED_LPNTHREADS];};#endif/****************************************************************************
* Public Data
****************************************************************************/#ifdef CONFIG_SCHED_HPWORK/* The state of the kernel mode, high priority work queue. */extern struct hp_wqueue_s g_hpwork;#endif#ifdef CONFIG_SCHED_LPWORK/* The state of the kernel mode, low priority work queue(s). */extern struct lp_wqueue_s g_lpwork;#endif上述结构体中:
struct kworker_s:对应一个工作线程,其中包含了线程ID号及运行状态。
struct kwork_wqueue_s:描述内核模式下的工作队列,在接口中都使用这个数据结构,实际上是将struct hp_wqueue_s/struct lp_wqueue_s数据结构进行强制类型转换。
struct hp_wqueue_s:描述高优先级内核工作队列,从数据结构中可以看出,该队列中默认只支持1个工作线程。
struct lp_wqueue_s:描述低优先级内核工作队列,从数据结构中可以看出,该队列中的工作线程是可以配置的,CONFIG_SCHED_LPNTHREADS的值就代表线程数量。
g_hpwork/g_lpwork:分别为两个全局描述符,对应到两种类型的内核工作队列。
接口定义
int work_usrstart(void):启动用户模式下的工作队列。
int work_queue(int qid, FAR struct work_s *work, worker_t worker, FAR void *arg, systime_t delay):将任务添加到工作队列中,任务将会在工作队列中的线程上延迟运行。
int work_cancel(int qid, FAR struct work_s *work):将之前入列的任务删除掉。
int work_signal(int qid):通过工作队列中的线程去执行任务处理。
work_available(work):检查任务的结构体是否可用。
void lpwork_boostpriority(uint8_t reqprio):提升线程执行的优先级。
void lpwork_restorepriority(uint8_t reqprio):恢复线程执行的优先级。
代码说明一切:
/**************************************************************************** * Name: work_usrstart * * Description: * Start the user mode work queue. * * Input parameters: * None * * Returned Value: * The task ID of the worker thread is returned on success. A negated * errno value is returned on failure. * ****************************************************************************/#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)int work_usrstart(void);#endif
/**************************************************************************** * Name: work_queue * * Description: * Queue work to be performed at a later time. All queued work will be * performed on the worker thread of execution (not the caller's). * * The work structure is allocated by caller, but completely managed by * the work queue logic. The caller should never modify the contents of * the work queue structure; the caller should not call work_queue() * again until either (1) the previous work has been performed and removed * from the queue, or (2) work_cancel() has been called to cancel the work * and remove it from the work queue. * * Input parameters: * qid - The work queue ID * work - The work structure to queue * worker - The worker callback to be invoked. The callback will invoked * on the worker thread of execution. * arg - The argument that will be passed to the worker callback when * it is invoked. * delay - Delay (in clock ticks) from the time queue until the worker * is invoked. Zero means to perform the work immediately. * * Returned Value: * Zero on success, a negated errno on failure * ****************************************************************************/int work_queue(int qid, FAR struct work_s *work, worker_t worker, FAR void *arg, systime_t delay);
/**************************************************************************** * Name: work_cancel * * Description: * Cancel previously queued work. This removes work from the work queue. * After work has been cancelled, it may be re-queue by calling work_queue() * again. * * Input parameters: * qid - The work queue ID * work - The previously queue work structure to cancel * * Returned Value: * Zero on success, a negated errno on failure * * -ENOENT - There is no such work queued. * -EINVAL - An invalid work queue was specified * ****************************************************************************/int work_cancel(int qid, FAR struct work_s *work);
/**************************************************************************** * Name: work_signal * * Description: * Signal the worker thread to process the work queue now. This function * is used internally by the work logic but could also be used by the * user to force an immediate re-assessment of pending work. * * Input parameters: * qid - The work queue ID * * Returned Value: * Zero on success, a negated errno on failure * ****************************************************************************/int work_signal(int qid);
/**************************************************************************** * Name: work_available * * Description: * Check if the work structure is available. * * Input parameters: * work - The work queue structure to check. * None * * Returned Value: * true if available; false if busy (i.e., there is still pending work). * ****************************************************************************/#define work_available(work) ((work)->worker == NULL)
/**************************************************************************** * Name: lpwork_boostpriority * * Description: * Called by the work queue client to assure that the priority of the low- * priority worker thread is at least at the requested level, reqprio. This * function would normally be called just before calling work_queue(). * * Parameters: * reqprio - Requested minimum worker thread priority * * Return Value: * None * ****************************************************************************/#if defined(CONFIG_SCHED_LPWORK) && defined(CONFIG_PRIORITY_INHERITANCE)void lpwork_boostpriority(uint8_t reqprio);#endif
/**************************************************************************** * Name: lpwork_restorepriority * * Description: * This function is called to restore the priority after it was previously * boosted. This is often done by client logic on the worker thread when * the scheduled work completes. It will check if we need to drop the * priority of the worker thread. * * Parameters: * reqprio - Previously requested minimum worker thread priority to be * "unboosted" * * Return Value: * None * ****************************************************************************/#if defined(CONFIG_SCHED_LPWORK) && defined(CONFIG_PRIORITY_INHERITANCE)void lpwork_restorepriority(uint8_t reqprio);#endif原理
按惯例,先来一张图吧:
工作队列简单来说,工作队列就如上图所示,由三个部分组成:
任务队列:用于存放需要延迟执行的任务,这个也就是通过work_queue()接口添加任务的任务队列。
工作线程:在高优先级内核工作队列中,默认只有一个线程;在低优先级内核工作队列中支持多个工作线程。任务队列中的任务就分发到这些线程上来执行。
延时参数delay:这个参数定义了轮询时的间隔时间,进而判断任务队列中的任务是否已经到需要执行的时间点了。
Nuttx操作系统执行的入口在os_start(),从这开始,最终会调用到工作队列线程的创建,调用关系如下:
os_start() ---> os_bringup() ---> os_workqueue() ---> work_hpstart()/work_lpstart()/USERSPACE->work_usrstart()
其中work_hpstart()/work_lpstart()/USERSPACE->work_usrstart()分别对应内核高优先级工作队列、内核低优先级工作队列、用户模式工作队列三种情况,由于原理类似,我将选择内核高优先级工作队列来进行分析。入口为:work_hpstart()。
work_hpstart()主要完成以下几点:
初始化高优先级工作队列数据结构;
在该工作队列中,创建一个高优先级的工作线程work_hpthread,默认只支持一个;
int work_hpstart(void){ pid_t pid; /* Initialize work queue data structures */
g_hpwork.delay = CONFIG_SCHED_HPWORKPERIOD / USEC_PER_TICK;
dq_init(&g_hpwork.q); /* Start the high-priority, kernel mode worker thread */
sinfo("Starting high-priority kernel worker thread\n");
pid = kernel_thread(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY,
CONFIG_SCHED_HPWORKSTACKSIZE,
(main_t)work_hpthread,
(FAR char * const *)NULL);
DEBUGASSERT(pid > 0); if (pid < 0)
{ int errcode = errno;
DEBUGASSERT(errcode > 0);
serr("ERROR: kernel_thread failed: %d\n", errcode); return -errcode;
}
g_hpwork.worker[0].pid = pid;
g_hpwork.worker[0].busy = true; return pid;实际的工作由work_hpthread线程来处理,在该函数中运行一个死循环,在循环中调用work_process()来处理实际的任务。
/****************************************************************************
* Name: work_hpthread
*
* Description:
* This is the worker thread that performs the actions placed on the high
* priority work queue.
*
* This, along with the lower priority worker thread(s) are the kernel
* mode work queues (also build in the flat build). One of these threads
* also performs periodic garbage collection (that would otherwise be
* performed by the idle thread if CONFIG_SCHED_WORKQUEUE is not defined).
* That will be the higher priority worker thread only if a lower priority
* worker thread is available.
*
* All kernel mode worker threads are started by the OS during normal
* bring up. This entry point is referenced by OS internally and should
* not be accessed by application logic.
*
* Input parameters:
* argc, argv (not used)
*
* Returned Value:
* Does not return
*
****************************************************************************/static int work_hpthread(int argc, char *argv[]){ /* Loop forever */
for (; ; )
{#ifndef CONFIG_SCHED_LPWORK
/* First, perform garbage collection. This cleans-up memory
* de-allocations that were queued because they could not be freed in
* that execution context (for example, if the memory was freed from
* an interrupt handler).
*
* NOTE: If the work thread is disabled, this clean-up is performed by
* the IDLE thread (at a very, very low priority). If the low-priority
* work thread is enabled, then the garbage collection is done on that
* thread instead.
*/
sched_garbage_collection();#endif
/* Then process queued work. work_process will not return until: (1)
* there is no further work in the work queue, and (2) the polling
* period provided by g_hpwork.delay expires.
*/
work_process((FAR struct kwork_wqueue_s *)&g_hpwork, g_hpwork.delay, 0);
} return OK; /* To keep some compilers happy */}所以工作队列的任务处理核心是work_process()接口,该接口对于内核的高优先级工作队列和内核低优先级工作队列是一致的。
work_process()完成的主要任务有:
获取执行时候的系统时间,这个时间主要用于统计任务进入工作队列后,消耗了多久,是否到了需要去执行的时间点。
从工作队列的头部获取一个任务,通过比较两个时间值:1)消耗的时间,也就是当前的系统时间减去任务入列的时间;2)任务延迟执行的时间,也就是数据结构中描述的delay时间。
如果消耗的时间大于延迟执行的时间,那就立刻执行任务的回调函数。
如果消耗的时间小于延迟执行的时间,计算剩余时间,并最终让任务睡眠等待一下。
5.高优先级内核工作队列和低优先内核工作队列的实现方式有一些细微的差异,主要体现在,高优先级的情况下,如果还不到执行时间,工作线程选择睡眠让出CPU;低优先级的情况下,会选择让第一个线程轮询(与高优先级工作线程行为一致),而让其他的工作线程调用sigwaitinfo()接口等待信号。
代码如下:
void work_process(FAR struct kwork_wqueue_s *wqueue, systime_t period, int wndx){ volatile FAR struct work_s *work;
worker_t worker; irqstate_t flags;
FAR void *arg; systime_t elapsed; systime_t remaining; systime_t stick; systime_t ctick; systime_t next; /* Then process queued work. We need to keep interrupts disabled while
* we process items in the work list.
*/
next = period;
flags = enter_critical_section(); /* Get the time that we started this polling cycle in clock ticks. */
stick = clock_systimer(); /* And check each entry in the work queue. Since we have disabled
* interrupts we know: (1) we will not be suspended unless we do
* so ourselves, and (2) there will be no changes to the work queue
*/
work = (FAR struct work_s *)wqueue->q.head; while (work)
{ /* Is this work ready? It is ready if there is no delay or if
* the delay has elapsed. qtime is the time that the work was added
* to the work queue. It will always be greater than or equal to
* zero. Therefore a delay of zero will always execute immediately.
*/
ctick = clock_systimer();
elapsed = ctick - work->qtime; if (elapsed >= work->delay)
{ /* Remove the ready-to-execute work from the list */
(void)dq_rem((struct dq_entry_s *)work, &wqueue->q); /* Extract the work description from the entry (in case the work
* instance by the re-used after it has been de-queued).
*/
worker = work->worker; /* Check for a race condition where the work may be nullified
* before it is removed from the queue.
*/
if (worker != NULL)
{ /* Extract the work argument (before re-enabling interrupts) */
arg = work->arg; /* Mark the work as no longer being queued */
work->worker = NULL; /* Do the work. Re-enable interrupts while the work is being
* performed... we don't have any idea how long this will take!
*/
leave_critical_section(flags);
worker(arg); /* Now, unfortunately, since we re-enabled interrupts we don't
* know the state of the work list and we will have to start
* back at the head of the list.
*/
flags = enter_critical_section();
work = (FAR struct work_s *)wqueue->q.head;
} else
{ /* Cancelled.. Just move to the next work in the list with
* interrupts still disabled.
*/
work = (FAR struct work_s *)work->dq.flink;
}
} else /* elapsed < work->delay */
{ /* This one is not ready.
*
* NOTE that elapsed is relative to the the current time,
* not the time of beginning of this queue processing pass.
* So it may need an adjustment.
*/
elapsed += (ctick - stick); if (elapsed > work->delay)
{ /* The delay has expired while we are processing */
elapsed = work->delay;
} /* Will it be ready before the next scheduled wakeup interval? */
remaining = work->delay - elapsed; if (remaining < next)
{ /* Yes.. Then schedule to wake up when the work is ready */
next = remaining;
} /* Then try the next in the list. */
work = (FAR struct work_s *)work->dq.flink;
}
}#if defined(CONFIG_SCHED_LPWORK) && CONFIG_SCHED_LPNTHREADS > 0
/* Value of zero for period means that we should wait indefinitely until
* signalled. This option is used only for the case where there are
* multiple, low-priority worker threads. In that case, only one of
* the threads does the poll... the others simple. In all other cases
* period will be non-zero and equal to wqueue->delay.
*/
if (period == 0)
{ sigset_t set; /* Wait indefinitely until signalled with SIGWORK */
sigemptyset(&set);
sigaddset(&set, SIGWORK);
wqueue->worker[wndx].busy = false;
DEBUGVERIFY(sigwaitinfo(&set, NULL));
wqueue->worker[wndx].busy = true;
} else#endif
{ /* Get the delay (in clock ticks) since we started the sampling */
elapsed = clock_systimer() - stick; if (elapsed < period && next > 0)
{ /* How much time would we need to delay to get to the end of the
* sampling period? The amount of time we delay should be the smaller
* of the time to the end of the sampling period and the time to the
* next work expiry.
*/
remaining = period - elapsed;
next = MIN(next, remaining); /* Wait awhile to check the work list. We will wait here until
* either the time elapses or until we are awakened by a signal.
* Interrupts will be re-enabled while we wait.
*/
wqueue->worker[wndx].busy = false;
usleep(next * USEC_PER_TICK);
wqueue->worker[wndx].busy = true;
}
}
leave_critical_section(flags);
}总结Nuttx中的工作队列机制还是比较简单的:一个工作队列,对应到一个任务的队列,以及一个工作线程的数组。内核负责来调度这些工作线程,而任务队列中的任务会分发到各个线程上执行。三种类型的工作队列,实现都是大同小异。
作者:Loyen
我要赚赏金
