Advertisement
Guest User

Untitled

a guest
Aug 25th, 2016
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.77 KB | None | 0 0
  1. from pyspider.result import ResultWorker
  2. import psycopg2
  3. import sqlalchemy
  4. from sqlalchemy import Table, Column, Integer, String, Float, DateTime
  5. import sqlalchemy.orm
  6. import database
  7. import logmanager
  8. import time
  9. import re
  10. from datetime import datetime
  11.  
  class MyResultWorker(ResultWorker):
      """Result worker that writes crawl results into PostgreSQL tables.

      The target table is chosen from ``result['result_type']``:

      * ``'related'``  -> one row in the ``related`` table.
      * ``'external'`` -> one row in ``external_<query>`` and, when the result
        carries ``related_links``, one row per link in
        ``external_<query>_related``.

      Any other (or missing) result type is ignored.

      NOTE(review): the source this was recovered from had lost its
      indentation, so the nesting here is reconstructed from statement order.
      Two points to confirm against the running version: the
      "hopefully should never see this?" message is treated as the ``else`` of
      the ``'taskid' in task`` check, and the related-links handling is treated
      as part of the ``'external'`` branch.
      """

      # Extracts every ``mid=<value>`` run from a related link.
      # NOTE(review): presumably links look like ``...mid=X&rvsmid=Y`` so that
      # the first hit is the related id and the second (inside ``rvsmid=``) is
      # the bing id - confirm against real crawl output.
      _MID_PATTERN = re.compile(r"mid=([^&]+)", re.IGNORECASE)

      def on_result(self, task, result):
          """Persist one crawl result.

          :param task: task mapping; ``project``, ``lastcrawltime`` and
              ``taskid`` are read when present.
          :param result: result mapping; ``result_type`` selects the table and
              the remaining keys are copied into the row when present.
          :returns: ``None``.
          :raises AssertionError: if ``task`` or ``result`` is falsy (kept as
              ``assert`` so the exception type seen by callers is unchanged).
          """
          assert task
          assert result

          logger = logmanager.get_logger()
          table_name = result['result_type'] if 'result_type' in result else "fail"

          if table_name == 'related':
              self._store_related(task, result, table_name, logger)
          elif table_name == 'external':
              self._store_external(task, result, table_name, logger)
          return

      # ------------------------------------------------------------------
      # Branch handlers
      # ------------------------------------------------------------------

      def _store_related(self, task, result, table_name, logger):
          """Insert one row into the ``related`` table, creating it if missing."""
          engine = self._connect()
          self._ensure_table(engine, table_name, self._related_columns)

          row = {}
          for key in ('url', 'status', 'error', 'time_taken', 'query'):
              if key in result:
                  row[key] = result[key]
          # This table stores the commit time as a float epoch value.
          row['time_comitted'] = time.time()

          if 'project' in task:
              row['project'] = task['project']
          if 'lastcrawltime' in task:
              row['last_crawl_time'] = task['lastcrawltime']
          if 'taskid' in task:
              row['taskid'] = task['taskid']
          else:
              # taskid is the primary key, so a task without one is unexpected.
              logger.info("hopefully should never see this?")

          self._insert_rows(engine, table_name, [row], logger)

      def _store_external(self, task, result, table_name, logger):
          """Insert one row into ``external[_<query>]`` plus any related links."""
          # NOTE(review): the table name is built from crawled data; SQLAlchemy
          # quotes identifiers, but confirm ``query`` is constrained upstream.
          if 'query' in result:
              table_name = table_name + "_" + result['query']

          engine = self._connect()
          self._ensure_table(engine, table_name, self._external_columns)

          row = self._shared_fields(result, round_total=True)
          for key in ('url', 'original_url', 'bing_url', 'status', 'error'):
              if key in result:
                  row[key] = result[key]
          row['time_comitted'] = datetime.now()

          if 'project' in task:
              row['project'] = task['project']
          if 'lastcrawltime' in task and task['lastcrawltime'] is not None:
              row['last_crawl_time'] = datetime.fromtimestamp(
                  round(task['lastcrawltime'], 1))
          if 'taskid' in task:
              row['taskid'] = task['taskid']

          self._insert_rows(engine, table_name, [row], logger)

          if 'related_links' in result:
              self._store_related_links(engine, result, table_name, logger)

      def _store_related_links(self, engine, result, table_name, logger):
          """Insert one row per related link into ``<table_name>_related``."""
          table_name = table_name + "_related"
          self._ensure_table(engine, table_name, self._related_link_columns)

          # Both lists are needed to build a row; 'related_links' was already
          # checked by the caller.
          if 'related_titles' not in result:
              return

          shared = self._shared_fields(result, round_total=False)
          rows = []
          # zip() stops at the shorter list instead of raising IndexError when
          # fewer titles than links were scraped.
          for link, title in zip(result['related_links'], result['related_titles']):
              ids = self._MID_PATTERN.findall(link)
              if len(ids) < 2:
                  logger.info(
                      "skipping related link without two mid values: %s" % (link,))
                  continue
              # A fresh dict per link: reusing a single dict would make every
              # queued row identical to the last one.
              row = dict(shared)
              row['related_title'] = title
              row['related_id'] = ids[0]
              row['bing_id'] = ids[1]
              row['time_comitted'] = datetime.now()
              rows.append(row)

          if rows:
              self._insert_rows(engine, table_name, rows, logger, log_rows=True)

      # ------------------------------------------------------------------
      # Row helpers
      # ------------------------------------------------------------------

      @staticmethod
      def _shared_fields(result, round_total):
          """Return the query/timing fields common to external and link rows.

          ``round_total`` rounds ``time_taken_total`` to 3 decimals (external
          rows); related-link rows store the raw difference.
          """
          fields = {}
          if 'query' in result:
              fields['query'] = result['query']
          if 'time_taken' in result:
              fields['time_taken'] = round(result['time_taken'], 3)
          for key in ('time_invoked', 'time_returned'):
              if key in result:
                  fields[key] = datetime.fromtimestamp(round(result[key], 2))
          if 'time_invoked' in result and 'time_returned' in result:
              total = result['time_returned'] - result['time_invoked']
              fields['time_taken_total'] = round(total, 3) if round_total else total
          for key in ('time_waiting', 'time_processing'):
              if key in result:
                  fields[key] = result[key]
          return fields

      # ------------------------------------------------------------------
      # Database helpers
      # ------------------------------------------------------------------

      @staticmethod
      def _connect():
          """Open the database handle used for all tables.

          NOTE(review): assumes ``database.connect`` returns a SQLAlchemy
          Engine (it is used via ``.dialect`` and ``.connect()``) - confirm.
          """
          return database.connect(user="postgres", password="", db="cc")

      @staticmethod
      def _ensure_table(engine, table_name, build_columns):
          """Create ``table_name`` from ``build_columns()`` if it does not exist."""
          probe = engine.connect()
          try:
              exists = engine.dialect.has_table(probe, table_name)
          finally:
              # Close the connection opened only for the existence check.
              probe.close()
          if not exists:
              meta = sqlalchemy.MetaData(bind=engine, reflect=True)
              # Columns are built lazily because a Column object can belong to
              # only one Table.
              Table(table_name, meta, *build_columns())
              meta.create_all(engine)

      @staticmethod
      def _insert_rows(engine, table_name, rows, logger, log_rows=False):
          """Insert ``rows`` into ``table_name`` in one transaction.

          Integrity errors (e.g. duplicate primary key) are logged and
          swallowed; nothing from the batch is committed in that case.
          Keys with no matching column in the reflected table are dropped
          (and logged) because SQLAlchemy rejects unknown value keys.
          """
          session = None
          try:
              session = sqlalchemy.orm.sessionmaker(bind=engine)()
              meta = sqlalchemy.MetaData(bind=engine, reflect=True)
              table = Table(table_name, meta, autoload=True)
              known = set(table.columns.keys())
              if log_rows:
                  logger.info(table)
              for row in rows:
                  dropped = sorted(set(row) - known)
                  if dropped:
                      logger.info("dropping keys with no column in %s: %s"
                                  % (table_name, dropped))
                      row = dict((k, v) for k, v in row.items() if k in known)
                  if log_rows:
                      logger.info(row)
                  insertion = table.insert().values(row)
                  if log_rows:
                      logger.info(insertion)
                  session.execute(insertion)
              session.commit()
          except psycopg2.IntegrityError:
              logger.info("psycopg2.IntegrityError")
          except sqlalchemy.exc.IntegrityError:
              logger.info("sqlalchemy.exc.IntegrityError")
          finally:
              # The session may never have been created if sessionmaker failed.
              if session is not None:
                  session.close()

      # ------------------------------------------------------------------
      # Table definitions (built only when a table has to be created)
      # ------------------------------------------------------------------

      @staticmethod
      def _related_columns():
          """Columns of the ``related`` table."""
          return [
              Column('project', String),
              Column('url', String),
              Column('status', Integer),
              Column('error', String),
              Column('query', String),
              Column('time_taken', Float),
              Column('time_comitted', Float),
              Column('last_crawl_time', Float),
              Column('taskid', String(64), primary_key=True, nullable=False),
          ]

      @staticmethod
      def _external_columns():
          """Columns of an ``external[_<query>]`` table."""
          return [
              Column('url', String),
              Column('original_url', String),
              Column('bing_url', String),
              Column('status', Integer),
              Column('error', String),
              Column('query', String),
              Column('time_invoked', DateTime),
              Column('time_returned', DateTime),
              Column('time_comitted', DateTime),
              Column('time_taken_total', Float),
              Column('time_taken', Float),
              Column('last_crawl_time', DateTime),
              Column('project', String),
              Column('taskid', String(64), primary_key=True, nullable=False),
          ]

      @staticmethod
      def _related_link_columns():
          """Columns of an ``external[_<query>]_related`` table."""
          return [
              Column('related_title', String),
              Column('related_id', String, primary_key=True, nullable=False),
              Column('bing_id', String),
              Column('query', String),
              Column('time_invoked', DateTime),
              Column('time_returned', DateTime),
              Column('time_comitted', DateTime),
              Column('time_waiting', Float),
              Column('time_processing', Float),
              Column('time_taken_total', Float),
              Column('time_taken', Float),
          ]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement