Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- from cassandra.auth import PlainTextAuthProvider
- from cassandra.cluster import Cluster
- from cassandra.policies import (TokenAwarePolicy, RoundRobinPolicy,
- ConstantReconnectionPolicy, RetryPolicy,
- WriteType)
- class CassRetryPolicy(RetryPolicy):
- """
- Custom Retry Policy for failed connections and operations.
- @param delay: The time in seconds to wait between retries
- @type delay: int
- @param max_attempts: The maximum number of retries to attempt before failure
- @type max_attempts: int
- """
- def __init__(
- self,
- delay,
- max_attempts
- ):
- self.delay = delay
- self.max_attempts = max_attempts
- def on_read_timeout(
- self,
- query,
- consistency,
- required_responses,
- received_responses,
- data_retrieved,
- retry_num
- ):
- """
- If a read times out, we want to retry 20 times and wait 30 seconds
- between each attempt.
- """
- if retry_num >= self.max_attempts:
- return self.RETHROW, None
- else:
- time.sleep(self.delay)
- return self.RETRY, consistency
- def on_write_timeout(
- self,
- query,
- consistency,
- write_type,
- required_responses,
- received_responses,
- retry_num
- ):
- """
- Only retry un-logged batch writes and writes to the batch log
- **********************************************************************
- IS THIS THE PROPER APPROACH? OR SHOULD I CHECK OTHER WRITES TOO?
- **********************************************************************
- """
- if retry_num >= self.max_attempts:
- return self.RETHROW, None
- elif write_type in (
- WriteType.SIMPLE,
- WriteType.BATCH,
- WriteType.COUNTER
- ):
- return self.IGNORE, None
- elif write_type in (
- WriteType.UNLOGGED_BATCH,
- WriteType.BATCH_LOG
- ):
- time.sleep(self.delay)
- return self.RETRY, consistency
- else:
- return self.RETHROW, None
- def on_unavailable(
- self,
- query,
- consistency,
- required_replicas,
- alive_replicas,
- retry_num
- ):
- """
- Retry when nodes are unavailable
- *** I HAVE TRIED RUNNING THIS IN DEBUG MODE AND THE LINES BELOW NEVER EXECUTE
- """
- if retry_num >= self.max_attempts:
- return self.RETHROW, None
- else:
- time.sleep(self.delay)
- return self.RETRY, consistency
- def cluster(
- contact_points,
- username=None,
- password=None,
- load_balancing_policy=TokenAwarePolicy(
- RoundRobinPolicy()
- ),
- reconnection_policy=ConstantReconnectionPolicy(
- delay=30,
- max_attempts=20
- ),
- default_retry_policy=CassRetryPolicy(
- delay=30,
- max_attempts=20
- )
- ):
- """
- Create a Cassandra Cluster Object to identify the cluster of nodes to
- connect to.
- """
- # TODO: change load balancing policy to DC aware when using network topology
- auth_provider = PlainTextAuthProvider(
- username=username,
- password=password
- )
- return Cluster(
- contact_points=contact_points,
- auth_provider=auth_provider,
- load_balancing_policy=load_balancing_policy,
- reconnection_policy=reconnection_policy,
- default_retry_policy=default_retry_policy
- )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement