tornado.queues – Queues for coroutines¶
New in version 4.2.
Asynchronous queues for coroutines.
Warning
Unlike the standard library’s queue module, the classes defined here
are not thread-safe. To use these queues from another thread,
use IOLoop.add_callback to transfer control to the IOLoop thread
before calling any queue methods.
Classes¶
Queue¶
-
class
tornado.queues.Queue(maxsize=0)[source]¶ Coordinate producer and consumer coroutines.
If maxsize is 0 (the default) the queue size is unbounded.
from tornado import gen from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) @gen.coroutine def consumer(): while True: item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done() @gen.coroutine def producer(): for item in range(5): yield q.put(item) print('Put %s' % item) @gen.coroutine def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) yield producer() # Wait for producer to put all tasks. yield q.join() # Wait for consumer to finish all tasks. print('Done') IOLoop.current().run_sync(main)
Put 0 Put 1 Doing work on 0 Put 2 Doing work on 1 Put 3 Doing work on 2 Put 4 Doing work on 3 Doing work on 4 Done
In Python 3.5,
Queueimplements the async iterator protocol, soconsumer()could be rewritten as:async def consumer(): async for item in q: try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done()
Changed in version 4.3: Added
async forsupport in Python 3.5.-
maxsize¶ Number of items allowed in the queue.
-
put(item, timeout=None)[source]¶ Put an item into the queue, perhaps waiting until there is room.
Returns a Future, which raises
tornado.gen.TimeoutErrorafter a timeout.
-
put_nowait(item)[source]¶ Put an item into the queue without blocking.
If no free slot is immediately available, raise
QueueFull.
-
get(timeout=None)[source]¶ Remove and return an item from the queue.
Returns a Future which resolves once an item is available, or raises
tornado.gen.TimeoutErrorafter a timeout.
-
get_nowait()[source]¶ Remove and return an item from the queue without blocking.
Return an item if one is immediately available, else raise
QueueEmpty.
-
task_done()[source]¶ Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each
getused to fetch a task, a subsequent call totask_donetells the queue that the processing on the task is complete.If a
joinis blocking, it resumes when all items have been processed; that is, when everyputis matched by atask_done.Raises
ValueErrorif called more times thanput.
-
join(timeout=None)[source]¶ Block until all items in the queue are processed.
Returns a Future, which raises
tornado.gen.TimeoutErrorafter a timeout.
-
PriorityQueue¶
-
class
tornado.queues.PriorityQueue(maxsize=0)[source]¶ A
Queuethat retrieves entries in priority order, lowest first.Entries are typically tuples like
(priority number, data).from tornado.queues import PriorityQueue q = PriorityQueue() q.put((1, 'medium-priority item')) q.put((0, 'high-priority item')) q.put((10, 'low-priority item')) print(q.get_nowait()) print(q.get_nowait()) print(q.get_nowait())
(0, 'high-priority item') (1, 'medium-priority item') (10, 'low-priority item')
Exceptions¶
QueueEmpty¶
-
exception
tornado.queues.QueueEmpty[source]¶ Raised by
Queue.get_nowaitwhen the queue has no items.
QueueFull¶
-
exception
tornado.queues.QueueFull[source]¶ Raised by
Queue.put_nowaitwhen a queue is at its maximum size.