【简介】
FreeRTOS 的任务之间会有通信的需求。FreeRTOS 中的信号量、互斥锁等通信与同步机制都是基于队列实现的。队列可以实现任务-任务、任务-中断、中断-中断之间的通信。
以下是任务 A 与任务 B 之间创建队列并通过队列进行数据交互的过程。
在 FreeRTOS 中,队列使用如下的数据结构进行管理。
/*
 * Definition of the queue used by the scheduler.
 * Items are queued by copy, not reference.  See the following link for the
 * rationale: https://www.freertos.org/Embedded-RTOS-Queues.html
 *
 * The same structure backs both plain queues and semaphores; the union 'u'
 * holds whichever set of data the current use requires.
 */
typedef struct QueueDefinition /* The old naming convention is used to prevent breaking kernel aware debuggers. */
{
	int8_t *pcHead;					/*< Points to the beginning of the queue storage area. */
	int8_t *pcWriteTo;				/*< Points to the next free place in the storage area. */

	union
	{
		QueuePointers_t xQueue;		/*< Data required exclusively when this structure is used as a queue. */
		SemaphoreData_t xSemaphore; /*< Data required exclusively when this structure is used as a semaphore. */
	} u;

	List_t xTasksWaitingToSend;		/*< List of tasks that are blocked waiting to post onto this queue.  Stored in priority order. */
	List_t xTasksWaitingToReceive;	/*< List of tasks that are blocked waiting to read from this queue.  Stored in priority order. */

	volatile UBaseType_t uxMessagesWaiting;/*< The number of items currently in the queue. */
	UBaseType_t uxLength;			/*< The length of the queue defined as the number of items it will hold, not the number of bytes. */
	UBaseType_t uxItemSize;			/*< The size of each item that the queue will hold. */

	volatile int8_t cRxLock;		/*< Stores the number of items received from the queue (removed from the queue) while the queue was locked.  Set to queueUNLOCKED when the queue is not locked. */
	volatile int8_t cTxLock;		/*< Stores the number of items transmitted to the queue (added to the queue) while the queue was locked.  Set to queueUNLOCKED when the queue is not locked. */

	#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
		uint8_t ucStaticallyAllocated;	/*< Set to pdTRUE if the memory used by the queue was statically allocated to ensure no attempt is made to free the memory. */
	#endif

	/* Only present when queue sets are enabled. */
	#if ( configUSE_QUEUE_SETS == 1 )
		struct QueueDefinition *pxQueueSetContainer;	/*< NOTE(review): presumably the queue set this queue is a member of - confirm against the queue-set code. */
	#endif

	/* Only present when the trace facility is enabled. */
	#if ( configUSE_TRACE_FACILITY == 1 )
		UBaseType_t uxQueueNumber;	/*< NOTE(review): presumably an identifier assigned for trace tools - confirm. */
		uint8_t ucQueueType;		/*< NOTE(review): presumably records what kind of object (queue, mutex, semaphore...) this is for trace tools - confirm. */
	#endif

} xQUEUE;

/* The old xQUEUE name is maintained above then typedefed to the new Queue_t
name below to enable the use of older kernel aware debuggers. */
typedef xQUEUE Queue_t;
上述结构中的 xTasksWaitingToSend/xTasksWaitingToReceive 链表成员用于管理当前阻塞在该队列发送/接收操作上的任务。因为一个任务同一时刻只能被阻塞在一个队列(事件链表)上,所以对应的任务控制块只需使用一个 xEventListItem 成员挂载到 event 链表上。
为了便于 debug 时查看 queue 的信息,queue 还定义了 QueueRegistryItem_t 结构,用来给 queue 添加 name 信息。
添加如下的测试代码,通过 IAR 查看任务被 queue 阻塞的状态。
/* Declare a variable of type QueueHandle_t. This is used to store the handle to the queue that is accessed by all three tasks. */ QueueHandle_t xQueue; static void vSenderTask( void *pvParameters ) { int32_t lValueToSend; BaseType_t xStatus; /* Two instances of this task are created so the value that is sent to the queue is passed in via the task parameter - this way each instance can use a different value. The queue was created to hold values of type int32_t, so cast the parameter to the required type. */ lValueToSend = ( int32_t ) pvParameters; /* As per most tasks, this task is implemented within an infinite loop. */ for( ;; ) { //vTaskDelay(13000); /* Send the value to the queue. The first parameter is the queue to which data is being sent. The queue was created before the scheduler was started, so before this task started to execute. The second parameter is the address of the data to be sent, in this case the address of lValueToSend. The third parameter is the Block time – the time the task should be kept in the Blocked state to wait for space to become available on the queue should the queue already be full. In this case a block time is not specified because the queue should never contain more than one item, and therefore never be full. */ xStatus = xQueueSendToBack( xQueue, &lValueToSend, 10000000 ); if( xStatus != pdPASS ) { /* The send operation could not complete because the queue was fullthis must be an error as the queue should never contain more than one item! */ printf( "Could not send to the queue.\r\n" ); } } } static void vReceiverTask( void *pvParameters ) { /* Declare the variable that will hold the values received from the queue. */ int32_t lReceivedValue; BaseType_t xStatus; const TickType_t xTicksToWait = pdMS_TO_TICKS( 10000 ); /* This task is also defined within an infinite loop. */ for( ;; ) { vTaskDelay(13000); /* This call should always find the queue empty because this task will immediately remove any data that is written to the queue. 
*/ if( uxQueueMessagesWaiting( xQueue ) != 0 ) { printf( "Queue should have been empty!\r\n" ); } /* Receive data from the queue. The first parameter is the queue from which data is to be received. The queue is created before the scheduler is started, and therefore before this task runs for the first time. The second parameter is the buffer into which the received data will be placed. In this case the buffer is simply the address of a variable that has the required size to hold the received data. The last parameter is the block time – the maximum amount of time that the task will remain in the Blocked state to wait for data to be available should the queue already be empty. */ xStatus = xQueueReceive( xQueue, &lReceivedValue, xTicksToWait ); if( xStatus == pdPASS ) { /* Data was successfully received from the queue, print out the received value. */ printf( "Received = %d \r\n", lReceivedValue ); } else { /* Data was not received from the queue even after waiting for 100ms. This must be an error as the sending tasks are free running and will be continuously writing to the queue. */ printf( "Could not receive from the queue.\r\n" ); } } } int main(void) { uart_init(); (void)show_board_info(); /* The queue is created to hold a maximum of 5 values, each of which is large enough to hold a variable of type int32_t. */ xQueue = xQueueCreate( 5, sizeof( int32_t ) ); if( xQueue != NULL ) { vQueueAddToRegistry( xQueue, "TestQ" ); } xTaskCreate( vSenderTask, "Sender1", 128, ( void * ) 100, 1, NULL ); xTaskCreate( vSenderTask, "Sender2", 128, ( void * ) 200, 1, NULL ); /* Create the task that will read from the queue. The task is created with priority 2, so above the priority of the sender tasks. */ xTaskCreate( vReceiverTask, "Receiver", 128, NULL, 2, NULL ); vTaskStartScheduler(); }
通过 IAR 查看任务状态,可以看到 Sender1/Sender2 两个任务被阻塞在 queue 上。
此时 queue 已经被写满,无法再进行写入;当前阻塞在该 queue 发送等待链表(xTasksWaitingToSend)上的任务数量为 2。
互斥锁的创建
在 FreeRTOS 中,任务间通信与同步的机制都是使用 queue 来实现的,以下代码是互斥锁的创建代码。
代码中 queue 的长度(len)配置为 1;由于互斥锁不需要传递数据,对应的元素大小(size)配置为 0。以下代码是互斥锁获取接口在底层调用的函数。
/*
 * Take (obtain) a semaphore or mutex that is implemented on top of a queue.
 *
 * xQueue        Handle of the semaphore/mutex.  Asserted to be non-NULL and to
 *               have an item size of 0.
 * xTicksToWait  Maximum time to block while the count is 0.  A value of 0
 *               makes the function return immediately instead of blocking.
 *
 * Returns pdPASS once the count has been decremented, or errQUEUE_EMPTY if
 * the count was 0 and the block time was 0 or has expired.
 */
BaseType_t xQueueSemaphoreTake( QueueHandle_t xQueue, TickType_t xTicksToWait )
{
BaseType_t xEntryTimeSet = pdFALSE;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;

#if( configUSE_MUTEXES == 1 )
	BaseType_t xInheritanceOccurred = pdFALSE;
#endif

	/* Check the queue pointer is not NULL. */
	configASSERT( ( pxQueue ) );

	/* Check this really is a semaphore, in which case the item size will be
	0.  The assert fires if the item size is non-zero. */
	configASSERT( pxQueue->uxItemSize == 0 );

	/* Cannot block if the scheduler is suspended.  The assert fires when the
	scheduler is suspended and a non-zero block time was requested. */
	#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
	{
		configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
	}
	#endif


	/*lint -save -e904 This function relaxes the coding standard somewhat to
	allow return statements within the function itself.  This is done in the
	interest of execution time efficiency. */
	for( ;; )
	{
		taskENTER_CRITICAL();
		{
			/* Semaphores are queues with an item size of 0, and where the
			number of messages in the queue is the semaphore's count value. */
			const UBaseType_t uxSemaphoreCount = pxQueue->uxMessagesWaiting;

			/* Is there data in the queue now?  To be running the calling task
			must be the highest priority task wanting to access the queue.
			A count above 0 means the semaphore can be taken by decrementing
			the count. */
			if( uxSemaphoreCount > ( UBaseType_t ) 0 )
			{
				traceQUEUE_RECEIVE( pxQueue );

				/* Semaphores are queues with a data size of zero and where the
				messages waiting is the semaphore's count.  Reduce the count
				by one. */
				pxQueue->uxMessagesWaiting = uxSemaphoreCount - ( UBaseType_t ) 1;

				#if ( configUSE_MUTEXES == 1 )
				{
					/* This queue is being used as a mutex. */
					if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
					{
						/* Record the information required to implement
						priority inheritance should it become necessary.  The
						value returned by pvTaskIncrementMutexHeldCount() is
						stored as the mutex holder.
						NOTE(review): going by its name that call also
						increments the taking task's held-mutex count -
						confirm in tasks.c. */
						pxQueue->u.xSemaphore.xMutexHolder = pvTaskIncrementMutexHeldCount();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				#endif /* configUSE_MUTEXES */

				/* Check to see if other tasks are blocked waiting to give the
				semaphore, and if so, unblock the highest priority such task. */
				if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
				{
					if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
					{
						queueYIELD_IF_USING_PREEMPTION();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}

				taskEXIT_CRITICAL();
				return pdPASS;
			}
			else
			{
				if( xTicksToWait == ( TickType_t ) 0 )
				{
					/* For inheritance to have occurred there must have been an
					initial timeout, and an adjusted timeout cannot become 0, as
					if it were 0 the function would have exited. */
					#if( configUSE_MUTEXES == 1 )
					{
						configASSERT( xInheritanceOccurred == pdFALSE );
					}
					#endif /* configUSE_MUTEXES */

					/* The semaphore count was 0 and no block time is specified
					(or the block time has expired) so exit now. */
					taskEXIT_CRITICAL();
					traceQUEUE_RECEIVE_FAILED( pxQueue );
					return errQUEUE_EMPTY;
				}
				else if( xEntryTimeSet == pdFALSE )
				{
					/* The semaphore count was 0 and a block time was specified
					so configure the timeout structure ready to block. */
					vTaskInternalSetTimeOutState( &xTimeOut );
					xEntryTimeSet = pdTRUE;
				}
				else
				{
					/* Entry time was already set. */
					mtCOVERAGE_TEST_MARKER();
				}
			}
		}
		taskEXIT_CRITICAL();

		/* Interrupts and other tasks can give to and take from the semaphore
		now the critical section has been exited. */

		vTaskSuspendAll();
		prvLockQueue( pxQueue );

		/* Update the timeout state to see if it has expired yet. */
		if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
		{
			/* A block time is specified and not expired.  If the semaphore
			count is 0 then enter the Blocked state to wait for a semaphore to
			become available.  As semaphores are implemented with queues the
			queue being empty is equivalent to the semaphore count being 0. */
			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
			{
				traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );

				#if ( configUSE_MUTEXES == 1 )
				{
					if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
					{
						/* About to block on a mutex: give the current holder
						the chance to inherit priority, and remember whether
						that happened so it can be undone after a timeout. */
						taskENTER_CRITICAL();
						{
							xInheritanceOccurred = xTaskPriorityInherit( pxQueue->u.xSemaphore.xMutexHolder );
						}
						taskEXIT_CRITICAL();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				#endif

				/* Place the calling task on the list of tasks waiting to
				receive from this queue, with the remaining block time. */
				vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
				prvUnlockQueue( pxQueue );
				if( xTaskResumeAll() == pdFALSE )
				{
					portYIELD_WITHIN_API();
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}
			}
			else
			{
				/* There was no timeout and the semaphore count was not 0, so
				attempt to take the semaphore again. */
				prvUnlockQueue( pxQueue );
				( void ) xTaskResumeAll();
			}
		}
		else
		{
			/* Timed out. */
			prvUnlockQueue( pxQueue );
			( void ) xTaskResumeAll();

			/* If the semaphore count is 0 exit now as the timeout has
			expired.  Otherwise return to attempt to take the semaphore that is
			known to be available.  As semaphores are implemented by queues the
			queue being empty is equivalent to the semaphore count being 0. */
			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
			{
				#if ( configUSE_MUTEXES == 1 )
				{
					/* xInheritanceOccurred could only have be set if
					pxQueue->uxQueueType == queueQUEUE_IS_MUTEX so no need to
					test the mutex type again to check it is actually a mutex. */
					if( xInheritanceOccurred != pdFALSE )
					{
						taskENTER_CRITICAL();
						{
							UBaseType_t uxHighestWaitingPriority;

							/* This task blocking on the mutex caused another
							task to inherit this task's priority.  Now this task
							has timed out the priority should be disinherited
							again, but only as low as the next highest priority
							task that is waiting for the same mutex. */
							uxHighestWaitingPriority = prvGetDisinheritPriorityAfterTimeout( pxQueue );
							vTaskPriorityDisinheritAfterTimeout( pxQueue->u.xSemaphore.xMutexHolder, uxHighestWaitingPriority );
						}
						taskEXIT_CRITICAL();
					}
				}
				#endif /* configUSE_MUTEXES */

				traceQUEUE_RECEIVE_FAILED( pxQueue );
				return errQUEUE_EMPTY;
			}
			else
			{
				mtCOVERAGE_TEST_MARKER();
			}
		}
	} /*lint -restore */
}