zoom.queues module¶
zoom.queues
message queues
-
class
zoom.queues.Queues(db=None)¶ Bases:
objectmessages
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.peek() 'hey!'
-
clear()¶
-
get(name, newest=None)¶
-
stats()¶
-
topic(name, newest=None)¶
-
topics()¶
-
-
class
zoom.queues.Topic(name, newest=None, db=None)¶ Bases:
objectmessage topic
-
call(*messages, delay=0.1, timeout=15)¶ send messages and wait for responses
-
clear()¶ clear the topic
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.send('hey!', 'you!') [1, 2] >>> len(t) 2 >>> t.clear() >>> len(t) 0
-
handle(f, timeout=0, delay=0.1, one_pass=False)¶ respond to and consume messages
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> def echo(m): ... if m == 'quit': raise StopHandling ... print('got', repr(m)) >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.put('quit') 3 >>> t.handle(echo) got 'hey!' got 'you!' 2
-
join(jobs, delay=0.1, timeout=15)¶ wait for responses for consumers
NOTE: the assumption at this point is that any provided delay or timeout applies to all jobs. If jobs need varying arguments then mutliple calls to join should be considered.
-
last()¶ get row_id of the last (newest) message in the topic
-
len(newest=None)¶ return the number of messages in the topic
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.len() 2
-
listen(f, delay=0.1, meta=False)¶ observe but don’t consume messages
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> def echo(m): ... print(m) ... return m == 'you!' >>> t.listen(echo) hey! you! 2
>>> t1 = messages.topic('test_topic1') >>> t2 = messages.topic('test_topic2') >>> t3 = messages.topic(None)
>>> t1.put('hey!') 3 >>> t2.put('you!') 4 >>> def echo(m): ... print(m) ... return m == 'you!' >>> t3.listen(echo) hey! you! 2
-
peek(newest=None)¶ return the next message but don’t remove it
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.peek() >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.peek() 'hey!' >>> t.peek() 'hey!'
-
perform(task, *args, **kwargs)¶ consume a single message and perform task with it
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> def echo(m): ... print('got', repr(m)) >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.perform(echo) got 'hey!' True >>> t.perform(echo) got 'you!' True >>> t.perform(echo) False
-
poll(newest=None)¶ peek at the next message and increment internal pointer
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.newest -1 >>> t.poll() 'hey!' >>> t.newest 1 >>> t.poll() 'you!'
>>> raised = False >>> try: ... t.poll() ... except EmptyException: ... raised = True >>> raised True
>>> t.newest = -1 >>> t.poll() 'hey!'
-
pop()¶ read next message and remove it from the topic
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.len() 2 >>> t._peek() (1, 'test_topic', 'hey!') >>> t.pop() 'hey!' >>> t.len() 1 >>> t.pop() 'you!' >>> t.len() 0 >>> t.pop() >>> t.newest = -1 >>> raised = False >>> try: ... t._pop() ... except EmptyException: ... raised = True >>> raised True
-
process(f)¶ respond to and consume current messages
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> def echo(m): ... if m == 'quit': raise StopProcessing ... print('got', repr(m)) >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.put('quit') 3 >>> t.process(echo) got 'hey!' got 'you!' 2 >>> t.process(echo) 0
-
put(message)¶ put a message in the topic
-
send(*messages)¶ send list of messages
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.send('hey!', 'you!') [1, 2] >>> t.peek() 'hey!' >>> t.peek() 'hey!'
-
wait(delay=0.1, timeout=15)¶ wait for a message to arrive and return it
>>> messages = setup_test() >>> t = messages.get('test_topic') >>> t.put('hey!') 1 >>> t.put('you!') 2 >>> t.wait() 'hey!' >>> t.wait() 'you!'
-
-
exception
zoom.queues.EmptyException¶ Bases:
Exception
-
exception
zoom.queues.WaitException¶ Bases:
Exception
-
exception
zoom.queues.StopListening¶ Bases:
Exception
-
exception
zoom.queues.StopHandling¶ Bases:
Exception
-
exception
zoom.queues.StopProcessing¶ Bases:
Exception