Guest User

Untitled

a guest
Nov 16th, 2017
28
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.91 KB | None | 0 0
  1. import aioamqp
  2. import asyncio
  3. import os
  4. import signal
  5. import threading
  6. from queue import Queue
  7. import time
  8. import shutil
  9. import threading
  10. import sys, traceback
  11. from datetime import *
  12. from deepdiff import DeepDiff
  13. from dateutil.relativedelta import *
  14. import _thread
  15. import json
  16. from config import SingletonDecorator
  17. from time import gmtime, strftime
  18. from ftpfilelister import ftplister
  19. #from ftpretriver import retreive
  20. import time
  21. global start_time
  22. from ftpretriver import ftptester
  23. #from concurrent.futures import ProcessPoolExecutor
  24. #import multiprocessing
  25. #from multiprocessing import Process
  26. #from concurrent.futures import ThreadPoolExecutor
  27. #import _thread
  28. import threading
  29. class routingfile:
  30. def differ(first, second):
  31. second = set(second)
  32. return [item for item in first if item not in second]
  33. def diff(self,list1, list2):
  34. c = set(list1).union(set(list2)) # or c = set(list1) | set(list2)
  35. d = set(list1).intersection(set(list2)) # or d = set(list1) & set(list2)
  36. return list(c - d)
  37. def timestamp(self,list1, list2):
  38. for fname, tsamp in list1.items():
  39. print("filename", fname)
  40. print("Tstamp", tsamp)
  41. if fname in list2:
  42. print("Tstamp of list2", list2[fname])
  43. if tsamp > list2[fname]:
  44. print("Time stamp is greater", fname)
  45. self.finallist.append(fname)
  46. else:
  47. print("file is new", fname)
  48. self.finallist.append(fname)
  49. print("From function",self.finallist)
  50. return self.finallist
  51. @asyncio.coroutine
  52. def rpcserver(self):
  53. transport, protocol = yield from aioamqp.connect('localhost', 5672)
  54. channel = yield from protocol.channel()
  55. yield from channel.queue_declare(queue_name='test123')
  56. print(' [*] Waiting for messages. To exit press CTRL+C')
  57.  
  58. yield from channel.basic_consume(self.callback, queue_name='test123', no_ack=False)
  59.  
  60. @asyncio.coroutine
  61. def callback(self,ch, body, envelope, properties):
  62.  
  63. start_time= time.time()
  64. print(" [x] Received %r" % body)
  65. body=body.decode("utf-8")
  66. body=json.loads(body)
  67.  
  68. print("Config name is:",body['configid'])
  69. gh = stack()
  70. gh.test1(configid=body['configid'])
  71. yield from ch.basic_client_ack(delivery_tag=envelope.delivery_tag)
  72. configid = body['configid']
  73. #print("Total number of files retrived", len(self.finallist))
  74.  
  75. gh=stack()
  76. gh.test1(configid=body['configid'])
  77.  
  78.  
  79.  
  80. class stack:
  81. listindb = {}
  82. finallist = []
  83. username = ''
  84. host = ''
  85. password = ''
  86. count = 0
  87. filefilter = ''
  88. finallister = {}
  89. outdir = ''
  90. tp = {}
  91. lii = {}
  92. def differ(first, second):
  93. second = set(second)
  94. return [item for item in first if item not in second]
  95. def diff(self,list1, list2):
  96. c = set(list1).union(set(list2)) # or c = set(list1) | set(list2)
  97. d = set(list1).intersection(set(list2)) # or d = set(list1) & set(list2)
  98. return list(c - d)
  99. def timestamp(self,list1, list2):
  100. for fname, tsamp in list1.items():
  101. print("filename", fname)
  102. print("Tstamp", tsamp)
  103. if fname in list2:
  104. print("Tstamp of list2", list2[fname])
  105. if tsamp > list2[fname]:
  106. print("Time stamp is greater", fname)
  107. self.finallist.append(fname)
  108. else:
  109. print("file is new", fname)
  110. self.finallist.append(fname)
  111. print("From function",self.finallist)
  112. return self.finallist
  113. def test1(self,configid):
  114.  
  115. valid = SingletonDecorator(
  116. query="SELECT d.id,d.directory,f.ftp_host,f.user_name,f.password,o.path,g.filename_filter FROM gdl_file_config g,ddf_subdirectory d,ddf_directory f,output_directory o where g.config_id='" + str(
  117. configid) + "' and g.directory_id=d.id and d.data_source_id=f.id and o.config_id='" + str(
  118. configid) + "' ;", function="retrive")
  119. f = valid()
  120. if f is None:
  121. print("Query error")
  122.  
  123. else:
  124. print("asdasdasdas", f)
  125. for k in f:
  126. print("current config id is", k[0])
  127. print("current directory", k[1])
  128. print("current Output Dir", k[5])
  129. print("current host", k[2])
  130. print("Username", k[3])
  131. print("Pasword", k[4])
  132. print("FileFilter", k[6])
  133.  
  134. dirid = k[0]
  135. curdir = k[1]
  136. host = k[2]
  137. filefilter = k[6]
  138. outdir = k[5]
  139. username = k[3]
  140. password = k[4]
  141.  
  142. self.tp = ftplister(host=k[2], username=k[3], password=k[4], dir=k[1], filter=filefilter)
  143. self.tp = json.loads(self.tp) # from server
  144. for k, l in self.tp.items():
  145. if (k != 'configid'):
  146. print("Insertion")
  147. print("File and tstamp is", k, l)
  148. # For testing will insert all the files from server into db
  149. # ins=SingletonDecorator(query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('"+k+"', '"+str(dirid)+"','"+l+"','1','"+strftime("%Y-%m-%d %H:%M:%S", gmtime())+"' )",function="insertion")
  150. print("The files list are", len(self.tp))
  151. # print("Directory is",body['dir'])
  152. print("Getting list of files ****** From Directory id", dirid)
  153. listoffiles = SingletonDecorator(
  154. query="SELECT distinct(filename),timestamp FROM gdl_input_files where directory_id ='" + str(
  155. dirid) + "'", function="retrive")
  156. lis = listoffiles() # From db
  157.  
  158. for m in lis:
  159. # print("Files in DB",m)
  160. self.listindb[m[0]] = m[1]
  161.  
  162. print("TYPWEWWW", len(self.listindb))
  163. listin = json.dumps(self.listindb)
  164. self.lii = json.loads(listin)
  165. # tem=DeepDiff(lii,tp)
  166. dif = self.diff(list1=self.lii, list2=self.tp)
  167. print("asdasdasd")
  168. for fname in dif:
  169. # getting new files time stamp
  170. try:
  171. print("FNAME", fname)
  172. print("timestampis", self.tp[fname])
  173. # adding newly arrived files from server with time stamp
  174. self.finallister[fname] = self.tp[fname]
  175. except:
  176. print("KEy error")
  177.  
  178. # s = set(lii)
  179. # temp3 = [x for x in tp if x not in s]
  180. # print("finaaaaaaa",temp3)
  181. # shared_items = self.diff(list1=tp,list2=lii)
  182. # for m in shared_items:
  183. # kl=tp[m]
  184. # self.finallister[m]=kl
  185.  
  186. print("New fiels from seerver is", self.finallister)
  187. for fname, timestamp in self.finallister.items():
  188. currenttime, k = str(datetime.now()).split('.')
  189. print("currenttime", currenttime)
  190. print("Inserting new files in server into DB")
  191. print("Filename", fname)
  192. print("Times", timestamp)
  193.  
  194. SingletonDecorator(
  195.  
  196. query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('" + fname + "', '" + str(
  197. dirid) + "','" + l + "','1','" + currenttime + "')", function="insertion")
  198.  
  199. print("List from DB", lis)
  200. print("Current files list from server dir", self.tp)
  201. # print("Difff",shared_items)
  202.  
  203. print(len(self.lii))
  204. print(len(self.tp))
  205. # for k,l in shared_items:
  206. # print("New files are",k)
  207. self.timestamp(self.tp, self.lii)
  208. print("Final list is", self.finallist)
  209. print("Final listcount", len(self.finallist))
  210. # executor = ThreadPoolExecutor(max_workers=10)
  211. # a = (my_function)
  212. jobs = []
  213. jobs.append(host)
  214. jobs.append(username)
  215. jobs.append(password)
  216. jobs.append(curdir)
  217. jobs.append(outdir)
  218. jobs.append(self.finallist)
  219. ftptester(host=host, username=username, password=password, dir=curdir, filelist=self.finallister)
  220. #event_loop.run_in_executor(None, ftptester(host=self.host, username=username, password=password, dir=curdir,filelist=self.finallister))
  221.  
  222. #try:
  223. #event_loop.run_in_executor(None,ftptester(host=host,username=username,password=password,dir=curdir,filelist=self.finallister))
  224. # event_loop.await()
  225. #kl = threading.Thread(target=ftptester, args=[jobs], daemon=False)
  226. #kl.start()
  227. #kl.join()
  228.  
  229.  
  230.  
  231. #_thread.start_new_thread(ftptester(host=host,username=username,password=password,dir=curdir,filelist=self.finallister))
  232. #except:
  233. print("Cant start a thread")
  234. self.finallist = []
  235. self.flister = []
  236. self.listindb = {}
  237. self.finallister = {}
  238. self.tp = {}
  239. self.lii = {}
  240. print("Total number of files retrived", len(self.finallist))
  241. #print("--- %s seconds ---" % (time.time() - start_time))
  242.  
  243. print(' [*] Waiting for messages. To exit press CTRL+C')
  244. def main():
  245. global event_loop
  246. event_loop= asyncio.get_event_loop()
  247. #event_loop.add_signal_handler(signal.SIGINT, my_handler)
  248. obj=routingfile()
  249. event_loop.run_until_complete(obj.rpcserver())
  250.  
  251. event_loop.run_forever()
  252. if __name__ == '__main__':
  253. main()
  254. print("asdasdasd")
Add Comment
Please, Sign In to add comment