Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import os, time, re, socket, getopt, sys
- from jinja2 import Template
# Matches any path ending in ".log", case-insensitively.
logfile = re.compile(r'.*\.log$', re.IGNORECASE)


def finder(path, brokers, max_age=86400):
    """Collect recently modified ``*.log`` files found under *path*.

    Walks *path* recursively and keeps every file whose name matches
    ``logfile`` and whose modification time is at most *max_age* seconds
    old.

    Args:
        path: Directory to scan; may be relative or absolute.
        brokers: Passed through untouched so the result can be unpacked
            straight into ``agent``.
        max_age: Maximum age, in seconds, of a file's last modification
            for it to be kept. Defaults to 86400 (one day).

    Returns:
        A ``(paths, brokers)`` tuple. ``paths`` maps a sanitized name --
        short hostname plus absolute file path, with every ``/`` and ``.``
        replaced by ``-`` -- to the absolute path of the log file.
    """
    host = socket.gethostname().split('.')[0]
    now = int(time.time())
    paths = {}
    for dpath, _dnames, fnames in os.walk(path):
        for name in fnames:
            fname = os.path.join(dpath, name)
            if not logfile.search(fname):
                continue
            try:
                mtime = int(os.path.getmtime(fname))
            except OSError:
                # Dangling symlink, or the file vanished while walking:
                # skip it instead of aborting the whole scan.
                continue
            if now - mtime > max_age:
                continue
            # Resolve against the current directory so that the path stays
            # valid for whoever consumes it, even when *path* was relative.
            full = os.path.abspath(fname)
            # strip('./') drops the leading separator so the sanitized name
            # does not get a doubled dash right after the hostname.
            topic = host + '/' + full.strip('./')
            topic = topic.replace('/', '-').replace('.', '-')
            paths[topic] = full
    return paths, brokers
def agent(paths, brokers):
    """Print a Flume agent configuration that tails every log in *paths*.

    For each entry one memory channel, one ``exec`` source running
    ``tail -F`` on the file, and one Kafka sink are rendered; they are
    numbered ``c1/r1/k1``, ``c2/r2/k2`` and so on. A header naming all
    components is printed first, followed by the per-log sections.

    Args:
        paths: Mapping of sanitized log name to log file path. The name is
            used as the suffix of the ``logs-`` Kafka topic.
        brokers: Value written verbatim as every sink's ``brokerList``.

    Returns:
        None. The configuration is written to standard output.
    """
    # NOTE(review): the path is interpolated unquoted into the tail command;
    # paths containing whitespace presumably break it -- confirm against the
    # Flume exec-source behaviour before relying on such paths.
    section = Template('''## channel configuration
a1.channels.c{{ reader }}.type = memory
a1.channels.c{{ reader }}.capacity = 100
a1.channels.c{{ reader }}.transactionCapacity = 100
# source
a1.sources.r{{ reader }}.type = exec
a1.sources.r{{ reader }}.command = tail -F {{ log }}
a1.sources.r{{ reader }}.channels = c{{ reader }}
# sink
a1.sinks.k{{ reader }}.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k{{ reader }}.topic = logs-{{ log_sanitized }}
a1.sinks.k{{ reader }}.brokerList = {{ flume_brokers }}
a1.sinks.k{{ reader }}.requiredAcks = 1
a1.sinks.k{{ reader }}.batchSize = 20
a1.sinks.k{{ reader }}.channel = c{{ reader }}
''')
    header = Template('''# Name the components on this agent
a1.sources = {{ sources }}
a1.sinks = {{ sinks }}
a1.channels = {{ channels }}
''')

    # Sort so that the component numbering is the same on every run
    # instead of following dictionary iteration order.
    sections = [
        section.render(reader=reader, log=log_path, log_sanitized=topic,
                       flume_brokers=brokers)
        for reader, (topic, log_path) in enumerate(sorted(paths.items()), 1)
    ]

    # One source, one sink and one channel exist per rendered section.
    readers = range(1, len(sections) + 1)
    print(header.render(
        sources=' '.join('r%d' % n for n in readers),
        sinks=' '.join('k%d' % n for n in readers),
        channels=' '.join('c%d' % n for n in readers),
    ))
    for rendered in sections:
        print(rendered)
def usage():
    """Print the command-line help text to standard output."""
    print("""
PURPOSE :
Finds logs and configures flume to tail them and ship them to kafka brokers.
USAGE :
logfinder -b brokers -p logpath
OPTIONS :
-short, --long <value> description
-b, --brokers= <brokers> comma separated list of kafka brokers addresses and ports, e.g. : 127.0.0.1:9092,127.0.0.2:9092
-p, --path= <logs directory> where to find the logs that you wish to tail
-h, --help prints help
""")
if __name__ == '__main__':
    # Script entry point: parse -b/--brokers and -p/--path, then print the
    # Flume configuration for the logs found under the given path.
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'b:p:h',
                                   ['brokers=', 'path=', 'help'])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    brokers = None
    path = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            # Help was asked for explicitly, so this is not an error.
            usage()
            sys.exit(0)
        elif opt in ('-b', '--brokers'):
            brokers = arg
        elif opt in ('-p', '--path'):
            path = arg

    # Both options are mandatory. Counting the parsed options is not enough:
    # the same option given twice would pass the count and leave the other
    # one undefined.
    if brokers is None or path is None:
        usage()
        sys.exit(2)

    log_paths, brokers = finder(path, brokers)
    agent(log_paths, brokers)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement