Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: UTF-8 -*-
- import sys
- import logging
- import base64
- from random import randint
- from satchmo.common_lib.coroutines import Return, WaitFirstTask, Sleep, Task
- from differ import files
- from satchmo.common_lib.webkitrpc import BrowserProxy, getRpcClient
- from satchmo.common_lib.webkit import Browser
- from satchmo.common_lib.colorer import bold
- from satchmo.common_lib.corpc import RpcServerDisconnected
- from PyQt4.QtCore import QCoreApplication, QByteArray, QRegExp
- from PyQt4.QtNetwork import QNetworkRequest
- from lxml import etree
- #-------------------------------------------------------------------#
- from Queue import Queue
- #-------------------------------------------------------------------#
- settings = None
- log = logging.getLogger( 'loader' )
- class BaseLoaderSettings:
- # Очень большая задержка на максимальное время работы coProcessPage
- # Поскольку он может глубоко уходить в рекурсию
- max_worker_timeout = 6 * 3600 * 1000 # 6 часов
- # задержка между параллельными запросами
- requests_delay = 200 # msecs
- # Максимальное кол-во параллельных сопрограмм на одном уровне рекурсии
- # в BaseLoader.coProcessPage
- max_parallel_tasks = 3
- # сколько раз пытаемся пересасывать страничку?
- retries = 5
- # Увеличенный timeout на binary объекты
- binary_browser_timeout = 5 * 60 # 5 минут
- # первая экспоненциальная задержка между попытками
- error_sleep_secs = 10 # секунд
- # максимальная вложенность BaseLoader.coProcessPage (не совсем вложенность, при реализации paginator'ов может быть линейно)
- max_recursion = 100
- # Таймауты и время жизни заявок после выполнения для различных типов
- # RedisRPC запросов (в секундах)
- # Получение даты модификации файлов
- get_files_info_timeout = 120
- get_files_info_ttl = 15
- # Загрузка файла
- upload_file_timeout = 120
- upload_file_ttl = 15
- # Обновление информации о товарах и категориях
- update_timeout = 1800
- update_ttl = 24 * 3600
- # Минимальное время засыпания в случае падения webkitrpc
- webkitrpc_sleep = 15 # секунд
- # Случайный 'досып'
- webkitrpc_sleep_rnd = 10 # секунд
- # Базовый класс странички какого-нибудь магазина.
- # В детях имплементируется парсинг интересных данных в XML и создание
- # других страниц, которые нужно грузить и парсить.
- #
- # СТАРАЙТЕСЬ ЧТОБЫ 1 СТРАНИЧКА НЕ СОСАЛА БОЛЕЕ 1 GET'a
- # Переходы по PAGINATOR'ам - отдельными страничками!
- #
- # При реализации, нужно учитывать, что при обработке ошибок
- # страницу просят скачаться несколько раз. Т.е. она должна быть 'stateless'.
- #
- # mode - full / quick - полный или быстрый режим засоса
- class BasePage:
- # Для избежания выкачивания URL'a несколько раз
- urls = set()
- # URL однозначно определяет страницу, как PRIMARY KEY
- # Поэтому, чтобы не скачивать НЕСКОЛЬКО РАЗ, мы проверяем
- # нет ли уже созданной странички с таким URL'ом.
- # Если уже есть, то мы помечаем, что сосать НЕ нужно
- # и цикл в coProcessPage не будет сосать страничку как дочернюю
- #
- # replaceParent - означает не добавлять xml результат к родителю, а заменить родителя
- # Актуально, например, если грузится продукт с одним и тем же ext_id
- # Удобно давать родителя как тэк <product> и выставлять replaceParent = True
- def __init__( self, mode, url, xmlParentNode, cookies = None, replaceParent = False ):
- if url in BasePage.urls:
- log.debug( bold('BasePage.__init__(): %s already in BasePage.urls!' % url) )
- self.skip = True
- else:
- BasePage.urls.add( url )
- self.skip = False
- log.debug( bold('BasePage.__init__(): %s added to BasePage.urls' % url) )
- self.mode = mode
- self.url = url
- self.xmlParentNode = xmlParentNode
- self.cookies = cookies
- self.replaceParent = replaceParent
- # Переопределяется в дочерних классах.
- # Возвращяет tuple ( xmlData, set(pages) )
- # pages - список страниц, которые нужно грузить
- def coLoadParse( self ):
- yield
- raise Exception( 'Implement please..' )
- # Высасывание файлов одинаковое для всех.
- #
- # URL и тип файла берется из родительского XML'a.
- # Он должен иметь тэг file!
- # browserProxy -- выбор браузера -- по умолчанию - BrowserProxy, при False - просто Browser
- class FilePage( BasePage ):
- def __init__( self, mode, url, xmlParentNode, cookies = None, rpcClient = None, browserProxy = True ):
- BasePage.__init__( self, mode, url, xmlParentNode, cookies )
- if (not rpcClient) and (browserProxy):
- rpcClient = getRpcClient()
- self.browserProxy = browserProxy
- self.rpcClient = rpcClient
- def coLoadParse( self ):
- # пропускаем картинки в quick режиме..
- if self.mode == 'quick':
- yield Return( etree.Element('page'), set() )
- browser = (BrowserProxy( timeoutSecs = settings.binary_browser_timeout, saveBinary = 1, rpcClient = self.rpcClient )) if (self.browserProxy) else (Browser( timeoutSecs = settings.binary_browser_timeout, saveBinary = 1 ));
- if self.cookies:
- browser.setCookies( self.cookies )
- # Узнаем дату модификации у differ'a
- last_modification = files.getFileInfo( settings.provider_name, self.url )
- log.debug( 'FilePage.coFullLoadParse(): %s last_modification %s' % (self.url, last_modification) )
- if last_modification:
- # Установим флажек, чтобы browser добавил к header'у if-modified-sience
- yield browser.setIfModified( last_modification )
- # Сосем картинку или получаем ответ, что не изменилась
- # Поскольку могут быть разные redirect'ы - все сосем
- yield browser.get( self.url, '1', QRegExp( '.+' ) )
- # Проверим, что сам файл не выдал 404
- attributes = (yield browser.getAttributes( self.url )) if (self.browserProxy) else (browser.getAttributes( self.url ))
- status = attributes[ QNetworkRequest.HttpStatusCodeAttribute ].toString()
- if str(status) == '404':
- log.error( bold('FilePage.coFullLoadParse(): %s file not found %s' % (self.url, str(status))) )
- self.xmlParentNode.set( 'error', 'HTTP STATUS 404' )
- yield Return( etree.Element('page'), set() )
- # Берем результаты последнего QNetworkRequest'a
- data = (yield browser.getBinaryData()) if (self.browserProxy) else (browser.getBinaryData())
- headers = (yield browser.getHeaders()) if (self.browserProxy) else (browser.getHeaders())
- attributes = (yield browser.getAttributes()) if (self.browserProxy) else (browser.getAttributes ())
- status = attributes[ QNetworkRequest.HttpStatusCodeAttribute ].toString()
- if str(status) == '200':
- # если файл пустой
- if not data:
- self.xmlParentNode.set( 'error', 'EMPTY REPLY' )
- (yield Return(etree.Element('page'), set())) if (self.browserProxy) else (Return( etree.Element('page'), set() ))
- last_modified = headers[ QByteArray('Last-Modified') ].data()
- yield files.coUploadFile( settings.provider_name, self.xmlParentNode.get('type'), self.url, data, last_modified )
- log.debug( 'FilePage.coFullLoadParse(): %d bytes of %s saved.' % (len(data), self.url) )
- elif str(status) == '304':
- log.info( 'FilePage.coFullLoadParse(): %s not modified since %s' % (self.url, last_modification) )
- elif str(status) == '404':
- log.info( bold('FilePage.coFullLoadParse(): 404 - %s' % self.url) )
- else:
- log.error( bold('FilePage.coFullLoadParse(): %s download error %s' % (self.url, str(status))) )
- raise Exception( 'Unknown %s status %s' % (self.url, str(status)) )
- del browser
- yield Return( etree.Element('page'), set() )
- class ChildException( Exception ):
- pass
- # Вызывает сосание страниц, обеспечивает параллельность засоса
- # и обработку ошибочик.
- class BaseLoader:
- # Конструирует и возвращает корневую страницу
- def createRootPage( self, mode, xmlParentNode ):
- raise Exception( 'Implement please..' )
- # нужно ли скачивать страничку в режиме checkHistory?
- def isPageForCheckHistory( self, page ):
- raise Exception( 'Implement please..' )
- # корректные ли данные в режиме quick checkHistory?
- def isParserValid( self, xml ):
- raise Exception( 'Implement please..' )
- #-------------------------хранение----------------------------------#
- class FullPage:
- def __init__( self, page, selfId, parentID):
- self.page = page
- self.parentId = parentId
- self.selfId = selfId
- # def saveToFile
- #-------------------------------------------------------------------#
- #---------------------------------------------------------------------------------#
- def cpProcessPage( self, rootPage ):
- #id страницы (1 - id страницы провайдера)
- pageId = 2
- #очередь из тех страничек, которые нам нужно высосать
- pagesQweue = Queue()
- firstPage = fullPage( rootPage, pageID, 1 )
- pagesQueue.put( firstPage )
- del firstPage
- while (not pagesQueue.empty()):
- fullPage = pagesQueue.get()
- #---------------------------------------------------------------------------------#
- # очень рекурсивная функция высасывания страницы
- # и всех страниц внутри
- def coProcessPage( self, page, recursion = 1 ):
- # слишком глубоко вошли?
- if recursion > settings.max_recursion:
- raise Exception( 'Too deep recursion: %s calls...' % recursion )
- pageUrl = page.url
- # пытаемся несколько раз сосать сраничичику, которую нам дали
- i = 1
- while True:
- try:
- xmlData, pages = yield page.coLoadParse()
- # Удачно скачали!
- break
- except Exception, e:
- msg = "coProcessPage( recursion: %d ): попытка %d %s не смог высосать %s!\n\n%s" % \
- (recursion, i, page, page.url, QCoreApplication.instance().scheduler.formatException() )
- log.error( bold(msg) )
- # если ошибка из-за смерти WebkitRpc, то не будем увеличивать попытку!
- if isinstance( e, RpcServerDisconnected ):
- log.debug( 'coProcessPage( recursion: %d ): не увеличиваю попытку, потому-что ошибка вызвана RpcServerDisconnected!' % recursion )
- # будем спать случайный промежуток времени, чтобы одновременно все не ломанулись
- # на перестартовавший webkitrpc
- sleepSecs = settings.webkitrpc_sleep + randint( 0, settings.webkitrpc_sleep_rnd )
- log.debug( 'coProcessPage( recursion: %d ): cплю %d секунд' % (recursion, sleepSecs) )
- yield Sleep( 1000 * sleepSecs )
- continue
- # поскольку Sleep принимает int, немного извратитмся, чтобы не переполнялось..
- # экспоненциально увеличиваем задержку с номером попытки
- log.debug( "coProcessPage( recursion: %d ): попытка %d, ложусь спать на %d секунд..." % (recursion, i, (2 ** i) * settings.error_sleep_secs) )
- for j in range( 2 ** i ):
- yield Sleep( settings.error_sleep_secs * 1000 )
- # если последняя попытка - нет смысла качать все остальное...
- i += 1
- if i > settings.retries:
- raise ChildException( 'No more retries')
- log.debug( bold('coProcessPage( recursion: %d ): %s - %d subpages...\n\n%s' % (recursion, pageUrl, len(pages), etree.tostring(xmlData, pretty_print = True))) )
- childCount = len(pages)
- # Если нам вернули непустой XML
- isXmlEmpty = True
- for i in xmlData.iterchildren():
- isXmlEmpty = False
- break
- if not isXmlEmpty:
- # В зависимости от того, как создали страничку,
- # добавимся к родителю, или заменим его
- if page.replaceParent:
- parent = page.xmlParentNode.getparent()
- parent.remove( page.xmlParentNode )
- else:
- parent = page.xmlParentNode
- # поскольку у корневого тэга <page>
- # может быть несколько детей, например,
- # categories, products - нужно добавлять все...
- for child in xmlData.iterchildren():
- parent.append( child )
- # страница больше не нужна - экономим память
- del page
- # Теперь параллельно сасаем странички, который она вернула.
- # Но не более, чем settings.max_parallel_tasks
- running = set()
- while pages or running:
- # сколько запускать?
- tasksToRun = min( len(pages), settings.max_parallel_tasks - len(running) )
- log.debug( bold('coProcessPage( recursion: %d ): tasksToRun: %d, max_parallel_tasks: %d, pages: %d, running: %d' % (recursion, tasksToRun, settings.max_parallel_tasks, len(pages), len(running))) )
- while tasksToRun:
- tasksToRun -= 1
- page = pages.pop()
- # если уже сосется такая, то не нужно качать
- if page.skip:
- log.debug( bold('coProcessPage( recursion: %d ): %s - skip, becouse already in BasePage.urls...' % (recursion, page.url)) )
- continue
- # если режим проверки парсера, возможно эту страницу грузить не нужно
- # if self.checkHistory and not self.isPageForCheckHistory(page):
- # log.debug( bold('coProcessPage( recursion: %d ): %s - skip in checkHistory mode...' % (recursion, page.url)) )
- # continue
- t = QCoreApplication.instance().scheduler.newTask( self.coProcessPage(page, recursion + 1) )
- t.setEmitUnhandled()
- log.debug( bold('coProcessPage( recursion: %d ): started %s, task %s' % (recursion, page, t)) )
- running.add( t )
- yield Sleep( settings.requests_delay )
- # Возможно мы пропустили все страницы
- if not running:
- continue
- # Подождем, пока хоть кто-нибудь, хоть что-нибудь дасасет...
- t = yield WaitFirstTask( running, settings.max_worker_timeout )
- # Timeout?
- if not t:
- raise Exception( '%d workers downloading too long. %d secs...' % (len(running), settings.max_worker_timeout / 1000) )
- running.remove( t )
- # Дочка не может высосаться!?
- if t.state == Task.EXCEPTION:
- if isinstance( t.exception.orig, ChildException ):
- log.error( bold('coProcessPage( recursion: %d ): %s (%d children) - child exception!' % (recursion, pageUrl, childCount)) )
- else:
- log.exception( bold('coProcessPage( recursion: %d ): %s (%d children) exception:\n\n%s' % \
- ( recursion, pageUrl, childCount, t.exception )) )
- # Выпадаем наверх!
- raise ChildException( 'Please grep log for child the exceptions above!' )
- # Создать корневую страницу и высасать (используется из coLoad)
- def __coLoadParse( self ):
- resultXml = etree.Element( 'provider' )
- # определим валюты и имя магазина
- resultXml.set( 'name', settings.provider_name )
- resultXml.set( 'currency', settings.provider_currency )
- resultXml.set( 'id', 1 )
- # Создаем объект корневой страницы в зависимости от режима
- rootPage = self.createRootPage( resultXml )
- # Сосем магаз в режиме проверки парсера...
- yield self.coProcessPage( rootPage )
- yield Return( resultXml )
- # Вернет XML для диффера
- # mode - QUICK / FULL
- def coLoad( self, mode ):
- self.mode = mode.lower()
- assert self.mode == 'full' or self.mode == 'quick', 'Please provide mode <quick> or <full> instead of %s' % self.mode
- # Сосем в режим проверки парсера
- self.checkHistory = True
- resultXml = yield self.__coLoadParse()
- # Если с коммандной строки передали debug
- # просто напечатаем ответ, чтобы посмотреть результат
- if len(sys.argv) > 3 and sys.argv[ 3 ] == 'debug':
- yield Return( resultXml )
- if not self.isParserValid( resultXml ):
- raise Exception( 'Parser fails checkHistory test' )
- # Полностью сосем магаз
- self.checkHistory = False
- # Чистим кэш скачанных URL'ов
- BasePage.urls = set()
- resultXml = yield self.__coLoadParse()
- yield Return( resultXml )
Add Comment
Please, Sign In to add comment