daily pastebin goal
96%
SHARE
TWEET

Untitled

a guest Mar 20th, 2019 58 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from kapacitor.udf.agent import Agent, Handler
  2. from kapacitor.udf import udf_pb2
  3. import logging
  4. import json
  5.  
  6. logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
  7. logger = logging.getLogger()
  8.  
  9.  
  10. class GeoTestHandler(Handler):
  11.     def __init__(self, agent):
  12.         """Constructor"""
  13.         logger.info('__init__ trigger')
  14.         self._agent = agent
  15.         self._field = ' '
  16.         self._size = 10
  17.         self._points = []
  18.         self._state = {}
  19.  
  20.     def info(self):
  21.         """info: Define what your UDF wants and what will it provide in the end"""
  22.         logger.info('info trigger')
  23.         response = udf_pb2.Response()
  24.         response.info.wants = udf_pb2.BATCH
  25.         response.info.provides = udf_pb2.STREAM
  26.         response.info.options['field'].valueTypes.append(udf_pb2.STRING)
  27.         return response
  28.  
  29.     def init(self, init_req):
  30.         """init: Define what your UDF expects as parameters when parsing the TICKScript"""
  31.         logger.info('INIT trigger')
  32.         for opt in init_req.options:
  33.             if opt.name == 'field':
  34.                 self._field = opt.values[0].stringValue
  35.         success = True
  36.         msg = ''
  37.         if self._field == '':
  38.             success = False
  39.             msg = 'must provide field name'
  40.         response = udf_pb2.Response()
  41.         response.init.success = success
  42.         response.init.error = msg.encode()
  43.         return response
  44.  
  45.     def begin_batch(self, begin_req):
  46.         """begin_batch: Do something at the beginning of the batch"""
  47.         logger.info('begin_batch trigger')
  48.  
  49.     def snapshot(self):
  50.         """snapshot: take a snapshot of the current data, if the task stops for some reason """
  51.         data = {}
  52.         for group, state in self._state.items():
  53.             data[group] = state.snapshot()
  54.         response = udf_pb2.Response()
  55.         response.snapshot.snapshot = json.dumps(data).encode()
  56.         return response
  57.  
  58.     def point(self, point):
  59.         """point: process each point within the batch"""
  60.         logger.info('point trigger')
  61.         self._points.append(point.fieldsDouble[self._field])
  62.         if len(self._points) == self._size:
  63.             geo = 1.0
  64.             for p in self._points:
  65.                 geo *= p
  66.             response = udf_pb2.Response()
  67.             response.point.name = point.name
  68.             response.point.time = point.time
  69.             response.point.group = point.group
  70.             response.point.tags.update(point.tags)
  71.             # add the geometric sum to its own field
  72.             response.point.fieldsDouble['geo'] = geo
  73.             # add some dummy string field into the point as well
  74.             response.point.fieldsString['hash'] = 'test'
  75.             self._agent.write_response(response)
  76.             self._points.pop(0)
  77.  
  78.     def end_batch(self, batch_meta):
  79.         """end_batch: do something at the end of the batch"""
  80.         logger.info('end_batch')
  81.  
  82.  
  83. if __name__ == '__main__':
  84.     # Create an agent
  85.     agent = Agent()
  86.  
  87.     # Create a handler and pass it an agent so it can write points
  88.     h = GeoTestHandler(agent)
  89.  
  90.     # Set the handler on the agent
  91.     agent.handler = h
  92.  
  93.     # anything printed to STDERR from a UDF process gets captured into the logs
  94.     logger.info("Starting agent for GeoTestHandler")
  95.     agent.start()
  96.     agent.wait()
  97.     logger.info("Agent finished")
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top