Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- import aioamqp
- import asyncio
- from threading import Thread
- from datetime import *
- import concurrent.futures
- import json
- from config import SingletonDecorator
- from syscom.core import load_syscom_ascii
- from syscom.vibr import calculate_ppv
- from syscom.instafile import load_instantel_ascii
- from ftpprocess import ftpprocesser
- import time
- import warnings
- import queue
- import logging
- from multiprocessing.pool import ThreadPool
- global start_time
- thred={}
- import threading
class routingfile:
    """AMQP consumer that receives processing-config ids over RabbitMQ and
    hands each one to a ``routeclass`` worker on its own thread."""

    def __init__(self):
        # Files selected for (re)processing; timestamp() appends here.
        # Fix: the original class never initialised this attribute, so the
        # first call to timestamp() raised AttributeError.
        self.finallist = []

    @staticmethod
    def differ(first, second):
        """Return the elements of *first* not contained in *second*,
        preserving the order of *first*.

        Fix: the original instance method was missing ``self``; it is
        side-effect free, so it is now a staticmethod.
        """
        lookup = set(second)
        return [item for item in first if item not in lookup]

    def diff(self, list1, list2):
        """Return the symmetric difference of the two iterables as a list."""
        union = set(list1) | set(list2)
        common = set(list1) & set(list2)
        return list(union - common)

    def timestamp(self, list1, list2):
        """Append to ``self.finallist`` each key of *list1* that is either
        absent from *list2* or carries a strictly greater timestamp, and
        return the accumulated list.

        Both arguments map filename -> timestamp.
        """
        for fname, tsamp in list1.items():
            if fname not in list2 or tsamp > list2[fname]:
                self.finallist.append(fname)
        return self.finallist

    async def rpcserver(self):
        """Connect to the local RabbitMQ broker (retrying with backoff) and
        start consuming the ``test123`` queue.

        Fixes: ``@asyncio.coroutine``/``yield from`` (removed in
        Python 3.11) replaced with async/await; the retry delay previously
        collapsed to 1 second (``min(1, to_wait ** 1.0)``) and now doubles
        up to a 30 s cap; the failure message was a bare format string with
        no arguments.
        """
        time_waited = 0
        to_wait = 1.5
        while True:
            try:
                transport, protocol = await aioamqp.connect(host='localhost',
                                                            heartbeat=500,
                                                            port='5672')
                break
            except (OSError, aioamqp.AmqpClosedConnection) as err:
                to_wait = round(min(30, to_wait * 2), 2)
                print("Failed to connect to RabbitMQ: %s. Waiting %s seconds "
                      "to see if RabbitMQ comes online..." % (err, to_wait))
                await asyncio.sleep(to_wait)
                time_waited += to_wait
        if time_waited:
            print("Waited total of %s seconds for RabbitMQ to come up" % time_waited)
        channel = await protocol.channel()
        await channel.queue_declare(queue_name='test123')
        print(' [*][*][*] Waiting for messages. To exit press CTRL+C')
        await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
        await channel.basic_consume(self.callback, queue_name='test123', no_ack=False)

    async def callback(self, ch, body, envelope, properties):
        """aioamqp delivery callback: decode the JSON payload, ack the
        message, then run the config on a dedicated worker thread.

        Fixes: bare ``except`` narrowed to ``Exception`` with the error
        reported; removed the duplicate ``configid`` assignment and the
        ineffective ``locals()[...]`` thread registry.
        """
        try:
            print(" [x] Received %r" % body)
            payload = json.loads(body.decode("utf-8"))
            configid = payload['configid']
            print("Config name is:", configid)
            worker = routeclass()
            await ch.basic_client_ack(delivery_tag=envelope.delivery_tag)
            # Heavy processing runs off the event loop on its own thread.
            thread = threading.Thread(target=worker.executionstarter,
                                      args=[configid], daemon=False)
            thread.start()
        except Exception as exc:
            print("callback failed: %s" % exc)
class ThreadWithReturnValue(Thread):
    """Thread whose ``join()`` returns the target's return value.

    Fixes: the original accessed Python 2 name-mangled attributes
    (``self._Thread__target``) that do not exist on Python 3, and forwarded
    ``Verbose`` positionally to ``Thread.__init__`` (a TypeError on
    Python 3). It also used a shared mutable default for ``kwargs``.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, Verbose=None):
        # ``Verbose`` is accepted for backward compatibility but ignored:
        # threading dropped verbosity support in Python 3.
        Thread.__init__(self, group=group, target=target, name=name,
                        args=args, kwargs={} if kwargs is None else kwargs)
        self._return = None  # set by run(), read back by join()

    def run(self):
        # Python 3 exposes the target via single-underscore attributes.
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout=None):
        """Wait for the thread (optionally up to *timeout* seconds) and
        return whatever the target returned (None if still running)."""
        Thread.join(self, timeout)
        return self._return
class routeclass:
    """Per-config file-processing worker.

    Pulls a processing configuration from the database, lists files on an
    FTP server, diffs them against the files already recorded in the DB,
    downloads the new/updated ones and runs the vibration pipeline
    (load + calculate_ppv) on each, updating row status as it goes.

    NOTE(review): indentation in this block was reconstructed from a
    flattened paste — the nesting of the trailing summary prints in
    ``executionstarter`` should be confirmed against the original file.
    """

    # Class-level defaults; every one is shadowed per-instance in
    # __init__, so these mutable class attributes are effectively unused
    # after construction.
    listindb = {}   # Used to store list from database
    finallist = []  # final list of files
    username = ''
    host = ''
    password = ''
    count = 0
    filefilter = ''
    finallister = {}
    outdir = ''
    tp = {}
    lii = {}
    cols = []
    finalcsv = []
    start_time = time.time()

    def __init__(self):
        # Re-initialise every attribute per instance so instances do not
        # share the mutable class-level containers above.
        print("inittttt")
        self.listindb = {}
        self.finallist = []
        self.username = ''
        self.host = ''
        self.password = ''
        #count = 0
        self.filefilter = ''
        self.finallister = {}
        self.outdir = ''
        self.tp = {}
        self.lii = {}
        self.cols = []
        self.finalcsv = []
        self.start_time = time.time()

    def diff(self, list1, list2):
        """Return the symmetric difference of *list1* and *list2* as a list."""
        c = set(list1).union(set(list2))  # or c = set(list1) | set(list2)
        d = set(list1).intersection(set(list2))  # or d = set(list1) & set(list2)
        return list(c - d)

    def timestamp(self, list1, list2):
        """Append to ``self.finallist`` each filename of *list1* that is
        absent from *list2* or has a strictly greater timestamp; return the
        accumulated list. Both arguments map filename -> timestamp.
        """
        #This will compare two list and creates list of file names which have larger timestamp
        for fname, tsamp in list1.items():
            print("filename", fname)
            print("Tstamp", tsamp)
            if fname in list2:
                print("Tstamp of list2", list2[fname])
                if tsamp > list2[fname]:
                    print("Time stamp is greater", fname)
                    self.finallist.append(fname)
            else:
                print("file is new", fname)
                self.finallist.append(fname)
        print("From function", self.finallist)
        return self.finallist

    def calculate(self, list1):
        """Load each syscom ASCII file in *list1*, run calculate_ppv on it,
        and record success/failure in ``gdl_input_files``.

        NOTE(review): ``asyncio.ensure_future()`` returns a Task, which
        cannot be unpacked into ``(head, dfg, df1)`` — this line almost
        certainly raises and falls into the except branch. TODO confirm.
        """
        for fl in list1:
            try:
                head, dfg, df1 = asyncio.ensure_future(load_syscom_ascii(fleName=fl, columns=self.cols, finalcsv=self.finalcsv,
                                                                         out=self.outdir))
                print("DATA Frame is", df1.duplicated())
                warnings.filterwarnings("ignore")
                calculate_ppv(fleName=fl, columns=self.cols, separator=',', fileformat='syscom',
                              timeWindowSec=10,
                              df=df1, head=head, out=self.outdir)
                SingletonDecorator(
                    "UPDATE gdl_input_files SET processed ='3' , errordata='0' WHERE filename='" + fl + "'",
                    function="insertion")
            except:
                # Bare except: any failure (including the unpack above) is
                # swallowed and the row is marked as errored.
                print("asdasd")
                SingletonDecorator(
                    "UPDATE gdl_input_files SET processed ='4' , errordata='3' WHERE filename='" + fl + "'",
                    function="insertion")
        return

    def executionstarter(self, configid):
        """Entry point for one config id: fetch the config rows from the DB
        and run the matching FTP-sync + processing pipeline for each row.

        NOTE(review): every SQL statement in this method is built by string
        concatenation — vulnerable to SQL injection if any value is
        untrusted; parameterised queries are the fix. TODO confirm the
        SingletonDecorator API supports them.
        """
        #Main Execution begins from here
        valid = SingletonDecorator(
            query="SELECT d.id,d.directory,f.ftp_host,f.user_name,f.password,o.path,g.filename_filter,g.preprocessor,g.userfields FROM gdl_file_config g,ddf_subdirectory d,ddf_directory f,output_directory o where g.config_id='" + str(
                configid) + "' and g.directory_id=d.id and d.data_source_id=f.id and o.config_id='" + str(
                configid) + "' ;", function="retrive")
        f = valid()
        if f is None:
            print("Query error")
        else:
            print("Working On Query", f)
            for k in f:
                # Unpack the config row into connection/processing settings.
                print("current config id is", k[0])
                print("current directory", k[1])
                print("current Output Dir", k[5])
                print("current host", k[2])
                print("Username", k[3])
                print("Pasword", k[4])
                print("FileFilter", k[6])
                dirid = k[0]
                self.curdir = k[1]
                self.host = k[2]
                self.filefilter = k[6]
                self.outdir = k[5]
                self.username = k[3]
                self.password = k[4]
                preprocesser = k[7]
                self.finalcsv = k[8]
                print("Preprocesser Type", preprocesser)

                #getting file type
                def vibrwithoutcols():
                    """Pipeline used when the preprocessor string has no
                    column spec: sync the FTP listing with the DB, download
                    the selected files, and process each one with
                    load_instantel_ascii + calculate_ppv."""
                    print("No cols")
                    ftp1 = ftpprocesser(host=self.host, username=self.username, password=self.password,
                                        directory=self.curdir, outputdir=self.outdir, filter=self.filefilter)
                    tp1 = ftp1.ftplister()
                    print(tp1)
                    self.tp = json.loads(tp1)  # from server
                    for k, l in self.tp.items():
                        if (k != 'configid'):
                            print("Insertion")
                            print("File and tstamp is", k, l)
                            # For testing will insert all the files from server into db
                            # ins=SingletonDecorator(query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('"+k+"', '"+str(dirid)+"','"+l+"','1','"+strftime("%Y-%m-%d %H:%M:%S", gmtime())+"' )",function="insertion")
                    print("The files from server number", len(self.tp))
                    # print("Directory is",body['dir'])
                    print("Getting list of files ****** from DataBase For directory id", dirid)
                    listoffiles = SingletonDecorator(
                        query="SELECT distinct(filename),timestamp FROM gdl_input_files where processed=3 and directory_id ='" + str(
                            dirid) + "'", function="retrive")
                    lis = listoffiles()  # Getting list from database
                    for m in lis:
                        print("Files in DB", m)
                        self.listindb[m[0]] = m[1]  # Arranging files with timestamp on db
                    # Loading the list as json for easy comparison
                    listin = json.dumps(self.listindb)
                    self.lii = json.loads(listin)
                    dif = self.diff(list1=self.tp,
                                    list2=self.lii)  # Finding new files available on server only which will be not available in datbase
                    print("asdasdasd")
                    for fname in dif:
                        # getting new files time stamp
                        try:
                            print("FNAME", fname)
                            print("timestampis", self.tp[fname])
                            # adding newly arrived files from server with time stamp
                            self.finallister[fname] = self.tp[fname]
                        except:
                            print("KEy error")
                    print("New files from seerver is", self.finallister)
                    for fname, timestamp in self.finallister.items():
                        currenttime, k = str(datetime.now()).split('.')
                        print("currenttime", currenttime)
                        print("Inserting new files in server into DB")
                        print("Filename", fname)
                        print("Times", timestamp)
                        # NOTE(review): the INSERT below interpolates `l`
                        # (leftover from the earlier listing loop) rather
                        # than `timestamp` — looks like a bug; TODO confirm.
                        SingletonDecorator(
                            # Inserting new files received from server into db
                            query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('" + fname + "', '" + str(
                                dirid) + "','" + l + "','3','" + currenttime + "')", function="insertion")
                    print("List from DB", lis)
                    print("Current files list from server dir", self.tp)
                    print(len(self.lii))
                    print(len(self.tp))
                    self.finallist = self.timestamp(list1=self.tp, list2=self.lii)
                    for fname in self.finallist:
                        print("New insertion")
                        ol = self.tp[fname]
                        # Updating existing files in fname with new timestamp
                        SingletonDecorator(
                            "UPDATE gdl_input_files SET processed ='1',timestamp='" + ol + "' WHERE filename='" + fname + "'",
                            function="insertion")
                        print("New Update Query")
                    print("From server is", self.tp)
                    print("From DB is", self.lii)
                    print("From server is", len(self.tp))
                    print("From DB is", len(self.lii))
                    print("Final list is", self.finallist)
                    print("Final listcount", len(self.finallist))
                    time.sleep(0.01)
                    # Download the selected files on a worker thread, then
                    # block until the transfer completes.
                    op = threading.Thread(target=ftp1.ftptester, args=[self.finallist], daemon=False)
                    op.start()
                    op.join()
                    warnings.filterwarnings("ignore")
                    for fl in self.finallist:
                        try:
                            # NOTE(review): apply_async is handed the RESULT
                            # of load_instantel_ascii (which therefore runs
                            # synchronously here) via a `target=` keyword
                            # apply_async does not accept — this likely
                            # raises into the except branch. TODO confirm.
                            pool = ThreadPool(processes=len(self.finallist))
                            async_result = pool.apply_async(target=load_instantel_ascii(fleName=fl, columns=None,
                                                                                        out=self.outdir))
                            head, dfg, df1 = async_result.get()
                            '''
                            head, dfg, df1 = load_instantel_ascii(fleName=fl, columns=None,
                                                                  out=self.outdir)
                            '''
                            print("DATA Frame is", df1.duplicated())
                            '''
                            calculate_ppv(fleName=fl, columns=None, separator=',', fileformat='syscom',
                                          timeWindowSec=10,
                                          df=df1, head=head, out=self.outdir)
                            '''
                            # NOTE(review): calculate_ppv is CALLED here and
                            # its return value passed as the Thread target —
                            # the work runs on this thread, not the new one.
                            ml12 = threading.Thread(target=calculate_ppv(fleName=fl, columns=None, separator=',', fileformat='syscom',
                                                                         timeWindowSec=10,
                                                                         df=df1, head=head, out=self.outdir))
                            ml12.start()
                            ml12.join()
                            SingletonDecorator(
                                "UPDATE gdl_input_files SET processed ='3' , errordata='0' WHERE filename='" + fl + "'",
                                function="insertion")
                        except:
                            print("Error on Graph Generation")
                            '''
                            SingletonDecorator(
                                "UPDATE gdl_input_files SET processed ='4' , errordata='3' WHERE filename='" + fl + "'",
                                function="insertion")
                            '''
                    return

                def vibr():
                    """Pipeline for 'VIBR' preprocessor configs: parse the
                    column/format spec from ``temp1`` (a closure variable
                    set just before this is called), sync the FTP listing
                    with the DB, download the files and process each with
                    load_syscom_ascii."""
                    print("VIBR FIle")
                    try:
                        # temp1 entries are 'key=value' pairs: fileformat,
                        # timewindow, separator, and a '|'-separated column
                        # list; self.finalcsv is parsed the same way.
                        t, fileformat = temp1[0].split('=')
                        t, timewindow = temp1[1].split('=')
                        try:
                            t, separator = temp1[2].split('=')
                        except:
                            separator = ''
                        tcols, tcols1 = temp1[3].split('=')
                        self.cols = tcols1.split('|')
                        print("Colums", self.cols)
                        print("Fileformat", fileformat)
                        print("separ", separator)
                        print("CSV info", self.finalcsv)
                        temp2, temp3 = self.finalcsv.split("=")
                        self.finalcsv = temp3.split("|")
                        print("FInal csv", self.finalcsv)
                    except:
                        print("Not in proper VIBR")
                    ftp1 = ftpprocesser(host=self.host, username=self.username, password=self.password,
                                        directory=self.curdir, outputdir=self.outdir, filter=self.filefilter)
                    tp1 = ftp1.ftplister()
                    print(tp1)
                    self.tp = json.loads(tp1)  # from server
                    for k, l in self.tp.items():
                        if (k != 'configid'):
                            print("Insertion")
                            print("File and tstamp is", k, l)
                            # For testing will insert all the files from server into db
                            # ins=SingletonDecorator(query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('"+k+"', '"+str(dirid)+"','"+l+"','1','"+strftime("%Y-%m-%d %H:%M:%S", gmtime())+"' )",function="insertion")
                    print("The files from server number", len(self.tp))
                    # print("Directory is",body['dir'])
                    print("Getting list of files ****** from DataBase For directory id", dirid)
                    listoffiles = SingletonDecorator(
                        query="SELECT distinct(filename),timestamp FROM gdl_input_files where processed=3 and directory_id ='" + str(
                            dirid) + "'", function="retrive")
                    lis = listoffiles()  # Getting list from database
                    for m in lis:
                        print("Files in DB", m)
                        self.listindb[m[0]] = m[1]  # Arranging files with timestamp on db
                    # Loading the list as json for easy comparison
                    listin = json.dumps(self.listindb)
                    self.lii = json.loads(listin)
                    dif = self.diff(list1=self.tp,
                                    list2=self.lii)  # Finding new files available on server only which will be not available in datbase
                    print("asdasdasd")
                    for fname in dif:
                        # getting new files time stamp
                        try:
                            print("FNAME", fname)
                            print("timestampis", self.tp[fname])
                            # adding newly arrived files from server with time stamp
                            self.finallister[fname] = self.tp[fname]
                        except:
                            print("KEy error")
                    print("New files from seerver is", self.finallister)
                    for fname, timestamp in self.finallister.items():
                        currenttime, k = str(datetime.now()).split('.')
                        print("currenttime", currenttime)
                        print("Inserting new files in server into DB")
                        print("Filename", fname)
                        print("Times", timestamp)
                        # NOTE(review): same as vibrwithoutcols — interpolates
                        # the leftover `l` instead of `timestamp`; TODO confirm.
                        SingletonDecorator(
                            # Inserting new files received from server into db
                            query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('" + fname + "', '" + str(
                                dirid) + "','" + l + "','3','" + currenttime + "')", function="insertion")
                    print("List from DB", lis)
                    print("Current files list from server dir", self.tp)
                    print(len(self.lii))
                    print(len(self.tp))
                    self.finallist = self.timestamp(list1=self.tp, list2=self.lii)
                    for fname in self.finallist:
                        print("New insertion")
                        ol = self.tp[fname]
                        # Updating existing files in fname with new timestamp
                        SingletonDecorator(
                            "UPDATE gdl_input_files SET processed ='1',timestamp='" + ol + "' WHERE filename='" + fname + "'",
                            function="insertion")
                        print("New Update Query")
                    print("From server is", self.tp)
                    print("From DB is", self.lii)
                    print("From server is", len(self.tp))
                    print("From DB is", len(self.lii))
                    print("Final list is", self.finallist)
                    print("Final listcount", len(self.finallist))
                    th = threading.Thread(target=ftp1.ftptester, args=[self.finallist], daemon=True)
                    th.start()
                    th.join()
                    warnings.filterwarnings("ignore")
                    #try:
                    # ml1 = threading.Thread(target=self.calculate, args=[self.finallist], daemon=False)
                    # ml1.start()
                    # ml1.join()
                    # except:
                    # print("Error on Graph Generation")
                    i = 1
                    threads = []
                    for fl in self.finallist:
                        try:
                            i += 1
                            '''
                            head, dfg, df1 = load_syscom_ascii(fleName=fl, columns=self.cols, finalcsv=self.finalcsv,
                                                               out=self.outdir)
                            '''
                            # do some other stuff in the main process
                            # NOTE(review): assigning into locals() inside a
                            # function does not create real local variables;
                            # the reads below may KeyError (caught by the
                            # broad except) — TODO verify on the target
                            # Python version.
                            locals()[i] = threading.Thread(target=load_syscom_ascii,
                                                           kwargs={'fleName': fl, 'columns': self.cols,
                                                                   'finalcsv': self.finalcsv, 'out': self.outdir,
                                                                   'separator': ',', 'fileformat': 'syscom',
                                                                   'timeWindowSec': '10'})
                            locals()[i].start()
                            locals()[i].join()
                            # print("DATA Frame is", .duplicated())
                            '''
                            calculate_ppv(fleName=fl, columns=self.cols, separator=',', fileformat='syscom',
                                          timeWindowSec=10,
                                          df=df1, head=head, out=self.outdir)
                            m1=threading.Thread(target=calculate_ppv(fleName=fl, columns=self.cols, separator=',', fileformat='syscom',
                                          timeWindowSec=10,
                                          df=df1, head=head, out=self.outdir),deamon=False)
                            ml.start()
                            threads.append(m1)
                            #ml.join()
                            '''
                        except:
                            print("Error")
                    # `threads` is only ever appended to in commented-out
                    # code above, so this loop is currently a no-op.
                    for thread in threads:
                        thread.join()

                # Dispatch table: pick a pipeline from the preprocessor
                # string; any parse failure falls back to the no-columns one.
                filetypes = {'vibr': vibr, 'vibrnocols': vibrwithoutcols}
                try:
                    ftype, t = preprocesser.split(":")
                    temp1 = t.split(";")
                    # Function for VIBR file
                    if ftype == 'VIBR':
                        filetypes['vibr']()
                except:
                    print("No colsss")
                    filetypes['vibrnocols']()
        print("--- %s seconds ---" % (time.time() - self.start_time))
        print("Total number of files retrived", len(self.finallist))
        print(' [*] Waiting for messages. To exit press CTRL+C')
def main():
    """Create the asyncio event loop, start the AMQP RPC server
    (routingfile.rpcserver) and run the loop forever."""
    global event_loop
    try:
        event_loop = asyncio.get_event_loop()
        obj = routingfile()
        event_loop.run_until_complete(obj.rpcserver())
        event_loop.run_forever()
    except Exception as exc:
        # Narrowed from a bare except (which also swallowed
        # KeyboardInterrupt/SystemExit, making CTRL+C ineffective) and
        # fixed the garbled message.
        print("event loop error: %s" % exc)


if __name__ == '__main__':
    # If you execute rpcserver as the main module, main() runs first.
    main()
Add Comment
Please sign in to add a comment.