Advertisement
Guest User

Untitled

a guest
Apr 19th, 2017
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.13 KB | None | 0 0
  1. import time
  2. import types
  3.  
  4. import bson
  5. from pymongo import (
  6. MongoClient,
  7. ReadPreference,
  8. uri_parser,
  9. version as py_version,
  10. )
  11.  
  12. from datetime import datetime
  13.  
  14. from checks import AgentCheck
  15.  
  16. DEFAULT_TIMEOUT = 10
  17.  
  18. class AscentBatchCheck(AgentCheck):
  19.  
  20. SERVICE_CHECK_NAME = 'tokumx.can_connect'
  21.  
  22. def __init__(self, name, init_config, agentConfig, instances=None):
  23. AgentCheck.__init__(self, name, init_config, agentConfig, instances)
  24.  
  25. def create_event(self, db, batch, state, agentConfig):
  26. msg_title = "%s is stuck in %s state" % (batch, state)
  27. msg = "%s in %s has been found stuck in %s state" % (batch, db, state)
  28. self.event({
  29. 'timestamp': int(time.time()),
  30. 'event_type': 'ascent_batch',
  31. 'msg_title': msg_title,
  32. 'msg_text': msg,
  33. 'host': self.hostname
  34. })
  35.  
  36. def _get_ssl_params(self, instance):
  37. ssl_params = {
  38. 'ssl': instance.get('ssl', None),
  39. 'ssl_keyfile': instance.get('ssl_keyfile', None),
  40. 'ssl_certfile': instance.get('ssl_certfile', None),
  41. 'ssl_cert_reqs': instance.get('ssl_cert_reqs', None),
  42. 'ssl_ca_certs': instance.get('ssl_ca_certs', None)
  43. }
  44.  
  45. for key, param in ssl_params.items():
  46. if param is None:
  47. del ssl_params[key]
  48.  
  49. return ssl_params
  50.  
  51. def _get_connection(self, instance, read_preference=None):
  52. if 'server' not in instance:
  53. raise Exception("Missing 'server' in ascent-batches config")
  54.  
  55. server = instance['server']
  56.  
  57. ssl_params = self._get_ssl_params(instance)
  58.  
  59. tags = instance.get('tags', [])
  60. tags.append('server:%s' % server)
  61. # de-dupe tags to avoid a memory leak
  62. tags = list(set(tags))
  63.  
  64. # Configuration a URL, mongodb://user:pass@server/db
  65. parsed = uri_parser.parse_uri(server)
  66. username = parsed.get('username')
  67. password = parsed.get('password')
  68. db_name = parsed.get('database')
  69.  
  70. nodelist = parsed.get('nodelist')
  71. if nodelist:
  72. host = nodelist[0][0]
  73. port = nodelist[0][1]
  74. service_check_tags = [
  75. "host:%s" % host,
  76. "port:%s" % port
  77. ]
  78.  
  79. do_auth = True
  80. if username is None or password is None:
  81. self.log.debug("TokuMX: cannot extract username and password from config %s" % server)
  82. do_auth = False
  83. try:
  84. conn = MongoClient(server, socketTimeoutMS=DEFAULT_TIMEOUT*1000, **ssl_params)
  85. except Exception:
  86. self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.CRITICAL, tags=service_check_tags)
  87. raise
  88.  
  89. if do_auth:
  90. if not db.authenticate(username, password):
  91. message = "TokuMX: cannot connect with config %s" % server
  92. self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.CRITICAL, tags=service_check_tags, message=message)
  93. raise Exception(message)
  94.  
  95. self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.OK, tags=service_check_tags)
  96.  
  97. return conn, tags
  98.  
  99. def taking_too_long(self, ts):
  100. return (abs(datetime.utcnow() - ts)).total_seconds > 10800
  101.  
  102. def find_stuck_batches(self, instance, conn, tags):
  103. replSet = conn['admin'].command('replSetGetStatus')
  104. for member in replSet.get('members'):
  105. if member.get('self'):
  106. if int(member.get('state')) != 1:
  107. self.log.info('This server is not currently a replicaSet master.')
  108. return
  109.  
  110. for dbname in conn.database_names():
  111. if dbname in instance['exclude']:
  112. self.log.info("Skipping %s" % dbname)
  113. continue
  114.  
  115. self.log.info("Checking %s" % dbname)
  116. db_tags = list(tags)
  117. db_tags.append('db:%s' % dbname)
  118. db = conn[dbname]
  119. if 'batches' in db.collection_names():
  120. self.log.info('Checking %s for batches stuck in converting/processing state' % dbname)
  121. batches = db.batches
  122. for batch in batches.find({ '$or': [ { 'state': 'converting' }, { 'state': 'processing' } ] }):
  123. if batch['state'] == 'converting' and self.taking_too_long(batch['converting_at']):
  124. self.create_event(db, batch['name'], batch['state'], self.agentConfig)
  125. elif batch['state'] == 'processing' and self.taking_too_long(batch['processing_at']):
  126. self.create_event(db, batch['name'], batch['state'], self.agentConfig)
  127.  
  128. def create_event(self, db, batch, state, agentConfig):
  129. msg_title = "%s is stuck in %s state" % (batch, state)
  130. msg = "%s in %s has been found stuck in %s state" % (str(batch), str(db.name), state)
  131. self.event({
  132. 'timestamp': int(time.time()),
  133. 'event_type': 'ascent_batch',
  134. 'msg_title': msg_title,
  135. 'msg_text': msg,
  136. 'host': self.hostname
  137. })
  138.  
  139. def check(self, instance):
  140. conn, tags = self._get_connection(instance)
  141. self.find_stuck_batches(instance, conn, tags)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement