Mikekhil

pika error

Mar 29th, 2021 (edited)
431
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.24 KB | None | 0 0
  1. import uuid
  2.  
  3. import pika
  4. from locust import User
  5.  
  6. from utils.time_measure import decorate_time
  7.  
  8.  
  9. class Rabbit:
  10.     def __init__(self, host, exchange_name, routing_key, virtual_host):
  11.         self.virtual_host = virtual_host
  12.         self.host = host
  13.         self.routing_key = routing_key
  14.         self.exchange_name = exchange_name
  15.         self.connection = None
  16.         self.channels = []
  17.  
  18.     def connect(self) -> None:
  19.         """Connect to Rabbit and create a pool of channels"""
  20.         credentials = pika.PlainCredentials('admin', 'admin')
  21.         parameters = pika.ConnectionParameters(host=self.host, credentials=credentials, virtual_host=self.virtual_host,
  22.                                                port=35672, channel_max=256, heartbeat=60)
  23.         connection = pika.BlockingConnection(parameters)
  24.         self.connection = connection
  25.         for _ in range(10):
  26.             self.channels.append(connection.channel())
  27.  
  28.     def close(self) -> None:
  29.         """Close all channels and close connection"""
  30.         for channel in self.channels:
  31.             channel.close()
  32.         self.connection.close()
  33.  
  34.     @decorate_time
  35.     def publish(self, body="") -> None:
  36.         """
  37.        Publish a message with param <body> in open channel. If channel is_closed or number of channels not enough
  38.        created a new one.
  39.        :param body: str
  40.        """
  41.         try:
  42.             channel = self.channels.pop(0)
  43.             if channel.is_closed():
  44.                 channel = self.connection.channel()
  45.         except IndexError as e:
  46.             print(repr(e))
  47.             print("List is empty")
  48.             channel = self.connection.channel()
  49.         channel.basic_publish(
  50.             exchange=self.exchange_name, routing_key=self.routing_key, body=body, properties=
  51.             pika.BasicProperties(content_type='application/json', delivery_mode=1, content_encoding="UTF-8",
  52.                                  headers={"X-Flow-ID": str(uuid.uuid4())}))
  53.         self.channels.append(channel)
  54.  
  55.  
  56. class CustomRabbitLocust(User):
  57.     abstract = True
  58.  
  59.     def __init__(self, *args, **kwargs):
  60.         super().__init__(*args, **kwargs)
  61.         self.client = Rabbit(self.host, self.exchange_name, self.routing_key, self.virtual_host)
  62.  
Add Comment
Please, Sign In to add comment