这些小活动你都参加了吗?快来围观一下吧!>>
电子产品世界 » 论坛首页 » 嵌入式开发 » 软件与操作系统 » 【Freertos】队列管理

共4条 1/1 1 跳转至

【Freertos】队列管理

高工
2025-05-18 08:06:59     打赏

【简介】

FreeRtos的任务间会有通信的需求,FreeRTOS 中所有的通信与同步机制都是基于队列实现的。队列可以实现任务-任务,任务-中断,中断-中断之间的通信机制。

image.png

以下是任务A B之间通信创建队列之间数据交互的过程。

image.png

在FreeRtos 中队列的使用如下的数据结构进行管理。

/*
 * Definition of the queue used by the scheduler.
 * Items are queued by copy, not reference.  See the following link for the
 * rationale: https://www.freertos.org/Embedded-RTOS-Queues.html
 */
typedef struct QueueDefinition 		/* The old naming convention is used to prevent breaking kernel aware debuggers. */
{
	int8_t *pcHead;					/*< Points to the beginning of the queue storage area. */
	int8_t *pcWriteTo;				/*< Points to the free next place in the storage area. */

	union
	{
		QueuePointers_t xQueue;		/*< Data required exclusively when this structure is used as a queue. */
		SemaphoreData_t xSemaphore; /*< Data required exclusively when this structure is used as a semaphore. */
	} u;

	List_t xTasksWaitingToSend;		/*< List of tasks that are blocked waiting to post onto this queue.  Stored in priority order. */
	List_t xTasksWaitingToReceive;	/*< List of tasks that are blocked waiting to read from this queue.  Stored in priority order. */

	volatile UBaseType_t uxMessagesWaiting;/*< The number of items currently in the queue. */
	UBaseType_t uxLength;			/*< The length of the queue defined as the number of items it will hold, not the number of bytes. */
	UBaseType_t uxItemSize;			/*< The size of each items that the queue will hold. */

	volatile int8_t cRxLock;		/*< Stores the number of items received from the queue (removed from the queue) while the queue was locked.  Set to queueUNLOCKED when the queue is not locked. */
	volatile int8_t cTxLock;		/*< Stores the number of items transmitted to the queue (added to the queue) while the queue was locked.  Set to queueUNLOCKED when the queue is not locked. */

	#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
		uint8_t ucStaticallyAllocated;	/*< Set to pdTRUE if the memory used by the queue was statically allocated to ensure no attempt is made to free the memory. */
	#endif

	#if ( configUSE_QUEUE_SETS == 1 )
		struct QueueDefinition *pxQueueSetContainer;
	#endif

	#if ( configUSE_TRACE_FACILITY == 1 )
		UBaseType_t uxQueueNumber;
		uint8_t ucQueueType;
	#endif

} xQUEUE;

/* The old xQUEUE name is maintained above then typedefed to the new Queue_t
name below to enable the use of older kernel aware debuggers. */
typedef xQUEUE Queue_t;

上述结构成员的xTasksWaitingToSend/xTasksWaitingToReceive 链表成员用于管理当前阻塞在队列发送/接收的任务,因为一个任务同一时刻只能被阻塞在一个任务上。对应的任务控制块上使用xEventListItem  成员挂载到event  链表上。

image.png

为了便于debug 产看queue 的信息,queue 还定义了QueueRegistryItem_t 结构用来给queue 添加name 信息。

image.png

添加如下的测试代码通过IAR 查看任务被queue的阻塞状态。

/* Declare a variable of type QueueHandle_t. This is used to store the
handle to the queue that is accessed by all three tasks. */
QueueHandle_t xQueue;


static void vSenderTask( void *pvParameters )
{
  int32_t lValueToSend;
  BaseType_t xStatus;
  /* Two instances of this task are created so the value that is sent to
    the queue is passed in via the task parameter - this way each instance
    can use a different value. The queue was created to hold values of type
    int32_t, so cast the parameter to the required type. */
    lValueToSend = ( int32_t ) pvParameters;
    /* As per most tasks, this task is implemented within an infinite loop. */
    for( ;; )
    {
      //vTaskDelay(13000);
      /* Send the value to the queue.
      The first parameter is the queue to which data is being sent. The
      queue was created before the scheduler was started, so before this
      task started to execute.
      The second parameter is the address of the data to be sent, in this
      case the address of lValueToSend.
      The third parameter is the Block time – the time the task should be
      kept in the Blocked state to wait for space to become available on
      the queue should the queue already be full. In this case a block
      time is not specified because the queue should never contain more
      than one item, and therefore never be full. */
      xStatus = xQueueSendToBack( xQueue, &lValueToSend, 10000000 );
      if( xStatus != pdPASS )
      {
          /* The send operation could not complete because the queue was fullthis must be an error as the queue should never contain more than
          one item! */
          printf( "Could not send to the queue.\r\n" );
        }
      }
}


