Advertisement
Guest User

Untitled

a guest
Dec 8th, 2016
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.15 KB | None | 0 0
  1. #!/usr/bin/env python
  2. import os, time, re, socket, getopt, sys
  3. from jinja2 import Template
  4.  
  5. logfile=re.compile(ur'.*\.log$', re.IGNORECASE)
  6.  
  7. def finder(path, brokers):
  8. paths={}
  9. here=os.getcwd()
  10. for dpath, dnames, fnames in os.walk(path):
  11. for i, fname in enumerate([os.path.join(dpath, fname) for fname in fnames]):
  12. if logfile.search(fname) and int(time.time()) - int(os.path.getmtime(fname)) <= 86400:
  13. t=socket.gethostname().split('.')[0]+'/'+fname.strip('./')
  14. t=t.replace('/', '-').replace('.', '-')
  15. paths[t]=fname.replace('./', "/")
  16. return paths, brokers
  17.  
  18. def agent(paths, brokers):
  19. flume_brokers=brokers
  20. here=os.getcwd()
  21. readers=1
  22. i=1
  23. tails=[]
  24. r=''
  25. s=''
  26. c=''
  27. t=Template('''## channel configuration
  28. a1.channels.c{{ reader }}.type = memory
  29. a1.channels.c{{ reader }}.capacity = 100
  30. a1.channels.c{{ reader }}.transactionCapacity = 100
  31. # source
  32. a1.sources.r{{ reader }}.type = exec
  33. a1.sources.r{{ reader }}.command = tail -F {{ log }}
  34. a1.sources.r{{ reader }}.channels = c{{ reader }}
  35. # sink
  36. a1.sinks.k{{ reader }}.type = org.apache.flume.sink.kafka.KafkaSink
  37. a1.sinks.k{{ reader }}.topic = logs-{{ log_sanitized }}
  38. a1.sinks.k{{ reader }}.brokerList = {{ flume_brokers }}
  39. a1.sinks.k{{ reader }}.requiredAcks = 1
  40. a1.sinks.k{{ reader }}.batchSize = 20
  41. a1.sources.r{{ reader }}.channels = c{{ reader }}
  42. a1.sinks.k{{ reader }}.channel = c{{ reader }}
  43. ''')
  44.  
  45. for topic, path in paths.iteritems():
  46. tails.append(t.render(reader=readers, log=path, log_sanitized=topic, flume_brokers=flume_brokers))
  47. r=r+' r%i' % readers # counting the number of sources to append in file
  48. s=s+' k%i' % readers # counting the number of sinks
  49. c=c+' c%i' % readers # counting the number of sinks
  50. readers=readers+1
  51. header=Template('''# Name the components on this agent
  52. a1.sources = {{ sources }}
  53. a1.sinks = {{ sinks }}
  54. a1.channels = {{ channels }}
  55. ''')
  56. print header.render(sources=r, sinks=s, channels=c)
  57. for i in tails:
  58. print i
  59.  
  60.  
  61.  
  62. def usage():
  63. print """
  64. PURPOSE :
  65. Finds logs and configure flume to tail them and ship them to kafka brokers.
  66.  
  67. USAGE :
  68. logfinder -b brokers -p logpath
  69.  
  70. OPTIONS :
  71. -short, --long <value> description
  72. -b, --brokers= <brokers> comma separated list of kafka brokers addresses and ports, e.g. : 127.0.0.1:9092,127.0.0.2:9092
  73. -p, --path= <logs directory> where to find the logs that you which to tail
  74. -h, --help prints help
  75. """
  76.  
  77. if __name__ == '__main__':
  78.  
  79. try:
  80. opts, args = getopt.getopt(sys.argv[1:], 'b:p:h', ['brokers=', 'path=', 'help'])
  81. if len(opts) <= 1:
  82. usage()
  83. sys.exit(2)
  84. except getopt.GetoptError:
  85. usage()
  86. sys.exit(2)
  87.  
  88. for opt, arg in opts:
  89.  
  90. if opt in ('-h', '--help'):
  91. usage()
  92. sys.exit(2)
  93. elif opt in ('-b', '--brokers'):
  94. brokers = arg
  95. elif opt in ('-p', '--path'):
  96. path = arg
  97. else:
  98. usage()
  99. sys.exit(2)
  100. path, brokers=finder(path, brokers)
  101. agent(path, brokers)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement