Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pyspider.result import ResultWorker
- import psycopg2
- import sqlalchemy
- from sqlalchemy import Table, Column, Integer, String, Float, DateTime
- import sqlalchemy.orm
- import database
- import logmanager
- import time
- import re
- from datetime import datetime
class MyResultWorker(ResultWorker):
    """pyspider result worker that stores crawl results in PostgreSQL.

    Results are routed by ``result['result_type']``:

    * ``'related'``  -> one row in the ``related`` table.
    * ``'external'`` -> one row in ``external`` (or ``external_<query>`` when
      the result carries a ``query``) and, when the result also carries
      ``related_links``, one row per parsed link in ``<that table>_related``.

    Any other (or missing) ``result_type`` is ignored. Tables are created on
    first use. Integrity errors (e.g. duplicate primary keys) roll back the
    affected insert and are logged instead of raised.
    """

    # Keyword arguments passed to ``database.connect``; override on a subclass
    # to target another database.
    DB_PARAMS = {'user': 'postgres', 'password': '', 'db': 'cc'}

    # Captures the value of every ``mid=`` query parameter in a link. With no
    # word boundary it also matches inside ``rvsmid=``; the first match is
    # stored as related_id and the second as bing_id. Presumably links look
    # like ``...mid=<X>&rvsmid=<Y>...`` -- TODO confirm against real data.
    RELATED_ID_RE = re.compile("mid=([^&]+)", re.IGNORECASE)

    def on_result(self, task, result):
        """Persist one pyspider *result* produced for *task*.

        :param task: pyspider task dict; ``project``, ``taskid`` and
            ``lastcrawltime`` are copied into the stored row when present.
        :param result: dict returned by the project callback; its
            ``result_type`` selects the destination table.
        """
        assert task
        assert result
        logger = logmanager.get_logger()
        result_type = result.get('result_type', 'fail')
        if result_type == 'related':
            self._store_related(task, result, logger)
        elif result_type == 'external':
            self._store_external(task, result, logger)
        return

    def _get_engine(self):
        """Return the database handle, creating it once per worker.

        NOTE(review): assumes ``database.connect`` returns a SQLAlchemy Engine
        (it is used with ``.connect()``, ``.dialect`` and as a session bind).
        Previously a new handle was opened several times per result and never
        released; one cached handle avoids that leak.
        """
        engine = getattr(self, '_db_engine', None)
        if engine is None:
            engine = database.connect(**self.DB_PARAMS)
            self._db_engine = engine
        return engine

    @staticmethod
    def _ensure_table(engine, table_name, columns):
        """Return Table *table_name*, creating it from *columns* if absent.

        *columns* must be freshly built Column objects (a Column can belong
        to only one Table). When the table already exists they are ignored
        and the table's actual schema is reflected instead.
        """
        meta = sqlalchemy.MetaData(bind=engine)
        # Scoped connection: returned to the pool instead of being leaked.
        with engine.connect() as conn:
            exists = engine.dialect.has_table(conn, table_name)
        if exists:
            return Table(table_name, meta, autoload=True)
        table = Table(table_name, meta, *columns)
        meta.create_all(engine)
        return table

    @staticmethod
    def _timing_fields(result):
        """Return the query/timing columns shared by external rows."""
        fields = {}
        if 'time_taken' in result:
            fields['time_taken'] = round(result['time_taken'], 3)
        for key in ('time_invoked', 'time_returned'):
            if key in result:
                fields[key] = datetime.fromtimestamp(round(result[key], 2))
        fields['time_comitted'] = datetime.now()
        if 'query' in result:
            fields['query'] = result['query']
        if 'time_invoked' in result and 'time_returned' in result:
            fields['time_taken_total'] = round(
                result['time_returned'] - result['time_invoked'], 3)
        for key in ('time_waiting', 'time_processing'):
            if key in result:
                fields[key] = result[key]
        return fields

    @staticmethod
    def _insert_rows(engine, table, rows, logger, verbose=False):
        """Insert *rows* into *table* in one transaction.

        Keys that are not columns of *table* are dropped (and logged) rather
        than letting SQLAlchemy raise CompileError for unconsumed column
        names. An IntegrityError rolls back the whole batch and is logged,
        not raised; any other error propagates after the session is closed.
        """
        if not rows:
            return
        known = set(table.c.keys())
        # Created before ``try`` so ``finally`` can always close it.
        session = sqlalchemy.orm.sessionmaker(bind=engine)()
        try:
            for row in rows:
                unknown = sorted(set(row) - known)
                if unknown:
                    logger.info("dropping fields not in table {}: {}".format(
                        table.name, unknown))
                    row = {k: v for k, v in row.items() if k in known}
                if verbose:
                    logger.info(row)
                insertion = table.insert().values(row)
                if verbose:
                    logger.info(insertion)
                session.execute(insertion)
            session.commit()
        except psycopg2.IntegrityError:
            session.rollback()
            logger.info("psycopg2.IntegrityError")
        except sqlalchemy.exc.IntegrityError:
            session.rollback()
            logger.info("sqlalchemy.exc.IntegrityError")
        finally:
            session.close()

    def _store_related(self, task, result, logger):
        """Insert one row for a ``'related'`` result into table ``related``."""
        engine = self._get_engine()
        table = self._ensure_table(engine, 'related', [
            Column('project', String),
            Column('url', String),
            Column('status', Integer),
            Column('error', String),
            Column('query', String),
            Column('time_taken', Float),
            Column('time_comitted', Float),
            Column('last_crawl_time', Float),
            Column('taskid', String(64), primary_key=True, nullable=False),
        ])
        row = {key: result[key]
               for key in ('url', 'status', 'error', 'time_taken', 'query')
               if key in result}
        row['time_comitted'] = time.time()
        if 'project' in task:
            row['project'] = task['project']
        if 'lastcrawltime' in task:
            row['last_crawl_time'] = task['lastcrawltime']
        if 'taskid' in task:
            row['taskid'] = task['taskid']
        else:
            # taskid is the primary key; the insert below will most likely
            # fail with an IntegrityError, which _insert_rows logs.
            logger.info("hopefully should never see this?")
        self._insert_rows(engine, table, [row], logger)

    def _store_external(self, task, result, logger):
        """Insert an ``'external'`` result, then its related links if any.

        NOTE(review): the table name embeds ``result['query']``. SQLAlchemy
        quotes it, but PostgreSQL truncates identifiers longer than 63 bytes,
        so long queries may collide -- confirm this naming is intended.
        """
        table_name = 'external'
        if 'query' in result:
            table_name = table_name + "_" + result['query']
        engine = self._get_engine()
        table = self._ensure_table(engine, table_name, [
            Column('url', String),
            Column('original_url', String),
            Column('bing_url', String),
            Column('status', Integer),
            Column('error', String),
            Column('query', String),
            Column('time_invoked', DateTime),
            Column('time_returned', DateTime),
            Column('time_comitted', DateTime),
            Column('time_taken_total', Float),
            Column('time_taken', Float),
            # Added so newly created tables can hold fields the row carries.
            Column('time_waiting', Float),
            Column('time_processing', Float),
            Column('last_crawl_time', DateTime),
            Column('project', String),
            Column('taskid', String(64), primary_key=True, nullable=False),
        ])
        row = {key: result[key]
               for key in ('url', 'original_url', 'bing_url', 'status', 'error')
               if key in result}
        row.update(self._timing_fields(result))
        if 'project' in task:
            row['project'] = task['project']
        if task.get('lastcrawltime') is not None:
            row['last_crawl_time'] = datetime.fromtimestamp(
                round(task['lastcrawltime'], 1))
        if 'taskid' in task:
            row['taskid'] = task['taskid']
        self._insert_rows(engine, table, [row], logger)

        if 'related_links' in result:
            self._store_external_related(
                engine, table_name + "_related", result, logger)

    def _store_external_related(self, engine, table_name, result, logger):
        """Insert one row per parsable link in ``result['related_links']``.

        Links are paired positionally with ``result['related_titles']``.
        Links yielding fewer than two ``mid=`` values are skipped and logged.
        Repeated related_ids within one result are inserted only once, since
        related_id is the primary key.
        """
        table = self._ensure_table(engine, table_name, [
            Column('related_title', String),
            Column('related_id', String, primary_key=True, nullable=False),
            Column('bing_id', String),
            Column('query', String),
            Column('time_invoked', DateTime),
            Column('time_returned', DateTime),
            Column('time_comitted', DateTime),
            Column('time_waiting', Float),
            Column('time_processing', Float),
            Column('time_taken_total', Float),
            Column('time_taken', Float),
        ])
        logger.info(table)
        if 'related_titles' not in result:
            return
        timing = self._timing_fields(result)
        rows = []
        seen_ids = set()
        for link, title in zip(result['related_links'], result['related_titles']):
            ids = self.RELATED_ID_RE.findall(link)
            if len(ids) < 2:
                logger.info("skipping related link without two mid= values: {}".format(link))
                continue
            related_id, bing_id = ids[0], ids[1]
            if related_id in seen_ids:
                continue
            seen_ids.add(related_id)
            # A fresh dict per link, so rows do not alias one another.
            row = {'related_title': title,
                   'related_id': related_id,
                   'bing_id': bing_id}
            row.update(timing)
            rows.append(row)
        self._insert_rows(engine, table, rows, logger, verbose=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement