Advertisement
Guest User

Untitled

a guest
May 24th, 2015
271
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.65 KB | None | 0 0
  1. import time
  2. from cassandra.auth import PlainTextAuthProvider
  3. from cassandra.cluster import Cluster
  4. from cassandra.policies import (TokenAwarePolicy, RoundRobinPolicy,
  5. ConstantReconnectionPolicy, RetryPolicy,
  6. WriteType)
  7.  
  8.  
  9. class CassRetryPolicy(RetryPolicy):
  10. """
  11. Custom Retry Policy for failed connections and operations.
  12. @param delay: The time in seconds to wait between retries
  13. @type delay: int
  14. @param max_attempts: The maximum number of retries to attempt before failure
  15. @type max_attempts: int
  16. """
  17.  
  18. def __init__(
  19. self,
  20. delay,
  21. max_attempts
  22. ):
  23. self.delay = delay
  24. self.max_attempts = max_attempts
  25.  
  26. def on_read_timeout(
  27. self,
  28. query,
  29. consistency,
  30. required_responses,
  31. received_responses,
  32. data_retrieved,
  33. retry_num
  34. ):
  35. """
  36. If a read times out, we want to retry 20 times and wait 30 seconds
  37. between each attempt.
  38. """
  39. if retry_num >= self.max_attempts:
  40. return self.RETHROW, None
  41. else:
  42. time.sleep(self.delay)
  43. return self.RETRY, consistency
  44.  
  45. def on_write_timeout(
  46. self,
  47. query,
  48. consistency,
  49. write_type,
  50. required_responses,
  51. received_responses,
  52. retry_num
  53. ):
  54. """
  55. Only retry un-logged batch writes and writes to the batch log
  56. **********************************************************************
  57. IS THIS THE PROPER APPROACH? OR SHOULD I CHECK OTHER WRITES TOO?
  58. **********************************************************************
  59. """
  60. if retry_num >= self.max_attempts:
  61. return self.RETHROW, None
  62. elif write_type in (
  63. WriteType.SIMPLE,
  64. WriteType.BATCH,
  65. WriteType.COUNTER
  66. ):
  67. return self.IGNORE, None
  68. elif write_type in (
  69. WriteType.UNLOGGED_BATCH,
  70. WriteType.BATCH_LOG
  71. ):
  72. time.sleep(self.delay)
  73. return self.RETRY, consistency
  74. else:
  75. return self.RETHROW, None
  76.  
  77. def on_unavailable(
  78. self,
  79. query,
  80. consistency,
  81. required_replicas,
  82. alive_replicas,
  83. retry_num
  84. ):
  85. """
  86. Retry when nodes are unavailable
  87. *** I HAVE TRIED RUNNING THIS IN DEBUG MODE AND THE LINES BELOW NEVER EXECUTE
  88. """
  89. if retry_num >= self.max_attempts:
  90. return self.RETHROW, None
  91. else:
  92. time.sleep(self.delay)
  93. return self.RETRY, consistency
  94.  
  95.  
  96. def cluster(
  97. contact_points,
  98. username=None,
  99. password=None,
  100. load_balancing_policy=TokenAwarePolicy(
  101. RoundRobinPolicy()
  102. ),
  103. reconnection_policy=ConstantReconnectionPolicy(
  104. delay=30,
  105. max_attempts=20
  106. ),
  107. default_retry_policy=CassRetryPolicy(
  108. delay=30,
  109. max_attempts=20
  110. )
  111. ):
  112. """
  113. Create a Cassandra Cluster Object to identify the cluster of nodes to
  114. connect to.
  115. """
  116. # TODO: change load balancing policy to DC aware when using network topology
  117. auth_provider = PlainTextAuthProvider(
  118. username=username,
  119. password=password
  120. )
  121.  
  122. return Cluster(
  123. contact_points=contact_points,
  124. auth_provider=auth_provider,
  125. load_balancing_policy=load_balancing_policy,
  126. reconnection_policy=reconnection_policy,
  127. default_retry_policy=default_retry_policy
  128. )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement