Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pika
- import json
- import time
- import sys
def handle_incoming_message(channel, method, properties, body: bytes) -> None:
    """Print a delivered message and acknowledge it.

    Registered as the ``on_message_callback`` of ``basic_consume``, so it is
    invoked once per delivery with the four arguments below.

    Args:
        channel: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; its ``delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties; only printed here.
        body: Raw message payload; only printed here.
    """
    print(f'[{channel}][{method}][{properties}][{body}]')
    # Ack only after the message has been handled (printed), so a crash
    # before this point leaves the message unacknowledged.
    channel.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == '__main__':
    # Script entry point: connect to the broker, subscribe to one queue and
    # hand every delivery to handle_incoming_message until interrupted.
    credentials = pika.credentials.PlainCredentials(
        username='guest',
        password='guest',
    )
    # NOTE(review): 6666 is not the default AMQP port (5672) — presumably a
    # mapped/forwarded port; confirm against the broker setup.
    parameters = pika.ConnectionParameters(
        host='127.0.0.1',
        port=6666,
        credentials=credentials,
    )
    # Both context managers close their resource on exit, including when an
    # exception propagates out of the consume loop.
    with pika.BlockingConnection(parameters=parameters) as connection:
        with connection.channel() as channel:
            # At most one unacknowledged message is outstanding for this
            # consumer at a time (per-consumer, not channel-wide).
            channel.basic_qos(prefetch_count=1, global_qos=False)
            # NOTE(review): these consumer arguments look like RabbitMQ
            # single-active-consumer / consumer-group settings; verify the
            # broker actually honours them on an AMQP 0-9-1 basic.consume.
            channel.basic_consume(
                queue='data-stream-000',
                on_message_callback=handle_incoming_message,
                arguments={
                    'single-active-consumer': True,
                    'name': 'consumer-group-1',
                },
            )
            try:
                # Blocks, dispatching deliveries to the callback.
                channel.start_consuming()
            except KeyboardInterrupt:
                # Ctrl-C: cancel the consumer cleanly instead of exiting
                # with a traceback; the with-blocks then close everything.
                channel.stop_consuming()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement