zoom.queues module

zoom.queues

message queues

class zoom.queues.Queues(db=None)

Bases: object

messages

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.peek()
'hey!'
clear()
get(name, newest=None)
stats()
topic(name, newest=None)
topics()
class zoom.queues.Topic(name, newest=None, db=None)

Bases: object

message topic

call(*messages, delay=0.1, timeout=15)

send messages and wait for responses

clear()

clear the topic

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.send('hey!', 'you!')
[1, 2]
>>> len(t)
2
>>> t.clear()
>>> len(t)
0
handle(f, timeout=0, delay=0.1, one_pass=False)

respond to and consume messages

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> def echo(m):
...     if m == 'quit': raise StopHandling
...     print('got', repr(m))
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.put('quit')
3
>>> t.handle(echo)
got 'hey!'
got 'you!'
2
join(jobs, delay=0.1, timeout=15)

wait for responses for consumers

NOTE: the assumption at this point is that any provided delay or timeout applies to all jobs. If jobs need varying arguments then mutliple calls to join should be considered.

last()

get row_id of the last (newest) message in the topic

len(newest=None)

return the number of messages in the topic

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.len()
2
listen(f, delay=0.1, meta=False)

observe but don’t consume messages

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> def echo(m):
...     print(m)
...     return m == 'you!'
>>> t.listen(echo)
hey!
you!
2
>>> t1 = messages.topic('test_topic1')
>>> t2 = messages.topic('test_topic2')
>>> t3 = messages.topic(None)
>>> t1.put('hey!')
3
>>> t2.put('you!')
4
>>> def echo(m):
...     print(m)
...     return m == 'you!'
>>> t3.listen(echo)
hey!
you!
2
peek(newest=None)

return the next message but don’t remove it

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.peek()
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.peek()
'hey!'
>>> t.peek()
'hey!'
perform(task, *args, **kwargs)

consume a single message and perform task with it

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> def echo(m):
...     print('got', repr(m))
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.perform(echo)
got 'hey!'
True
>>> t.perform(echo)
got 'you!'
True
>>> t.perform(echo)
False
poll(newest=None)

peek at the next message and increment internal pointer

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.newest
-1
>>> t.poll()
'hey!'
>>> t.newest
1
>>> t.poll()
'you!'
>>> raised = False
>>> try:
...     t.poll()
... except EmptyException:
...     raised = True
>>> raised
True
>>> t.newest = -1
>>> t.poll()
'hey!'
pop()

read next message and remove it from the topic

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.len()
2
>>> t._peek()
(1, 'test_topic', 'hey!')
>>> t.pop()
'hey!'
>>> t.len()
1
>>> t.pop()
'you!'
>>> t.len()
0
>>> t.pop()
>>> t.newest = -1
>>> raised = False
>>> try:
...     t._pop()
... except EmptyException:
...     raised = True
>>> raised
True
process(f)

respond to and consume current messages

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> def echo(m):
...     if m == 'quit': raise StopProcessing
...     print('got', repr(m))
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.put('quit')
3
>>> t.process(echo)
got 'hey!'
got 'you!'
2
>>> t.process(echo)
0
put(message)

put a message in the topic

send(*messages)

send list of messages

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.send('hey!', 'you!')
[1, 2]
>>> t.peek()
'hey!'
>>> t.peek()
'hey!'
wait(delay=0.1, timeout=15)

wait for a message to arrive and return it

>>> messages = setup_test()
>>> t = messages.get('test_topic')
>>> t.put('hey!')
1
>>> t.put('you!')
2
>>> t.wait()
'hey!'
>>> t.wait()
'you!'
exception zoom.queues.EmptyException

Bases: Exception

exception zoom.queues.WaitException

Bases: Exception

exception zoom.queues.StopListening

Bases: Exception

exception zoom.queues.StopHandling

Bases: Exception

exception zoom.queues.StopProcessing

Bases: Exception