Advertisement
Guest User

Untitled

a guest
Jun 26th, 2019
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.28 KB | None | 0 0
  def open(self):
      """
      Open the Sender by opening its underlying send handler.

      If the sender has previously been redirected, the target is switched
      to the redirect address and a new handler is created for that target
      before it is opened. The call then blocks, polling every 0.05 seconds,
      until the handler reports that the client is ready.
      """
      self.running = True
      redirect = self.redirected
      if redirect:
          # Point at the redirect address and rebuild the handler with the
          # same options the sender was configured with.
          self.target = redirect.address
          handler_options = {
              "auth": self.client.get_auth(),
              "debug": self.client.debug,
              "msg_timeout": self.timeout,
              "error_policy": self.retry_policy,
              "keep_alive_interval": self.keep_alive,
              "client_name": self.name,
              "properties": self.client.create_properties(),
          }
          self._handler = SendClient(self.target, **handler_options)
      self._handler.open()
      # Wait for the handler to finish its startup before returning.
      while True:
          if self._handler.client_ready():
              break
          time.sleep(0.05)
  24.  
  def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
      """
      Instantiate an EventHub event Sender handler.

      :param client: The parent EventHubClient.
      :type client: ~azure.eventhub.client.EventHubClient
      :param target: The URI of the EventHub to send to.
      :type target: str
      :param partition: The specific partition ID to send to. Default is None, in which case the service
       will assign to all partitions using round-robin.
      :type partition: str
      :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
       queued. Default value is 60 seconds. If set to 0, there will be no timeout.
      :type send_timeout: int
      :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
       periods of inactivity. The default value is None, i.e. no keep alive pings.
      :type keep_alive: int
      :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
       Default value is `True`.
      :type auto_reconnect: bool
      """
      self.running = False
      self.client = client
      self.target = target
      self.partition = partition
      self.timeout = send_timeout
      self.redirected = None
      self.error = None
      self.keep_alive = keep_alive
      self.auto_reconnect = auto_reconnect
      self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
      self.reconnect_backoff = 1
      self.name = "EHSender-{}".format(uuid.uuid4())
      if partition:
          # Scope the target address and the client name to the partition.
          # This must happen exactly once and before the handler is built,
          # so that the handler and self.target/self.name agree.
          self.target += "/Partitions/" + partition
          self.name += "-partition{}".format(partition)
      self._handler = SendClient(
          self.target,
          auth=self.client.get_auth(),
          debug=self.client.debug,
          msg_timeout=self.timeout,
          error_policy=self.retry_policy,
          keep_alive_interval=self.keep_alive,
          client_name=self.name,
          properties=self.client.create_properties())
      self._outcome = None
      self._condition = None
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement