Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import aioamqp
- import asyncio
- import os
- import signal
- import threading
- from queue import Queue
- import time
- import shutil
- import threading
- import sys, traceback
- from datetime import *
- from deepdiff import DeepDiff
- from dateutil.relativedelta import *
- import _thread
- import json
- from config import SingletonDecorator
- from time import gmtime, strftime
- from ftpfilelister import ftplister
- #from ftpretriver import retreive
- import time
- global start_time
- from ftpretriver import ftptester
- #from concurrent.futures import ProcessPoolExecutor
- #import multiprocessing
- #from multiprocessing import Process
- #from concurrent.futures import ThreadPoolExecutor
- #import _thread
- import threading
class routingfile:
    """AMQP consumer that starts a sync run for every message it receives.

    ``rpcserver`` connects to the broker on localhost:5672 and consumes the
    ``test123`` queue; ``callback`` handles each delivery by decoding the
    JSON payload and passing its ``configid`` to ``stack.test1``.
    """

    def __init__(self):
        # ``timestamp`` accumulates file names here.  The attribute was never
        # defined on this class before, so calling ``timestamp`` raised
        # AttributeError.
        self.finallist = []

    @staticmethod
    def differ(first, second):
        """Return the items of *first* that are not in *second*, in order.

        Declared static so it works both as ``routingfile.differ(a, b)`` and
        on an instance (the original lacked ``self`` and broke on instances).
        """
        second = set(second)
        return [item for item in first if item not in second]

    def diff(self, list1, list2):
        """Return the symmetric difference of two iterables as a list.

        For dicts only the keys are compared.  The order of the result is
        unspecified because it comes from a set.
        """
        c = set(list1).union(set(list2))
        d = set(list1).intersection(set(list2))
        return list(c - d)

    def timestamp(self, list1, list2):
        """Collect names from *list1* that are new or newer than in *list2*.

        Both arguments map a file name to a timestamp.  A name is appended to
        ``self.finallist`` when it is missing from *list2* or when its
        timestamp in *list1* compares greater than the one in *list2*.

        Returns:
            ``self.finallist`` (the same list object, which keeps growing
            across calls until the caller resets it).
        """
        for fname, tsamp in list1.items():
            print("filename", fname)
            print("Tstamp", tsamp)
            if fname in list2:
                print("Tstamp of list2", list2[fname])
                if tsamp > list2[fname]:
                    print("Time stamp is greater", fname)
                    self.finallist.append(fname)
            else:
                print("file is new", fname)
                self.finallist.append(fname)
        print("From function", self.finallist)
        return self.finallist

    # Native coroutines replace ``@asyncio.coroutine`` / ``yield from``: the
    # decorator was removed in Python 3.11, so the class could not even be
    # defined there.
    async def rpcserver(self):
        """Connect to the local broker and start consuming ``test123``."""
        transport, protocol = await aioamqp.connect('localhost', 5672)
        channel = await protocol.channel()
        await channel.queue_declare(queue_name='test123')
        print(' [*] Waiting for messages. To exit press CTRL+C')
        await channel.basic_consume(self.callback, queue_name='test123', no_ack=False)

    async def callback(self, ch, body, envelope, properties):
        """Handle one delivery: run the sync for its config id, then ack.

        Args:
            ch: channel the message arrived on; used to acknowledge it.
            body: raw message bytes, a UTF-8 JSON object with a ``configid`` key.
            envelope: delivery metadata; supplies the ``delivery_tag`` to ack.
            properties: message properties (unused).
        """
        print(" [x] Received %r" % body)
        body = json.loads(body.decode("utf-8"))
        configid = body['configid']
        print("Config name is:", configid)
        # The original ran the identical sync a second time after the ack
        # (copy-paste leftover); it now runs once per message.
        # NOTE(review): test1 is a plain blocking call, so no other message
        # is handled by this event loop until it returns.
        gh = stack()
        gh.test1(configid=configid)
        await ch.basic_client_ack(delivery_tag=envelope.delivery_tag)
class stack:
    """Synchronise the files of one config between an FTP server and the DB.

    ``test1`` is the entry point.  For every directory row of a config it
    lists the server, compares the listing with the rows stored in
    ``gdl_input_files``, records files that are new on the server and hands
    them to ``ftptester``.
    """

    # Class-level defaults are kept so attribute access on the class itself
    # still works.  Instances get their own containers in ``__init__``.
    listindb = {}
    finallist = []
    username = ''
    host = ''
    password = ''
    count = 0
    filefilter = ''
    finallister = {}
    outdir = ''
    tp = {}
    lii = {}

    def __init__(self):
        # The original mutated the class-level dict/list objects in place, so
        # every ``stack()`` shared (and inherited) state left behind by
        # earlier runs.  Give each instance fresh containers instead.
        self._reset_state()

    def _reset_state(self):
        """Clear all per-run working state."""
        self.finallist = []    # names that are new or have a newer timestamp
        self.flister = []
        self.listindb = {}     # filename -> timestamp as read from the DB
        self.finallister = {}  # filename -> timestamp for files new on the server
        self.tp = {}           # server listing decoded from JSON
        self.lii = {}          # JSON-round-tripped copy of listindb

    @staticmethod
    def differ(first, second):
        """Return the items of *first* that are not in *second*, in order.

        Declared static so it works both as ``stack.differ(a, b)`` and on an
        instance (the original lacked ``self`` and broke on instances).
        """
        second = set(second)
        return [item for item in first if item not in second]

    def diff(self, list1, list2):
        """Return the symmetric difference of two iterables as a list.

        For dicts only the keys are compared; result order is unspecified.
        """
        c = set(list1).union(set(list2))
        d = set(list1).intersection(set(list2))
        return list(c - d)

    def timestamp(self, list1, list2):
        """Collect names from *list1* that are new or newer than in *list2*.

        Both arguments map a file name to a timestamp.  Matching names are
        appended to ``self.finallist``, which is also the return value.
        """
        for fname, tsamp in list1.items():
            print("filename", fname)
            print("Tstamp", tsamp)
            if fname in list2:
                print("Tstamp of list2", list2[fname])
                if tsamp > list2[fname]:
                    print("Time stamp is greater", fname)
                    self.finallist.append(fname)
            else:
                print("file is new", fname)
                self.finallist.append(fname)
        print("From function", self.finallist)
        return self.finallist

    def test1(self, configid):
        """Run a sync for every directory configured under *configid*.

        Args:
            configid: config identifier; interpolated into the lookup query.
        """
        # NOTE(review): the SQL is built by string concatenation and configid
        # comes straight from the message payload (see routingfile.callback).
        # Switch to a parameterised query once SingletonDecorator's API for
        # bound parameters is confirmed.
        config_query = SingletonDecorator(
            query="SELECT d.id,d.directory,f.ftp_host,f.user_name,f.password,o.path,g.filename_filter FROM gdl_file_config g,ddf_subdirectory d,ddf_directory f,output_directory o where g.config_id='" + str(
                configid) + "' and g.directory_id=d.id and d.data_source_id=f.id and o.config_id='" + str(
                configid) + "' ;", function="retrive")
        rows = config_query()
        if rows is None:
            print("Query error")
        else:
            # NOTE(review): this dump and the "Pasword" print in
            # _sync_directory write FTP credentials to stdout in clear text.
            print("asdasdasdas", rows)
            for row in rows:
                self._sync_directory(row)
        print(' [*] Waiting for messages. To exit press CTRL+C')

    def _sync_directory(self, row):
        """Sync a single directory described by one config-query *row*.

        Column order follows the SELECT in ``test1``: id, directory,
        ftp_host, user_name, password, output path, filename_filter.
        """
        dirid = row[0]
        curdir = row[1]
        host = row[2]
        username = row[3]
        password = row[4]
        outdir = row[5]
        filefilter = row[6]
        print("current config id is", dirid)
        print("current directory", curdir)
        print("current Output Dir", outdir)
        print("current host", host)
        print("Username", username)
        print("Pasword", password)
        print("FileFilter", filefilter)

        # ftplister returns a JSON document; decoded it is used as a
        # filename -> timestamp mapping.
        self.tp = json.loads(
            ftplister(host=host, username=username, password=password, dir=curdir, filter=filefilter))
        # NOTE(review): a 'configid' key is skipped here for printing only.
        # If the listing really contains one, the code below treats it like a
        # file name - confirm against ftplister.
        for name, stamp in self.tp.items():
            if name != 'configid':
                print("Insertion")
                print("File and tstamp is", name, stamp)
        print("The files list are", len(self.tp))

        print("Getting list of files ****** From Directory id", dirid)
        listoffiles = SingletonDecorator(
            query="SELECT distinct(filename),timestamp FROM gdl_input_files where directory_id ='" + str(
                dirid) + "'", function="retrive")
        lis = listoffiles()
        for m in lis:
            self.listindb[m[0]] = m[1]
        print("TYPWEWWW", len(self.listindb))
        # Round-trip through JSON, presumably so DB values have the same
        # JSON-native types as the server listing before they are compared.
        self.lii = json.loads(json.dumps(self.listindb))

        dif = self.diff(list1=self.lii, list2=self.tp)
        print("asdasdasd")
        for fname in dif:
            print("FNAME", fname)
            try:
                server_ts = self.tp[fname]
            except KeyError:
                # The name exists only in the DB (diff is symmetric), so
                # there is nothing to fetch for it.
                print("KEy error")
                continue
            print("timestampis", server_ts)
            self.finallister[fname] = server_ts
        print("New fiels from seerver is", self.finallister)

        for fname, file_ts in self.finallister.items():
            # strftime yields the same "YYYY-MM-DD HH:MM:SS" text as splitting
            # str(datetime.now()) on '.', but does not fail when the
            # microsecond part happens to be 0 (no '.' in the string).
            currenttime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("currenttime", currenttime)
            print("Inserting new files in server into DB")
            print("Filename", fname)
            print("Times", file_ts)
            # Fixed: the original interpolated the stale loop variable ``l``
            # (timestamp of the last listed file) instead of this file's own.
            # NOTE(review): unlike the two "retrive" queries, this object is
            # never called - confirm that constructing it runs the INSERT.
            SingletonDecorator(
                query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('" + fname + "', '" + str(
                    dirid) + "','" + str(file_ts) + "','1','" + currenttime + "')", function="insertion")

        print("List from DB", lis)
        print("Current files list from server dir", self.tp)
        print(len(self.lii))
        print(len(self.tp))
        self.timestamp(self.tp, self.lii)
        print("Final list is", self.finallist)
        print("Final listcount", len(self.finallist))

        # NOTE(review): only the brand-new files (finallister) are passed on.
        # finallist, which also holds files with a newer timestamp, and
        # outdir are not - confirm this is intended.
        ftptester(host=host, username=username, password=password, dir=curdir, filelist=self.finallister)

        # Take the count before clearing; the original printed it after the
        # reset and therefore always reported 0.
        retrieved = len(self.finallist)
        self._reset_state()
        print("Total number of files retrived", retrieved)
def main():
    """Start the AMQP consumer and keep the event loop running.

    The loop is published as the module-level global ``event_loop``.
    """
    global event_loop
    # Create and install the loop explicitly: calling
    # asyncio.get_event_loop() with no running loop is deprecated and raises
    # on newer Python versions.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    obj = routingfile()
    try:
        # Set up the connection and consumer, then serve deliveries until the
        # process is interrupted.
        event_loop.run_until_complete(obj.rpcserver())
        event_loop.run_forever()
    finally:
        # Release the loop's resources even when startup fails or the
        # process is interrupted; the exception still propagates.
        event_loop.close()
if __name__ == "__main__":
    # Script entry point.  main() ends in run_forever(), so the debug print
    # below is only reached once the event loop has been stopped.
    main()
    print("asdasdasd")
Add Comment
Please, Sign In to add comment