static void vReceiverTask( void *pvParameters )
{
  /* Declare the variable that will hold the values received from the
  queue. */
  int32_t lReceivedValue;
  BaseType_t xStatus;
  const TickType_t xTicksToWait = pdMS_TO_TICKS( 10000 );
  /* This task is also defined within an infinite loop. */
  for( ;; )
  {
    vTaskDelay(13000);
    /* This call should always find the queue empty because this task will
    immediately remove any data that is written to the queue. */
    if( uxQueueMessagesWaiting( xQueue ) != 0 )
    {
          printf( "Queue should have been empty!\r\n" );
      }
    /* Receive data from the queue.
    The first parameter is the queue from which data is to be received.
    The queue is created before the scheduler is started, and therefore
    before this task runs for the first time.
    The second parameter is the buffer into which the received data will
    be placed. In this case the buffer is simply the address of a
    variable that has the required size to hold the received data.
    The last parameter is the block time – the maximum amount of time
    that the task will remain in the Blocked state to wait for data to
    be available should the queue already be empty. */
    xStatus = xQueueReceive( xQueue, &lReceivedValue, xTicksToWait );
    if( xStatus == pdPASS )
    {
      /* Data was successfully received from the queue, print out the
        received value. */
      printf( "Received = %d \r\n", lReceivedValue );
    }
    else
    {
        /* Data was not received from the queue even after waiting for
        100ms. This must be an error as the sending tasks are free
        running and will be continuously writing to the queue. */
        printf( "Could not receive from the queue.\r\n" );
    }
  }
}
int main(void)
{


    uart_init();

    (void)show_board_info();

    
    /* The queue is created to hold a maximum of 5 values, each of which is
      large enough to hold a variable of type int32_t. */
    xQueue = xQueueCreate( 5, sizeof( int32_t ) );

    if( xQueue != NULL )
    {
	vQueueAddToRegistry( xQueue, "TestQ" );
    }
    
    xTaskCreate( vSenderTask, "Sender1", 128, ( void * ) 100, 1, NULL );
    xTaskCreate( vSenderTask, "Sender2", 128, ( void * ) 200, 1, NULL );
    /* Create the task that will read from the queue. The task is created
    with priority 2, so above the priority of the sender tasks. */
    xTaskCreate( vReceiverTask, "Receiver", 128, NULL, 2, NULL );
    
    vTaskStartScheduler();
}

IAR 查看任务状态,send1/2被阻塞在queue

image.png

此时的queue 已经被写满无法再进行写入,当前被阻塞在TX queue 的任务数量为2.

image.png


互斥锁的创建

在freertos 中任务见通信同步的机制都是使用的quene 来实现的,以下代码是互斥锁的创建代码

image.png

代码中的queue 的len 配置为1 ,不需要传递数据对应的size 配置为0,以下代码是互斥锁底层获取接口调用函数。

BaseType_t xQueueSemaphoreTake( QueueHandle_t xQueue, TickType_t xTicksToWait )
{
BaseType_t xEntryTimeSet = pdFALSE;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;

#if( configUSE_MUTEXES == 1 )
	BaseType_t xInheritanceOccurred = pdFALSE;
#endif

	/* Check the queue pointer is not NULL. */
	configASSERT( ( pxQueue ) );

	/* Check this really is a semaphore, in which case the item size will be
	0. */
	configASSERT( pxQueue->uxItemSize == 0 );// 断言检查信号量的size 是否为0,非零触发断言

	/* Cannot block if the scheduler is suspended. */
	#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
	{
		configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );//检查调度器是否被挂起,挂起调度器则触发断言
	}
	#endif


	/*lint -save -e904 This function relaxes the coding standard somewhat to allow return
	statements within the function itself.  This is done in the interest
	of execution time efficiency. */
	for( ;; )
	{
		taskENTER_CRITICAL();
		{
			/* Semaphores are queues with an item size of 0, and where the
			number of messages in the queue is the semaphore's count value. */
			const UBaseType_t uxSemaphoreCount = pxQueue->uxMessagesWaiting;

			/* Is there data in the queue now?  To be running the calling task
			must be the highest priority task wanting to access the queue. */
			if( uxSemaphoreCount > ( UBaseType_t ) 0 )// 检查信号量的计数值是否大于0,如果大于零信号量可以进行-1获取动作
			{
				traceQUEUE_RECEIVE( pxQueue );

				/* Semaphores are queues with a data size of zero and where the
				messages waiting is the semaphore's count.  Reduce the count. */
				pxQueue->uxMessagesWaiting = uxSemaphoreCount - ( UBaseType_t ) 1;//对信号量数值进行-1处理

				#if ( configUSE_MUTEXES == 1 )
				{
					if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )//当前queue 类型为互斥锁类型
					{
						/* Record the information required to implement
						priority inheritance should it become necessary. */
						pxQueue->u.xSemaphore.xMutexHolder = pvTaskIncrementMutexHeldCount();//更新互斥锁holder 计数+1
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				#endif /* configUSE_MUTEXES */

				/* Check to see if other tasks are blocked waiting to give the
				semaphore, and if so, unblock the highest priority such task. */
				if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
				{
					if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
					{
						queueYIELD_IF_USING_PREEMPTION();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}

				taskEXIT_CRITICAL();
				return pdPASS;
			}
			else
			{
				if( xTicksToWait == ( TickType_t ) 0 )
				{
					/* For inheritance to have occurred there must have been an
					initial timeout, and an adjusted timeout cannot become 0, as
					if it were 0 the function would have exited. */
					#if( configUSE_MUTEXES == 1 )
					{
						configASSERT( xInheritanceOccurred == pdFALSE );
					}
					#endif /* configUSE_MUTEXES */

					/* The semaphore count was 0 and no block time is specified
					(or the block time has expired) so exit now. */
					taskEXIT_CRITICAL();
					traceQUEUE_RECEIVE_FAILED( pxQueue );
					return errQUEUE_EMPTY;
				}
				else if( xEntryTimeSet == pdFALSE )
				{
					/* The semaphore count was 0 and a block time was specified
					so configure the timeout structure ready to block. */
					vTaskInternalSetTimeOutState( &xTimeOut );
					xEntryTimeSet = pdTRUE;
				}
				else
				{
					/* Entry time was already set. */
					mtCOVERAGE_TEST_MARKER();
				}
			}
		}
		taskEXIT_CRITICAL();

		/* Interrupts and other tasks can give to and take from the semaphore
		now the critical section has been exited. */

		vTaskSuspendAll();
		prvLockQueue( pxQueue );

		/* Update the timeout state to see if it has expired yet. */
		if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
		{
			/* A block time is specified and not expired.  If the semaphore
			count is 0 then enter the Blocked state to wait for a semaphore to
			become available.  As semaphores are implemented with queues the
			queue being empty is equivalent to the semaphore count being 0. */
			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
			{
				traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );

				#if ( configUSE_MUTEXES == 1 )
				{
					if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
					{
						taskENTER_CRITICAL();
						{
							xInheritanceOccurred = xTaskPriorityInherit( pxQueue->u.xSemaphore.xMutexHolder );
						}
						taskEXIT_CRITICAL();
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				#endif

				vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
				prvUnlockQueue( pxQueue );
				if( xTaskResumeAll() == pdFALSE )
				{
					portYIELD_WITHIN_API();
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}
			}
			else
			{
				/* There was no timeout and the semaphore count was not 0, so
				attempt to take the semaphore again. */
				prvUnlockQueue( pxQueue );
				( void ) xTaskResumeAll();
			}
		}
		else
		{
			/* Timed out. */
			prvUnlockQueue( pxQueue );
			( void ) xTaskResumeAll();

			/* If the semaphore count is 0 exit now as the timeout has
			expired.  Otherwise return to attempt to take the semaphore that is
			known to be available.  As semaphores are implemented by queues the
			queue being empty is equivalent to the semaphore count being 0. */
			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
			{
				#if ( configUSE_MUTEXES == 1 )
				{
					/* xInheritanceOccurred could only have be set if
					pxQueue->uxQueueType == queueQUEUE_IS_MUTEX so no need to
					test the mutex type again to check it is actually a mutex. */
					if( xInheritanceOccurred != pdFALSE )
					{
						taskENTER_CRITICAL();
						{
							UBaseType_t uxHighestWaitingPriority;

							/* This task blocking on the mutex caused another
							task to inherit this task's priority.  Now this task
							has timed out the priority should be disinherited
							again, but only as low as the next highest priority
							task that is waiting for the same mutex. */
							uxHighestWaitingPriority = prvGetDisinheritPriorityAfterTimeout( pxQueue );
							vTaskPriorityDisinheritAfterTimeout( pxQueue->u.xSemaphore.xMutexHolder, uxHighestWaitingPriority );
						}
						taskEXIT_CRITICAL();
					}
				}
				#endif /* configUSE_MUTEXES */

				traceQUEUE_RECEIVE_FAILED( pxQueue );
				return errQUEUE_EMPTY;
			}
			else
			{
				mtCOVERAGE_TEST_MARKER();
			}
		}
	} /*lint -restore */
}



专家
2025-05-18 09:21:08     打赏
2楼

谢谢分享


专家
2025-05-18 11:38:56     打赏
3楼

感谢分享


专家
2025-05-18 11:42:48     打赏
4楼

感谢分享


共4条 1/1 1 跳转至

回复

匿名不能发帖!请先 [ 登陆 注册 ]