The queue mentioned earlier is a First-In-First-Out (FIFO) queue, where the first item to enter the queue is the first to be retrieved. This is suitable when all tasks in the queue have the same priority.
However, consider the following situation:
Suppose a queue is full of waiting tasks, each of which takes a long time to process.
Then an error log entry or a VIP user request arrives: a high-priority task that needs immediate attention. What should we do?
This is where asyncio.PriorityQueue comes into play.
A brief look at asyncio.PriorityQueue’s implementation
Unlike the FIFO asyncio.Queue, which stores its items in a collections.deque, asyncio.PriorityQueue is based on a heap, managed with the standard heapq module. A heap is a binary tree stored in a plain list.
You may be familiar with binary search trees, which ensure that the smallest node is always the leftmost node.
The binary tree in asyncio.PriorityQueue instead ensures that the smallest node is always at the top. Because the smallest value represents the highest priority, the highest-priority item is always removed first.
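The heap behavior described above can be sketched with the standard library alone. This is a minimal illustration, not the queue's internal code: the helper names pop_all_from_heap and drain_priority_queue and the (priority, label) tuples are made up for this example.

```python
import asyncio
import heapq


def pop_all_from_heap(values):
    """Push values onto a heap, then pop them; they come out smallest first."""
    heap = []
    for value in values:
        heapq.heappush(heap, value)
    # heap[0] is always the smallest item, even though the list is not fully sorted.
    return [heapq.heappop(heap) for _ in range(len(heap))]


async def drain_priority_queue(items):
    """Put items into an asyncio.PriorityQueue and return them in retrieval order."""
    queue = asyncio.PriorityQueue()
    for item in items:
        queue.put_nowait(item)
    return [queue.get_nowait() for _ in range(queue.qsize())]


if __name__ == "__main__":
    print(pop_all_from_heap([5, 1, 4, 2, 3]))
    # Illustrative (priority, label) tuples: a lower number means higher priority.
    print(asyncio.run(drain_priority_queue([(2, "regular"), (1, "vip"), (3, "batch")])))
```

Whatever the insertion order, the item with the smallest priority value is retrieved first.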
Real-world example with asyncio.PriorityQueue
Let’s illustrate the usage of asyncio.PriorityQueue with a real-world scenario.
Imagine we have an order service API. Each order takes time to process, but we can’t keep users waiting too long.
So when a user places an order, the API first puts the order into a queue, allowing a background task to process it asynchronously while immediately returning a message to the user.
This API accepts orders from two types of users: regular users and VIP users. It must ensure that VIP user orders are processed with the highest priority.
To keep the learning curve low, this example uses aiohttp to implement the server. The code is built up in the following steps:
First, we define an enumeration marking the two categories: regular users and VIP users.
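A minimal sketch of such an enumeration, assuming the name UserType and the member values shown here (neither is confirmed by the text). IntEnum is used so that members compare as integers, and the smaller value is given to VIP users because a priority queue retrieves the smallest item first.

```python
from enum import IntEnum


class UserType(IntEnum):
    """Hypothetical user categories; a smaller value means a higher priority."""

    VIP = 1
    REGULAR = 2


if __name__ == "__main__":
    # Sorting puts the highest-priority category first.
    print(sorted([UserType.REGULAR, UserType.VIP]))
```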
Next, we use dataclass to define a user’s order, which contains the user type and order processing duration. The order duration is not considered in priority sorting.
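One way to get this behavior is sketched below, assuming the class name Order and the field names user_type and order_delay. Passing order=True generates the comparison methods a priority queue needs, and field(compare=False) keeps the duration out of those comparisons.

```python
from dataclasses import dataclass, field
from enum import IntEnum


class UserType(IntEnum):
    """Hypothetical user categories; a smaller value means a higher priority."""

    VIP = 1
    REGULAR = 2


@dataclass(order=True)
class Order:
    """Hypothetical order record; only user_type takes part in comparisons."""

    user_type: UserType
    # compare=False excludes the duration from ordering and equality checks.
    order_delay: float = field(compare=False)


if __name__ == "__main__":
    slow_vip = Order(UserType.VIP, order_delay=10)
    fast_regular = Order(UserType.REGULAR, order_delay=1)
    print(slow_vip < fast_regular)  # the VIP order sorts first despite its longer duration
```

With this design, two orders of the same user type compare as equal, so the queue does not guarantee that they come out in arrival order.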
Then we define the consumer method process_order_worker, which retrieves orders from the queue and simulates the order processing.
Don’t forget to call queue.task_done() to tell the queue that we have finished processing the order.
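A consumer along these lines might look like the sketch below. The Order and UserType definitions, the processed list parameter, and the demo function are assumptions added so the example can run on its own; processing is simulated with asyncio.sleep.

```python
import asyncio
from dataclasses import dataclass, field
from enum import IntEnum


class UserType(IntEnum):
    VIP = 1
    REGULAR = 2


@dataclass(order=True)
class Order:
    user_type: UserType
    order_delay: float = field(compare=False)


async def process_order_worker(worker_id, queue, processed):
    """Take orders from the queue forever; processed is a list added for illustration."""
    while True:
        order = await queue.get()
        try:
            print(f"Worker-{worker_id} is processing a {order.user_type.name} order")
            await asyncio.sleep(order.order_delay)  # simulate the slow processing
            processed.append(order)
        finally:
            # Always mark the item as done, otherwise queue.join() never returns.
            queue.task_done()


async def demo():
    queue = asyncio.PriorityQueue()
    processed = []
    queue.put_nowait(Order(UserType.REGULAR, 0.01))
    queue.put_nowait(Order(UserType.VIP, 0.01))
    queue.put_nowait(Order(UserType.REGULAR, 0.01))
    worker = asyncio.create_task(process_order_worker(1, queue, processed))
    await queue.join()  # wait until every queued order is marked done
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return [order.user_type for order in processed]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Placing task_done() in a finally block keeps the queue's unfinished-task counter accurate even if processing raises or the worker is cancelled mid-order.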
Following that, we implement the order API using aiohttp. This API responds to user requests, generates an order object, and places it in the asyncio.PriorityQueue.
It then immediately returns a response to the user, avoiding user wait time.
When the program starts, we use create_order_queue to initialize the queue and order consumption tasks.
When the program ends, we use destroy_order_queue to ensure that all orders in the queue are processed and the background tasks are closed correctly.
queue.join() waits until every item in the queue has been processed. asyncio.wait_for sets a timeout of 20 seconds, after which it stops waiting for queue.join() to complete.
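The startup and shutdown steps could be sketched as follows. To stay runnable without aiohttp, the functions accept any mutable mapping; an aiohttp Application behaves like one, and functions with this signature can be registered via app.on_startup.append(create_order_queue) and app.on_cleanup.append(destroy_order_queue). The mapping keys, the worker count, the timeout parameter, and the simplified tuple-based worker are assumptions made for this example.

```python
import asyncio


async def process_order_worker(queue, processed):
    """Simplified worker: items are hypothetical (priority, name) tuples."""
    while True:
        _priority, name = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulate processing
            processed.append(name)
        finally:
            queue.task_done()


async def create_order_queue(app):
    """Create the queue and start the background workers (keys are assumed names)."""
    app["order_queue"] = asyncio.PriorityQueue()
    app["processed_orders"] = []
    app["order_tasks"] = [
        asyncio.create_task(
            process_order_worker(app["order_queue"], app["processed_orders"])
        )
        for _ in range(3)
    ]


async def destroy_order_queue(app, timeout=20):
    """Wait up to `timeout` seconds for pending orders, then stop the workers."""
    try:
        await asyncio.wait_for(app["order_queue"].join(), timeout=timeout)
    except asyncio.TimeoutError:
        print("Timed out; some orders were not processed")
    finally:
        for task in app["order_tasks"]:
            task.cancel()
        # Collect the cancellations so no task is left pending.
        await asyncio.gather(*app["order_tasks"], return_exceptions=True)


async def demo():
    app = {}  # stands in for the aiohttp Application in this sketch
    await create_order_queue(app)
    app["order_queue"].put_nowait((2, "regular-1"))
    app["order_queue"].put_nowait((1, "vip-1"))
    await destroy_order_queue(app, timeout=5)
    return app


if __name__ == "__main__":
    finished = asyncio.run(demo())
    print(finished["processed_orders"])
```

Cancelling the workers in a finally block means they are stopped whether the queue drained in time or the timeout expired.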
We can test this implementation using PyCharm’s HTTP Request:
As you can see, the two high-priority tasks are processed as expected. Perfect!