Advertisement
Guest User

Untitled

a guest
Mar 20th, 2019
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.28 KB | None | 0 0
  1. from kapacitor.udf.agent import Agent, Handler
  2. from kapacitor.udf import udf_pb2
  3. import logging
  4. import json
  5.  
  6. logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
  7. logger = logging.getLogger()
  8.  
  9.  
  10. class GeoTestHandler(Handler):
  11. def __init__(self, agent):
  12. """Constructor"""
  13. logger.info('__init__ trigger')
  14. self._agent = agent
  15. self._field = ' '
  16. self._size = 10
  17. self._points = []
  18. self._state = {}
  19.  
  20. def info(self):
  21. """info: Define what your UDF wants and what will it provide in the end"""
  22. logger.info('info trigger')
  23. response = udf_pb2.Response()
  24. response.info.wants = udf_pb2.BATCH
  25. response.info.provides = udf_pb2.STREAM
  26. response.info.options['field'].valueTypes.append(udf_pb2.STRING)
  27. return response
  28.  
  29. def init(self, init_req):
  30. """init: Define what your UDF expects as parameters when parsing the TICKScript"""
  31. logger.info('INIT trigger')
  32. for opt in init_req.options:
  33. if opt.name == 'field':
  34. self._field = opt.values[0].stringValue
  35. success = True
  36. msg = ''
  37. if self._field == '':
  38. success = False
  39. msg = 'must provide field name'
  40. response = udf_pb2.Response()
  41. response.init.success = success
  42. response.init.error = msg.encode()
  43. return response
  44.  
  45. def begin_batch(self, begin_req):
  46. """begin_batch: Do something at the beginning of the batch"""
  47. logger.info('begin_batch trigger')
  48.  
  49. def snapshot(self):
  50. """snapshot: take a snapshot of the current data, if the task stops for some reason """
  51. data = {}
  52. for group, state in self._state.items():
  53. data[group] = state.snapshot()
  54. response = udf_pb2.Response()
  55. response.snapshot.snapshot = json.dumps(data).encode()
  56. return response
  57.  
  58. def point(self, point):
  59. """point: process each point within the batch"""
  60. logger.info('point trigger')
  61. self._points.append(point.fieldsDouble[self._field])
  62. if len(self._points) == self._size:
  63. geo = 1.0
  64. for p in self._points:
  65. geo *= p
  66. response = udf_pb2.Response()
  67. response.point.name = point.name
  68. response.point.time = point.time
  69. response.point.group = point.group
  70. response.point.tags.update(point.tags)
  71. # add the geometric sum to its own field
  72. response.point.fieldsDouble['geo'] = geo
  73. # add some dummy string field into the point as well
  74. response.point.fieldsString['hash'] = 'test'
  75. self._agent.write_response(response)
  76. self._points.pop(0)
  77.  
  78. def end_batch(self, batch_meta):
  79. """end_batch: do something at the end of the batch"""
  80. logger.info('end_batch')
  81.  
  82.  
  83. if __name__ == '__main__':
  84. # Create an agent
  85. agent = Agent()
  86.  
  87. # Create a handler and pass it an agent so it can write points
  88. h = GeoTestHandler(agent)
  89.  
  90. # Set the handler on the agent
  91. agent.handler = h
  92.  
  93. # anything printed to STDERR from a UDF process gets captured into the logs
  94. logger.info("Starting agent for GeoTestHandler")
  95. agent.start()
  96. agent.wait()
  97. logger.info("Agent finished")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement