Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from kapacitor.udf.agent import Agent, Handler
- from kapacitor.udf import udf_pb2
- import logging
- import json
# Configure the root logger once at import time: every record from DEBUG up is
# emitted with a timestamp, level and logger name in front of the message.
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
    level=logging.DEBUG,
)
# No name is passed, so this is the root logger shared by the whole process.
logger = logging.getLogger()
class GeoTestHandler(Handler):
    """UDF handler that emits the product of a sliding window of field values.

    The handler declares that it wants batches and provides a stream (see
    ``info``). It keeps the most recent ``_size`` values of the configured
    field; each time the window is full it writes one point whose ``geo``
    field holds the product of those values, then drops the oldest value.
    """

    def __init__(self, agent):
        """Store the agent and initialise an empty window.

        Args:
            agent: object whose ``write_response`` method is used by
                ``point`` to emit output points.
        """
        logger.info('__init__ trigger')
        self._agent = agent
        # Empty string means "not configured yet". init() relies on this to
        # reject a task that never supplies the 'field' option.
        self._field = ''
        # Number of values multiplied together for each emitted point.
        self._size = 10
        # Sliding window of the most recent field values, oldest first.
        self._points = []
        # Per-group state serialised by snapshot(); nothing in this class
        # populates it.
        self._state = {}

    def info(self):
        """Describe what the UDF consumes, produces and accepts as options.

        Returns:
            A ``udf_pb2.Response`` whose ``info`` declares BATCH input,
            STREAM output and a single string-valued ``field`` option.
        """
        logger.info('info trigger')
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        """Read the options supplied for this task and validate them.

        Args:
            init_req: request whose ``options`` are scanned for one named
                ``field``; its first value is taken as the field name.

        Returns:
            A ``udf_pb2.Response`` whose ``init.success`` is False, with the
            error 'must provide field name', when no field name was given.
        """
        logger.info('INIT trigger')
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue

        success = True
        msg = ''
        # An omitted (or empty) 'field' option leaves the name empty.
        if not self._field:
            success = False
            msg = 'must provide field name'

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg.encode()
        return response

    def begin_batch(self, begin_req):
        """Log the start of a batch; the window is not reset here."""
        logger.info('begin_batch trigger')

    def snapshot(self):
        """Serialise the per-group state as UTF-8 encoded JSON.

        Returns:
            A ``udf_pb2.Response`` whose ``snapshot.snapshot`` maps each group
            in ``_state`` to the result of that state's ``snapshot()``.
        """
        data = {}
        for group, state in self._state.items():
            data[group] = state.snapshot()

        response = udf_pb2.Response()
        response.snapshot.snapshot = json.dumps(data).encode()
        return response

    def point(self, point):
        """Add one point to the window and emit the product when it is full.

        Args:
            point: incoming point; its ``fieldsDouble`` entry for the
                configured field is appended to the window, and its name,
                time, group and tags are copied onto the emitted point.
        """
        logger.info('point trigger')
        self._points.append(point.fieldsDouble[self._field])

        if len(self._points) == self._size:
            geo = 1.0
            for p in self._points:
                geo *= p

            response = udf_pb2.Response()
            response.point.name = point.name
            response.point.time = point.time
            response.point.group = point.group
            response.point.tags.update(point.tags)
            # add the product of the windowed values to its own field
            response.point.fieldsDouble['geo'] = geo
            # add some dummy string field into the point as well
            response.point.fieldsString['hash'] = 'test'
            self._agent.write_response(response)

            # Drop the oldest value so the window slides by one point.
            self._points.pop(0)

    def end_batch(self, batch_meta):
        """Log the end of a batch; nothing is emitted here."""
        logger.info('end_batch')
if __name__ == '__main__':
    # Build the agent and attach a handler that keeps a reference back to it,
    # so the handler can write points through the agent.
    agent = Agent()
    agent.handler = GeoTestHandler(agent)

    # anything printed to STDERR from a UDF process gets captured into the logs
    logger.info("Starting agent for GeoTestHandler")

    # Start the agent, then block until it has finished.
    agent.start()
    agent.wait()
    logger.info("Agent finished")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement