Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #coding=utf-8
- from google.gax.errors import GaxError
- from google.pubsub.v1.subscriber_api import SubscriberApi
- from grpc.framework.interfaces.face.face import ExpirationError
def main(project='<project>', subscription_name='<subscription-name>'):
    """Pull from a Pub/Sub subscription five times and acknowledge the results.

    Each round requests at most five messages. A pull whose ``GaxError`` is
    caused by an ``ExpirationError`` is reported as a timeout and the loop
    moves on to the next round; any other ``GaxError`` is re-raised.

    Args:
        project: Project identifier handed to ``subscription_path``. The
            default is only the template placeholder from the original
            snippet and must be replaced with a real value.
        subscription_name: Subscription name handed to ``subscription_path``.
            The default is likewise a placeholder.

    Raises:
        GaxError: When a pull fails and its cause is not an
            ``ExpirationError``.
    """
    api = SubscriberApi(timeout=5)
    subscription = api.subscription_path(project, subscription_name)

    for _ in range(5):
        print('[PULL] Pull:ing data...')
        try:
            resp = api.pull(subscription, max_messages=5)
        except GaxError as e:
            if isinstance(e.cause, ExpirationError):
                print('[PULL] timed out!')
                continue
            # Bare raise keeps the original traceback (``raise e`` would
            # reset it on Python 2).
            raise

        print(u'[PULL] ✉️ Got messages: {}'.format(resp))

        # A list comprehension yields a concrete list on Python 2 and 3 alike.
        ack_ids = [msg.ack_id for msg in resp.received_messages]
        if not ack_ids:
            # Nothing was delivered this round. The Acknowledge RPC is
            # documented as requiring a non-empty ack_ids list, so skip it.
            continue

        print(u'[ACK] Acknowledging ...')
        api.acknowledge(subscription, ack_ids)
        print(u'[ACK] Done!')

    print('bye')
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement