Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import types
- import bson
- from pymongo import (
- MongoClient,
- ReadPreference,
- uri_parser,
- version as py_version,
- )
- from datetime import datetime
- from checks import AgentCheck
# Socket timeout, in seconds, for the MongoDB/TokuMX connection
# (converted to milliseconds when passed to MongoClient).
DEFAULT_TIMEOUT = 10


class AscentBatchCheck(AgentCheck):
    """Agent check that reports batches stuck in a transient state.

    On each run the check connects to the configured server and, if that
    server is the replica-set primary, walks every non-excluded database
    that has a ``batches`` collection.  An event is emitted for each batch
    that has been in the ``converting`` or ``processing`` state for longer
    than ``STUCK_THRESHOLD_SECONDS``.

    Instance options read by this check:
        server:  MongoDB URI (required).
        tags:    optional list of extra tags.
        exclude: optional collection of database names to skip.
        ssl, ssl_keyfile, ssl_certfile, ssl_cert_reqs, ssl_ca_certs:
                 optional SSL settings forwarded to ``MongoClient``.
    """

    SERVICE_CHECK_NAME = 'tokumx.can_connect'

    # How long (in seconds) a batch may stay in a transient state before it
    # is considered stuck: 10800 s == 3 hours.
    STUCK_THRESHOLD_SECONDS = 10800

    # Transient batch states mapped to the document field that records when
    # the batch entered that state.
    STATE_TIMESTAMP_FIELDS = {
        'converting': 'converting_at',
        'processing': 'processing_at',
    }

    # Instance options that are forwarded verbatim to MongoClient when set.
    SSL_OPTION_KEYS = (
        'ssl',
        'ssl_keyfile',
        'ssl_certfile',
        'ssl_cert_reqs',
        'ssl_ca_certs',
    )

    def __init__(self, name, init_config, agentConfig, instances=None):
        AgentCheck.__init__(self, name, init_config, agentConfig, instances)

    def _get_ssl_params(self, instance):
        """Return the SSL keyword arguments configured on ``instance``.

        Only options that are present and not ``None`` are included, so the
        result can be splatted straight into ``MongoClient``.
        """
        # Build a fresh dict instead of deleting keys while iterating, which
        # raises RuntimeError on Python 3.
        return {
            key: instance[key]
            for key in self.SSL_OPTION_KEYS
            if instance.get(key) is not None
        }

    def _get_connection(self, instance, read_preference=None):
        """Open a client connection for ``instance`` and report reachability.

        Args:
            instance: the check instance configuration; must hold 'server'.
            read_preference: accepted for interface compatibility; unused.

        Returns:
            A ``(conn, tags)`` tuple: the ``MongoClient`` and the de-duplicated
            instance tags plus a ``server:<uri>`` tag.

        Raises:
            Exception: if 'server' is missing or authentication is refused.
            Any exception raised by ``MongoClient`` is re-raised after a
            CRITICAL service check has been submitted.
        """
        if 'server' not in instance:
            raise Exception("Missing 'server' in ascent-batches config")
        server = instance['server']
        ssl_params = self._get_ssl_params(instance)

        # Copy the configured list: appending to it in place would make the
        # instance's own 'tags' grow by one entry on every check run.
        tags = list(instance.get('tags') or [])
        # NOTE(review): `server` is the full URI and may embed the password;
        # consider sanitising it before using it as a tag or in messages.
        tags.append('server:%s' % server)
        # de-dupe tags to avoid a memory leak
        tags = list(set(tags))

        # Configuration a URL, mongodb://user:pass@server/db
        parsed = uri_parser.parse_uri(server)
        username = parsed.get('username')
        password = parsed.get('password')
        # Authenticate against 'admin' when the URI names no database.
        db_name = parsed.get('database') or 'admin'
        nodelist = parsed.get('nodelist')

        # Always defined, so the service check below can never hit an
        # unbound name when the URI yields no node list.
        service_check_tags = []
        if nodelist:
            host, port = nodelist[0][0], nodelist[0][1]
            service_check_tags = [
                "host:%s" % host,
                "port:%s" % port
            ]

        do_auth = True
        if username is None or password is None:
            self.log.debug("TokuMX: cannot extract username and password from config %s" % server)
            do_auth = False

        try:
            conn = MongoClient(server, socketTimeoutMS=DEFAULT_TIMEOUT * 1000, **ssl_params)
        except Exception:
            self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.CRITICAL, tags=service_check_tags)
            raise

        if do_auth:
            # The database handle must come from the new connection; the
            # original referenced an undefined name here.
            db = conn[db_name]
            if not db.authenticate(username, password):
                message = "TokuMX: cannot connect with config %s" % server
                self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.CRITICAL, tags=service_check_tags, message=message)
                raise Exception(message)

        self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.OK, tags=service_check_tags)
        return conn, tags

    def taking_too_long(self, ts):
        """Return True if ``ts`` is more than the stuck threshold away from now.

        ``ts`` is compared against ``datetime.utcnow()``, so it must be a
        naive datetime (presumably UTC, as stored by the driver -- confirm).
        """
        elapsed = abs(datetime.utcnow() - ts)
        # total_seconds is a method: it has to be called, otherwise the
        # comparison is made against the bound method object itself.
        return elapsed.total_seconds() > self.STUCK_THRESHOLD_SECONDS

    def find_stuck_batches(self, instance, conn, tags):
        """Emit an event for every batch stuck in a transient state.

        Does nothing when this server is not the replica-set primary
        (member state 1).

        Args:
            instance: the check instance configuration ('exclude' optional).
            conn: an open ``MongoClient``.
            tags: instance tags; accepted for interface compatibility and
                currently not attached to the emitted events.
        """
        repl_status = conn['admin'].command('replSetGetStatus')
        for member in repl_status.get('members') or []:
            if member.get('self') and int(member.get('state')) != 1:
                self.log.info('This server is not currently a replicaSet master.')
                return

        # 'exclude' is optional; treat a missing/empty value as "skip nothing".
        excluded = instance.get('exclude') or []
        query = {'state': {'$in': sorted(self.STATE_TIMESTAMP_FIELDS)}}

        for dbname in conn.database_names():
            if dbname in excluded:
                self.log.info("Skipping %s" % dbname)
                continue
            self.log.info("Checking %s" % dbname)

            db = conn[dbname]
            if 'batches' not in db.collection_names():
                continue

            self.log.info('Checking %s for batches stuck in converting/processing state' % dbname)
            for batch in db.batches.find(query):
                state = batch['state']
                entered_at = batch.get(self.STATE_TIMESTAMP_FIELDS[state])
                if entered_at is None:
                    # Without a timestamp the age cannot be judged; skip the
                    # document rather than abort the whole check.
                    self.log.debug("Batch in %s has no timestamp for state %s" % (dbname, state))
                    continue
                if self.taking_too_long(entered_at):
                    self.create_event(db, batch['name'], state, self.agentConfig)

    def create_event(self, db, batch, state, agentConfig):
        """Submit an 'ascent_batch' event describing a stuck batch.

        Args:
            db: database object the batch lives in; its ``name`` is reported.
            batch: name of the stuck batch.
            state: the state the batch is stuck in.
            agentConfig: accepted for interface compatibility; unused.
        """
        msg_title = "%s is stuck in %s state" % (batch, state)
        msg = "%s in %s has been found stuck in %s state" % (str(batch), str(db.name), state)
        self.event({
            'timestamp': int(time.time()),
            'event_type': 'ascent_batch',
            'msg_title': msg_title,
            'msg_text': msg,
            'host': self.hostname
        })

    def check(self, instance):
        """Run the check for one configured instance."""
        conn, tags = self._get_connection(instance)
        self.find_stuck_batches(instance, conn, tags)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement