import asyncio
import concurrent.futures
import json
import logging
import queue
import threading
import time
import warnings
from datetime import datetime
from threading import Thread
from multiprocessing.pool import ThreadPool

import aioamqp

from config import SingletonDecorator
from syscom.core import load_syscom_ascii
from syscom.vibr import calculate_ppv
from syscom.instafile import load_instantel_ascii
from ftpprocess import ftpprocesser

# Keeps a reference to the worker thread started for each config id.
thred = {}


class routingfile:
    """AMQP consumer: listens on the 'test123' queue and starts one worker
    thread per received config id."""

    def __init__(self):
        self.finallist = []

    @staticmethod
    def differ(first, second):
        # Items of `first` that are not in `second`.
        second = set(second)
        return [item for item in first if item not in second]

    def diff(self, list1, list2):
        # Symmetric difference of the two iterables.
        c = set(list1).union(set(list2))         # or c = set(list1) | set(list2)
        d = set(list1).intersection(set(list2))  # or d = set(list1) & set(list2)
        return list(c - d)

    def timestamp(self, list1, list2):
        # Compare two {filename: timestamp} mappings and collect files that are
        # either new or carry a newer timestamp in list1.
        for fname, tsamp in list1.items():
            print("filename", fname)
            print("Tstamp", tsamp)
            if fname in list2:
                print("Tstamp of list2", list2[fname])
                if tsamp > list2[fname]:
                    print("Time stamp is greater", fname)
                    self.finallist.append(fname)
            else:
                print("file is new", fname)
                self.finallist.append(fname)
        print("From function", self.finallist)
        return self.finallist

    @asyncio.coroutine
    def rpcserver(self):
        time_waited = 0
        to_wait = 1.5
        while True:
            try:
                transport, protocol = yield from aioamqp.connect(host='localhost',
                                                                 heartbeat=500,
                                                                 port=5672)
                break
            except (OSError, aioamqp.AmqpClosedConnection) as e:
                to_wait = round(min(1, (to_wait ** 1.0)), 2)  # retry delay, capped at 1 second
                print("Failed to connect to RabbitMQ: %s. Waiting %s seconds to see "
                      "if RabbitMQ at localhost comes online..." % (e, to_wait))
                yield from asyncio.sleep(to_wait)
                time_waited += to_wait

        if time_waited:
            print("Waited total of %s seconds for RabbitMQ to come up" % time_waited)

        channel = yield from protocol.channel()
        yield from channel.queue_declare(queue_name='test123')
        print(' [*][*][*] Waiting for messages. To exit press CTRL+C')
        yield from channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
        yield from channel.basic_consume(self.callback, queue_name='test123', no_ack=False)

    @asyncio.coroutine
    def callback(self, ch, body, envelope, properties):
        try:
            print(" [x] Received %r" % body)
            body = body.decode("utf-8")
            body = json.loads(body)
            print("Config name is:", body['configid'])

            gh = routeclass()
            configid = body['configid']

            yield from ch.basic_client_ack(delivery_tag=envelope.delivery_tag)

            # Start processing in a worker thread, keyed by config id.
            thred[configid] = threading.Thread(target=gh.executionstarter, args=[configid], daemon=False)
            thred[configid].start()
            # thred[configid].join()
        except Exception as e:
            print("Connection/processing error:", e)


class ThreadWithReturnValue(Thread):
    """Thread whose join() returns the value returned by the target callable."""

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, daemon=None):
        Thread.__init__(self, group=group, target=target, name=name,
                        args=args, kwargs=kwargs or {}, daemon=daemon)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout=None):
        Thread.join(self, timeout)
        return self._return


class routeclass:
    listindb = {}    # files previously stored in the database
    finallist = []   # final list of files to process
    username = ''
    host = ''
    password = ''
    count = 0
    filefilter = ''
    finallister = {}
    outdir = ''
    tp = {}
    lii = {}
    cols = []
    finalcsv = []
    start_time = time.time()

    def __init__(self):
        print("init")
        self.listindb = {}
        self.finallist = []
        self.username = ''
        self.host = ''
        self.password = ''
        # count = 0
        self.filefilter = ''
        self.finallister = {}
        self.outdir = ''
        self.tp = {}
        self.lii = {}
        self.cols = []
        self.finalcsv = []
        self.start_time = time.time()

    def diff(self, list1, list2):
        # Symmetric difference of the two iterables.
        c = set(list1).union(set(list2))         # or c = set(list1) | set(list2)
        d = set(list1).intersection(set(list2))  # or d = set(list1) & set(list2)
        return list(c - d)

    def timestamp(self, list1, list2):
        # Compare two {filename: timestamp} mappings and collect files that are
        # either new or carry a newer timestamp in list1.
        for fname, tsamp in list1.items():
            print("filename", fname)
            print("Tstamp", tsamp)
            if fname in list2:
                print("Tstamp of list2", list2[fname])
                if tsamp > list2[fname]:
                    print("Time stamp is greater", fname)
                    self.finallist.append(fname)
            else:
                print("file is new", fname)
                self.finallist.append(fname)
        print("From function", self.finallist)
        return self.finallist

    def calculate(self, list1):
        for fl in list1:
            try:
                # Parse the syscom ASCII file (synchronous call).
                head, dfg, df1 = load_syscom_ascii(fleName=fl, columns=self.cols,
                                                   finalcsv=self.finalcsv,
                                                   out=self.outdir)
                print("DATA Frame is", df1.duplicated())
                warnings.filterwarnings("ignore")
                calculate_ppv(fleName=fl, columns=self.cols, separator=',', fileformat='syscom',
                              timeWindowSec=10,
                              df=df1, head=head, out=self.outdir)
                SingletonDecorator(
                    "UPDATE gdl_input_files SET processed ='3' , errordata='0' WHERE filename='" + fl + "'",
                    function="insertion")
            except Exception as e:
                print("Error while processing", fl, ":", e)
                SingletonDecorator(
                    "UPDATE gdl_input_files SET processed ='4' , errordata='3' WHERE filename='" + fl + "'",
                    function="insertion")
        return

    def executionstarter(self, configid):
        # Main execution begins here: look up the configuration for this config
        # id, then dispatch to the matching preprocessor.
        valid = SingletonDecorator(
            query="SELECT d.id,d.directory,f.ftp_host,f.user_name,f.password,o.path,g.filename_filter,g.preprocessor,g.userfields FROM gdl_file_config g,ddf_subdirectory d,ddf_directory f,output_directory o where g.config_id='" + str(
                configid) + "' and g.directory_id=d.id and d.data_source_id=f.id and o.config_id='" + str(
                configid) + "' ;", function="retrive")
        f = valid()
        if f is None:
            print("Query error")
        else:
            print("Working On Query", f)
            for k in f:
                print("current config id is", k[0])
                print("current directory", k[1])
                print("current Output Dir", k[5])
                print("current host", k[2])
                print("Username", k[3])
                print("Password", k[4])
                print("FileFilter", k[6])

                dirid = k[0]
                self.curdir = k[1]
                self.host = k[2]
                self.filefilter = k[6]
                self.outdir = k[5]
                self.username = k[3]
                self.password = k[4]
                preprocesser = k[7]
                self.finalcsv = k[8]

                print("Preprocessor type", preprocesser)

                # Preprocessor for files without a column definition.
                def vibrwithoutcols():
                    print("No cols")
                    ftp1 = ftpprocesser(host=self.host, username=self.username, password=self.password,
                                        directory=self.curdir, outputdir=self.outdir, filter=self.filefilter)

                    tp1 = ftp1.ftplister()
                    print(tp1)
                    self.tp = json.loads(tp1)  # file listing from the FTP server
                    for k, l in self.tp.items():
                        if k != 'configid':
                            print("Insertion")
                            print("File and tstamp is", k, l)
                            # For testing: insert all the files from the server into the db
                            # ins=SingletonDecorator(query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('"+k+"', '"+str(dirid)+"','"+l+"','1','"+strftime("%Y-%m-%d %H:%M:%S", gmtime())+"' )",function="insertion")
                    print("The files from server number", len(self.tp))
                    # print("Directory is", body['dir'])
                    print("Getting list of files from database for directory id", dirid)
                    listoffiles = SingletonDecorator(
                        query="SELECT distinct(filename),timestamp FROM gdl_input_files where processed=3 and directory_id ='" + str(
                            dirid) + "'", function="retrive")
                    lis = listoffiles()  # list of already-processed files from the database

                    for m in lis:
                        print("Files in DB", m)
                        self.listindb[m[0]] = m[1]  # filename -> timestamp as stored in the db

                    # Round-trip through JSON for easy comparison
                    listin = json.dumps(self.listindb)
                    self.lii = json.loads(listin)

                    # Files that exist on the server but are not yet in the database
                    dif = self.diff(list1=self.tp, list2=self.lii)
                    print("Diff between server and DB:", dif)
                    for fname in dif:
                        # record each new file together with its server-side timestamp
                        try:
                            print("FNAME", fname)
                            print("timestamp is", self.tp[fname])
                            self.finallister[fname] = self.tp[fname]
                        except KeyError:
                            print("Key error", fname)

                    print("New files from server is", self.finallister)
                    for fname, timestamp in self.finallister.items():
                        currenttime, k = str(datetime.now()).split('.')
                        print("currenttime", currenttime)
                        print("Inserting new files in server into DB")
                        print("Filename", fname)
                        print("Times", timestamp)

                        # Insert the newly received file into the database
                        SingletonDecorator(
                            query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('" + fname + "', '" + str(
                                dirid) + "','" + timestamp + "','3','" + currenttime + "')", function="insertion")

                    print("List from DB", lis)
                    print("Current files list from server dir", self.tp)
                    print(len(self.lii))
                    print(len(self.tp))
                    self.finallist = self.timestamp(list1=self.tp, list2=self.lii)
                    for fname in self.finallist:
                        print("New insertion")
                        ol = self.tp[fname]
                        # Update existing files with the new timestamp
                        SingletonDecorator(
                            "UPDATE gdl_input_files SET processed ='1',timestamp='" + ol + "' WHERE filename='" + fname + "'",
                            function="insertion")
                        print("New Update Query")
                    print("From server is", self.tp)
                    print("From DB is", self.lii)
                    print("From server is", len(self.tp))
                    print("From DB is", len(self.lii))
                    print("Final list is", self.finallist)
                    print("Final list count", len(self.finallist))
                    time.sleep(0.01)
                    op = threading.Thread(target=ftp1.ftptester, args=[self.finallist], daemon=False)
                    op.start()
                    op.join()
                    warnings.filterwarnings("ignore")

                    for fl in self.finallist:
                        try:
                            # Parse the Instantel ASCII file in a worker thread and wait for the result.
                            pool = ThreadPool(processes=len(self.finallist))
                            async_result = pool.apply_async(load_instantel_ascii,
                                                            kwds={'fleName': fl, 'columns': None,
                                                                  'out': self.outdir})
                            head, dfg, df1 = async_result.get()
                            # Synchronous alternative:
                            # head, dfg, df1 = load_instantel_ascii(fleName=fl, columns=None, out=self.outdir)
                            print("DATA Frame is", df1.duplicated())
                            # calculate_ppv(fleName=fl, columns=None, separator=',', fileformat='syscom',
                            #               timeWindowSec=10, df=df1, head=head, out=self.outdir)

                            # Run the PPV calculation in its own thread.
                            ml12 = threading.Thread(target=calculate_ppv,
                                                    kwargs={'fleName': fl, 'columns': None, 'separator': ',',
                                                            'fileformat': 'syscom', 'timeWindowSec': 10,
                                                            'df': df1, 'head': head, 'out': self.outdir})
                            ml12.start()
                            ml12.join()

                            SingletonDecorator(
                                "UPDATE gdl_input_files SET processed ='3' , errordata='0' WHERE filename='" + fl + "'",
                                function="insertion")
                        except Exception as e:
                            print("Error on Graph Generation:", e)
                            '''
                            SingletonDecorator(
                                "UPDATE gdl_input_files SET processed ='4' , errordata='3' WHERE filename='" + fl + "'",
                                function="insertion")
                            '''
                    return

                # Preprocessor for VIBR files with an explicit column definition.
                def vibr():
                    print("VIBR file")
                    try:
                        # preprocessor options: "fileformat=...;timewindow=...;separator=...;cols=a|b|c"
                        t, fileformat = temp1[0].split('=')
                        t, timewindow = temp1[1].split('=')
                        try:
                            t, separator = temp1[2].split('=')
                        except Exception:
                            separator = ''
                        tcols, tcols1 = temp1[3].split('=')
                        self.cols = tcols1.split('|')
                        print("Columns", self.cols)
                        print("Fileformat", fileformat)
                        print("Separator", separator)

                        print("CSV info", self.finalcsv)
                        temp2, temp3 = self.finalcsv.split("=")
                        self.finalcsv = temp3.split("|")
                        print("Final csv", self.finalcsv)
                    except Exception:
                        print("Not a proper VIBR preprocessor string")
                    ftp1 = ftpprocesser(host=self.host, username=self.username, password=self.password,
                                        directory=self.curdir, outputdir=self.outdir, filter=self.filefilter)

                    tp1 = ftp1.ftplister()
                    print(tp1)
                    self.tp = json.loads(tp1)  # file listing from the FTP server
                    for k, l in self.tp.items():
                        if k != 'configid':
                            print("Insertion")
                            print("File and tstamp is", k, l)
                            # For testing: insert all the files from the server into the db
                            # ins=SingletonDecorator(query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('"+k+"', '"+str(dirid)+"','"+l+"','1','"+strftime("%Y-%m-%d %H:%M:%S", gmtime())+"' )",function="insertion")
                    print("The files from server number", len(self.tp))
                    # print("Directory is", body['dir'])
                    print("Getting list of files from database for directory id", dirid)
                    listoffiles = SingletonDecorator(
                        query="SELECT distinct(filename),timestamp FROM gdl_input_files where processed=3 and directory_id ='" + str(
                            dirid) + "'", function="retrive")
                    lis = listoffiles()  # list of already-processed files from the database

                    for m in lis:
                        print("Files in DB", m)
                        self.listindb[m[0]] = m[1]  # filename -> timestamp as stored in the db

                    # Round-trip through JSON for easy comparison
                    listin = json.dumps(self.listindb)
                    self.lii = json.loads(listin)

                    # Files that exist on the server but are not yet in the database
                    dif = self.diff(list1=self.tp, list2=self.lii)
                    print("Diff between server and DB:", dif)
                    for fname in dif:
                        # record each new file together with its server-side timestamp
                        try:
                            print("FNAME", fname)
                            print("timestamp is", self.tp[fname])
                            self.finallister[fname] = self.tp[fname]
                        except KeyError:
                            print("Key error", fname)

                    print("New files from server is", self.finallister)
                    for fname, timestamp in self.finallister.items():
                        currenttime, k = str(datetime.now()).split('.')
                        print("currenttime", currenttime)
                        print("Inserting new files in server into DB")
                        print("Filename", fname)
                        print("Times", timestamp)

                        # Insert the newly received file into the database
                        SingletonDecorator(
                            query="INSERT INTO gdl_input_files (filename, directory_id, timestamp, processed, date_added) VALUES ('" + fname + "', '" + str(
                                dirid) + "','" + timestamp + "','3','" + currenttime + "')", function="insertion")

                    print("List from DB", lis)
                    print("Current files list from server dir", self.tp)
                    print(len(self.lii))
                    print(len(self.tp))
                    self.finallist = self.timestamp(list1=self.tp, list2=self.lii)
                    for fname in self.finallist:
                        print("New insertion")
                        ol = self.tp[fname]
                        # Update existing files with the new timestamp
                        SingletonDecorator(
                            "UPDATE gdl_input_files SET processed ='1',timestamp='" + ol + "' WHERE filename='" + fname + "'",
                            function="insertion")
                        print("New Update Query")
                    print("From server is", self.tp)
                    print("From DB is", self.lii)
                    print("From server is", len(self.tp))
                    print("From DB is", len(self.lii))
                    print("Final list is", self.finallist)
                    print("Final list count", len(self.finallist))
                    th = threading.Thread(target=ftp1.ftptester, args=[self.finallist], daemon=True)
                    th.start()
                    th.join()
                    warnings.filterwarnings("ignore")
                    # try:
                    #     ml1 = threading.Thread(target=self.calculate, args=[self.finallist], daemon=False)
                    #     ml1.start()
                    #     ml1.join()
                    # except:
                    #     print("Error on Graph Generation")

                    i = 1
                    threads = []
                    for fl in self.finallist:
                        try:
                            i += 1
                            # Synchronous alternative:
                            # head, dfg, df1 = load_syscom_ascii(fleName=fl, columns=self.cols,
                            #                                    finalcsv=self.finalcsv, out=self.outdir)

                            # Parse each syscom ASCII file in its own worker thread.
                            worker = threading.Thread(target=load_syscom_ascii,
                                                      kwargs={'fleName': fl, 'columns': self.cols,
                                                              'finalcsv': self.finalcsv, 'out': self.outdir,
                                                              'separator': ',', 'fileformat': 'syscom',
                                                              'timeWindowSec': '10'})
                            worker.start()
                            worker.join()

                            # print("DATA Frame is", df1.duplicated())
                            '''
                            calculate_ppv(fleName=fl, columns=self.cols, separator=',', fileformat='syscom',
                                          timeWindowSec=10, df=df1, head=head, out=self.outdir)

                            m1 = threading.Thread(target=calculate_ppv,
                                                  kwargs={'fleName': fl, 'columns': self.cols, 'separator': ',',
                                                          'fileformat': 'syscom', 'timeWindowSec': 10,
                                                          'df': df1, 'head': head, 'out': self.outdir},
                                                  daemon=False)
                            m1.start()
                            threads.append(m1)
                            # m1.join()
                            '''
                        except Exception as e:
                            print("Error:", e)

                    for thread in threads:
                        thread.join()

                # Dispatch on the preprocessor type parsed from the configuration.
                filetypes = {'vibr': vibr, 'vibrnocols': vibrwithoutcols}
                try:
                    ftype, t = preprocesser.split(":")
                    temp1 = t.split(";")
                    # Handler for VIBR files with column definitions
                    if ftype == 'VIBR':
                        filetypes['vibr']()
                except Exception:
                    # No column definition: fall back to the column-less handler.
                    print("No cols in preprocessor string")
                    filetypes['vibrnocols']()

        print("--- %s seconds ---" % (time.time() - self.start_time))
        print("Total number of files retrieved", len(self.finallist))
        print(' [*] Waiting for messages. To exit press CTRL+C')


def main():
    global event_loop
    try:
        event_loop = asyncio.get_event_loop()

        obj = routingfile()
        event_loop.run_until_complete(obj.rpcserver())

        event_loop.run_forever()
    except Exception as e:
        print("Event loop error:", e)


if __name__ == '__main__':
    # When this file is executed as the main module, main() runs first.
    main()
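
# --- Illustrative test publisher (sketch) ---
# The consumer above expects a JSON message such as {"configid": 1} on the
# 'test123' queue of a RabbitMQ broker at localhost:5672. The commented-out
# snippet below is a minimal example of how such a message could be published
# with aioamqp; the config id value 1 is only a placeholder.
#
# @asyncio.coroutine
# def publish_test_message():
#     transport, protocol = yield from aioamqp.connect(host='localhost', port=5672)
#     channel = yield from protocol.channel()
#     yield from channel.queue_declare(queue_name='test123')
#     payload = json.dumps({'configid': 1}).encode('utf-8')
#     yield from channel.basic_publish(payload=payload, exchange_name='', routing_key='test123')
#     yield from protocol.close()
#     transport.close()
#
# asyncio.get_event_loop().run_until_complete(publish_test_message())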