Guest User

Untitled

a guest
Sep 25th, 2017
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 20.06 KB | None | 0 0
  1. # -*- coding: UTF-8 -*-
  2.  
  3. import sys
  4. import logging
  5. import base64
  6. from random import randint
  7. from satchmo.common_lib.coroutines import Return, WaitFirstTask, Sleep, Task
  8. from differ import files
  9. from satchmo.common_lib.webkitrpc import BrowserProxy, getRpcClient
  10. from satchmo.common_lib.webkit import Browser
  11. from satchmo.common_lib.colorer import bold
  12. from satchmo.common_lib.corpc import RpcServerDisconnected
  13. from PyQt4.QtCore import QCoreApplication, QByteArray, QRegExp
  14. from PyQt4.QtNetwork import QNetworkRequest
  15. from lxml import etree
  16. #-------------------------------------------------------------------#
  17. from Queue import Queue
  18. #-------------------------------------------------------------------#
  19.  
  20. settings = None
  21.  
  22.  
  23. log = logging.getLogger( 'loader' )
  24.  
  25.  
  26.  
  27.  
  28. class BaseLoaderSettings:
  29.     # Очень большая задержка на максимальное время работы coProcessPage
  30.     # Поскольку он может глубоко уходить в рекурсию
  31.     max_worker_timeout = 6 * 3600 * 1000 # 6 часов
  32.    
  33.     # задержка между параллельными запросами
  34.     requests_delay = 200 # msecs
  35.  
  36.     # Максимальное кол-во параллельных сопрограмм на одном уровне рекурсии
  37.     # в BaseLoader.coProcessPage
  38.     max_parallel_tasks = 3
  39.  
  40.     # сколько раз пытаемся пересасывать страничку?
  41.     retries = 5
  42.    
  43.     # Увеличенный timeout на binary объекты
  44.     binary_browser_timeout = 5 * 60 # 5 минут
  45.     # первая экспоненциальная задержка между попытками
  46.     error_sleep_secs = 10 # секунд
  47.  
  48.     # максимальная вложенность BaseLoader.coProcessPage (не совсем вложенность, при реализации paginator'ов может быть линейно)
  49.     max_recursion = 100
  50.  
  51.     # Таймауты и время жизни заявок после выполнения для различных типов
  52.     # RedisRPC запросов (в секундах)
  53.     # Получение даты модификации файлов
  54.     get_files_info_timeout = 120
  55.     get_files_info_ttl     = 15
  56.  
  57.     # Загрузка файла
  58.     upload_file_timeout   = 120
  59.     upload_file_ttl       = 15
  60.  
  61.     # Обновление информации о товарах и категориях
  62.     update_timeout        = 1800
  63.     update_ttl            = 24 * 3600
  64.  
  65.     # Минимальное время засыпания в случае падения webkitrpc
  66.     webkitrpc_sleep       = 15 # секунд
  67.     # Случайный 'досып'
  68.     webkitrpc_sleep_rnd   = 10 # секунд
  69.  
  70.  
  71. # Базовый класс странички какого-нибудь магазина.
  72. # В детях имплементируется парсинг интересных данных в XML и создание
  73. # других страниц, которые нужно грузить и парсить.
  74. #
  75. # СТАРАЙТЕСЬ ЧТОБЫ 1 СТРАНИЧКА НЕ СОСАЛА БОЛЕЕ 1 GET'a
  76. # Переходы по PAGINATOR'ам - отдельными страничками!
  77. #
  78. # При реализации, нужно учитывать, что при обработке ошибок
  79. # страницу просят скачаться несколько раз. Т.е. она должна быть 'stateless'.
  80. #
  81. # mode - full / quick - полный или быстрый режим засоса
  82. class BasePage:
  83.     # Для избежания выкачивания URL'a несколько раз
  84.     urls = set()
  85.  
  86.     # URL однозначно определяет страницу, как PRIMARY KEY
  87.     # Поэтому, чтобы не скачивать НЕСКОЛЬКО РАЗ, мы проверяем
  88.     # нет ли уже созданной странички с таким URL'ом.
  89.     # Если уже есть, то мы помечаем, что сосать НЕ нужно
  90.     # и цикл в coProcessPage не будет сосать страничку как дочернюю
  91.     #
  92.     # replaceParent - означает не добавлять xml результат к родителю, а заменить родителя
  93.     #                 Актуально, например, если грузится продукт с одним и тем же ext_id
  94.     #                 Удобно давать родителя как тэк <product> и выставлять replaceParent = True
  95.     def __init__( self, mode, url, xmlParentNode, cookies = None, replaceParent = False ):
  96.         if url in BasePage.urls:
  97.             log.debug( bold('BasePage.__init__(): %s already in BasePage.urls!' % url) )
  98.             self.skip = True
  99.         else:
  100.             BasePage.urls.add( url )
  101.             self.skip = False
  102.             log.debug( bold('BasePage.__init__(): %s added to BasePage.urls' % url) )
  103.  
  104.         self.mode = mode
  105.         self.url = url
  106.         self.xmlParentNode = xmlParentNode
  107.         self.cookies = cookies
  108.         self.replaceParent = replaceParent
  109.  
  110.  
  111.     # Переопределяется в дочерних классах.
  112.     # Возвращяет tuple ( xmlData, set(pages) )
  113.     # pages - список страниц, которые нужно грузить
  114.     def coLoadParse( self ):
  115.         yield
  116.         raise Exception( 'Implement please..' )
  117.  
  118.  
  119.  
  120. # Высасывание файлов одинаковое для всех.
  121. #
  122. # URL и тип файла берется из родительского XML'a.
  123. # Он должен иметь тэг file!
  124. # browserProxy -- выбор браузера -- по умолчанию - BrowserProxy, при False - просто Browser
  125. class FilePage( BasePage ):
  126.     def __init__( self, mode, url, xmlParentNode, cookies = None, rpcClient = None, browserProxy = True ):
  127.         BasePage.__init__( self, mode, url, xmlParentNode, cookies )
  128.  
  129.         if (not rpcClient) and (browserProxy):
  130.             rpcClient = getRpcClient()
  131.  
  132.         self.browserProxy = browserProxy
  133.         self.rpcClient = rpcClient
  134.  
  135.     def coLoadParse( self ):
  136.         # пропускаем картинки в quick режиме..
  137.         if self.mode == 'quick':
  138.             yield Return( etree.Element('page'), set() )
  139.            
  140.         browser = (BrowserProxy( timeoutSecs = settings.binary_browser_timeout, saveBinary = 1, rpcClient = self.rpcClient )) if (self.browserProxy) else (Browser( timeoutSecs = settings.binary_browser_timeout, saveBinary = 1 ));
  141.  
  142.         if self.cookies:
  143.             browser.setCookies( self.cookies )
  144.  
  145.         # Узнаем дату модификации у differ'a
  146.         last_modification = files.getFileInfo( settings.provider_name, self.url )
  147.         log.debug( 'FilePage.coFullLoadParse(): %s last_modification %s' % (self.url, last_modification) )
  148.         if last_modification:
  149.             # Установим флажек, чтобы browser добавил к header'у if-modified-sience
  150.             yield browser.setIfModified( last_modification )
  151.  
  152.         # Сосем картинку или получаем ответ, что не изменилась
  153.         # Поскольку могут быть разные redirect'ы - все сосем
  154.         yield browser.get( self.url, '1', QRegExp( '.+' ) )
  155.  
  156.         # Проверим, что сам файл не выдал 404
  157.         attributes = (yield browser.getAttributes( self.url )) if (self.browserProxy) else (browser.getAttributes( self.url ))      
  158.         status = attributes[ QNetworkRequest.HttpStatusCodeAttribute ].toString()
  159.  
  160.         if str(status) == '404':
  161.             log.error( bold('FilePage.coFullLoadParse(): %s file not found %s' % (self.url, str(status))) )
  162.             self.xmlParentNode.set( 'error', 'HTTP STATUS 404' )
  163.             yield Return( etree.Element('page'), set() )
  164.  
  165.         # Берем результаты последнего QNetworkRequest'a
  166.         data = (yield browser.getBinaryData()) if (self.browserProxy) else (browser.getBinaryData())
  167.         headers = (yield browser.getHeaders()) if (self.browserProxy) else (browser.getHeaders())
  168.         attributes = (yield browser.getAttributes()) if (self.browserProxy) else (browser.getAttributes ())
  169.         status = attributes[ QNetworkRequest.HttpStatusCodeAttribute ].toString()
  170.  
  171.         if str(status) == '200':
  172.             # если файл пустой
  173.             if not data:
  174.                 self.xmlParentNode.set( 'error', 'EMPTY REPLY' )
  175.                 (yield Return(etree.Element('page'), set())) if (self.browserProxy) else (Return( etree.Element('page'), set() ))
  176.  
  177.             last_modified = headers[ QByteArray('Last-Modified') ].data()
  178.  
  179.             yield files.coUploadFile( settings.provider_name, self.xmlParentNode.get('type'), self.url, data, last_modified )
  180.             log.debug( 'FilePage.coFullLoadParse(): %d bytes of %s saved.' % (len(data), self.url) )
  181.         elif str(status) == '304':
  182.             log.info( 'FilePage.coFullLoadParse(): %s not modified since %s' % (self.url, last_modification) )
  183.         elif str(status) == '404':
  184.             log.info( bold('FilePage.coFullLoadParse(): 404 - %s' % self.url) )
  185.         else:
  186.             log.error( bold('FilePage.coFullLoadParse(): %s download error %s' % (self.url, str(status))) )
  187.             raise Exception( 'Unknown %s status %s' % (self.url, str(status)) )
  188.  
  189.         del browser
  190.  
  191.         yield Return( etree.Element('page'), set() )
  192.  
  193.  
  194.  
  195. class ChildException( Exception ):
  196.     pass
  197.  
  198.  
  199.  
  200. # Вызывает сосание страниц, обеспечивает параллельность засоса
  201. # и обработку ошибочик.
  202. class BaseLoader:
  203.     # Конструирует и возвращает корневую страницу
  204.     def createRootPage( self, mode, xmlParentNode ):
  205.         raise Exception( 'Implement please..' )
  206.  
  207.  
  208.     # нужно ли скачивать страничку в режиме checkHistory?
  209.     def isPageForCheckHistory( self, page ):
  210.         raise Exception( 'Implement please..' )
  211.  
  212.  
  213.     # корректные ли данные в режиме quick checkHistory?
  214.     def isParserValid( self, xml ):
  215.         raise Exception( 'Implement please..' )
  216.  
  217. #-------------------------хранение----------------------------------#
  218. class FullPage:
  219.     def __init__( self, page, selfId, parentID):
  220.         self.page = page
  221.         self.parentId = parentId
  222.         self.selfId = selfId
  223.  
  224.    
  225. #    def saveToFile
  226. #-------------------------------------------------------------------#
  227.  
  228. #---------------------------------------------------------------------------------#
  229.     def cpProcessPage( self, rootPage ):
  230.         #id страницы (1 - id страницы провайдера)
  231.         pageId = 2
  232.  
  233.         #очередь из тех страничек, которые нам нужно высосать
  234.         pagesQweue = Queue()
  235.  
  236.         firstPage = fullPage( rootPage, pageID, 1 )
  237.         pagesQueue.put( firstPage )
  238.         del firstPage
  239.  
  240.         while (not pagesQueue.empty()):
  241.             fullPage = pagesQueue.get()
  242.            
  243.                
  244.  
  245. #---------------------------------------------------------------------------------#
  246.  
  247.     # очень рекурсивная функция высасывания страницы
  248.     # и всех страниц внутри
  249.     def coProcessPage( self, page, recursion = 1 ):
  250.         # слишком глубоко вошли?
  251.         if recursion > settings.max_recursion:
  252.             raise Exception( 'Too deep recursion: %s calls...' % recursion )
  253.  
  254.         pageUrl = page.url
  255.  
  256.         # пытаемся несколько раз сосать сраничичику, которую нам дали
  257.         i = 1
  258.         while True:
  259.             try:
  260.                 xmlData, pages = yield page.coLoadParse()
  261.                
  262.                 # Удачно скачали!
  263.                 break
  264.             except Exception, e:
  265.                 msg = "coProcessPage( recursion: %d ): попытка %d %s не смог высосать %s!\n\n%s" % \
  266.                       (recursion, i, page, page.url, QCoreApplication.instance().scheduler.formatException() )
  267.                 log.error( bold(msg) )
  268.  
  269.                 # если ошибка из-за смерти WebkitRpc, то не будем увеличивать попытку!
  270.                 if isinstance( e, RpcServerDisconnected ):
  271.                     log.debug( 'coProcessPage( recursion: %d ): не увеличиваю попытку, потому-что ошибка вызвана RpcServerDisconnected!' % recursion )
  272.                     # будем спать случайный промежуток времени, чтобы одновременно все не ломанулись
  273.                     # на перестартовавший webkitrpc
  274.                     sleepSecs = settings.webkitrpc_sleep + randint( 0, settings.webkitrpc_sleep_rnd )
  275.                     log.debug( 'coProcessPage( recursion: %d ): cплю %d секунд' % (recursion, sleepSecs) )
  276.                     yield Sleep( 1000 * sleepSecs )
  277.                     continue
  278.  
  279.                 # поскольку Sleep принимает int, немного извратитмся, чтобы не переполнялось..
  280.                 # экспоненциально увеличиваем задержку с номером попытки
  281.                 log.debug( "coProcessPage( recursion: %d ): попытка %d, ложусь спать на %d секунд..." % (recursion, i, (2 ** i) * settings.error_sleep_secs) )
  282.                 for j in range( 2 ** i ):
  283.                     yield Sleep( settings.error_sleep_secs * 1000 )
  284.  
  285.                 # если последняя попытка - нет смысла качать все остальное...
  286.                 i += 1
  287.                 if i > settings.retries:
  288.                     raise ChildException( 'No more retries')
  289.  
  290.  
  291.         log.debug( bold('coProcessPage( recursion: %d ): %s - %d subpages...\n\n%s' % (recursion, pageUrl, len(pages), etree.tostring(xmlData, pretty_print = True))) )
  292.  
  293.         childCount = len(pages)
  294.  
  295.         # Если нам вернули непустой XML
  296.         isXmlEmpty = True
  297.         for i in xmlData.iterchildren():
  298.             isXmlEmpty = False
  299.             break
  300.  
  301.         if not isXmlEmpty:
  302.             # В зависимости от того, как создали страничку,
  303.             # добавимся к родителю, или заменим его
  304.             if page.replaceParent:
  305.                 parent = page.xmlParentNode.getparent()
  306.                 parent.remove( page.xmlParentNode )
  307.             else:
  308.                 parent = page.xmlParentNode
  309.  
  310.             # поскольку у корневого тэга <page>
  311.             # может быть несколько детей, например,
  312.             # categories, products - нужно добавлять все...
  313.             for child in xmlData.iterchildren():
  314.                 parent.append( child )
  315.        
  316.         # страница больше не нужна - экономим память
  317.         del page
  318.  
  319.         # Теперь параллельно сасаем странички, который она вернула.
  320.         # Но не более, чем settings.max_parallel_tasks
  321.         running = set()
  322.         while pages or running:
  323.             # сколько запускать?  
  324.             tasksToRun = min( len(pages), settings.max_parallel_tasks - len(running) )
  325.             log.debug( bold('coProcessPage( recursion: %d ): tasksToRun: %d, max_parallel_tasks: %d, pages: %d, running: %d' % (recursion, tasksToRun, settings.max_parallel_tasks, len(pages), len(running))) )
  326.  
  327.             while tasksToRun:
  328.                 tasksToRun -= 1
  329.  
  330.                 page = pages.pop()
  331.  
  332.                 # если уже сосется такая, то не нужно качать
  333.                 if page.skip:
  334.                     log.debug( bold('coProcessPage( recursion: %d ): %s - skip, becouse already in BasePage.urls...' % (recursion,  page.url)) )
  335.                     continue
  336.  
  337.                 # если режим проверки парсера, возможно эту страницу грузить не нужно
  338. #                if self.checkHistory and not self.isPageForCheckHistory(page):
  339. #                    log.debug( bold('coProcessPage( recursion: %d ): %s - skip in checkHistory mode...' % (recursion,  page.url)) )
  340. #                    continue
  341.  
  342.                 t = QCoreApplication.instance().scheduler.newTask( self.coProcessPage(page, recursion + 1) )
  343.                 t.setEmitUnhandled()
  344.  
  345.                 log.debug( bold('coProcessPage( recursion: %d ): started %s, task %s' % (recursion, page, t)) )
  346.  
  347.                 running.add( t )
  348.                 yield Sleep( settings.requests_delay )
  349.  
  350.             # Возможно мы пропустили все страницы
  351.             if not running:
  352.                 continue
  353.  
  354.             # Подождем, пока хоть кто-нибудь, хоть что-нибудь дасасет...
  355.             t = yield WaitFirstTask( running, settings.max_worker_timeout )
  356.  
  357.             # Timeout?
  358.             if not t:
  359.                 raise Exception( '%d workers downloading too long. %d secs...' % (len(running), settings.max_worker_timeout / 1000) )
  360.  
  361.             running.remove( t )
  362.  
  363.             # Дочка не может высосаться!?
  364.             if t.state == Task.EXCEPTION:
  365.                 if isinstance( t.exception.orig, ChildException ):
  366.                     log.error( bold('coProcessPage( recursion: %d ): %s (%d children) - child exception!' % (recursion, pageUrl, childCount)) )
  367.                 else:
  368.                     log.exception( bold('coProcessPage( recursion: %d ): %s (%d children) exception:\n\n%s' % \
  369.                                    ( recursion, pageUrl, childCount, t.exception )) )
  370.                    
  371.                 # Выпадаем наверх!
  372.                 raise ChildException( 'Please grep log for child the exceptions above!' )
  373.  
  374.  
  375.     # Создать корневую страницу и высасать (используется из coLoad)
  376.     def __coLoadParse( self ):
  377.         resultXml = etree.Element( 'provider' )
  378.  
  379.         # определим валюты и имя магазина
  380.         resultXml.set( 'name', settings.provider_name )
  381.         resultXml.set( 'currency', settings.provider_currency )
  382.         resultXml.set( 'id', 1 )
  383.  
  384.         # Создаем объект корневой страницы в зависимости от режима
  385.         rootPage = self.createRootPage( resultXml )
  386.  
  387.         # Сосем магаз в режиме проверки парсера...
  388.         yield self.coProcessPage( rootPage )
  389.  
  390.         yield Return( resultXml )
  391.  
  392.  
  393.     # Вернет XML для диффера
  394.     # mode - QUICK / FULL
  395.     def coLoad( self, mode ):
  396.         self.mode = mode.lower()
  397.         assert self.mode == 'full' or self.mode == 'quick', 'Please provide mode <quick> or <full> instead of %s' % self.mode
  398.  
  399.         # Сосем в режим проверки парсера
  400.         self.checkHistory = True
  401.         resultXml = yield self.__coLoadParse()
  402.  
  403.         # Если с коммандной строки передали debug
  404.         # просто напечатаем ответ, чтобы посмотреть результат
  405.         if len(sys.argv) > 3 and sys.argv[ 3 ] == 'debug':
  406.             yield Return( resultXml )
  407.  
  408.         if not self.isParserValid( resultXml ):
  409.             raise Exception( 'Parser fails checkHistory test' )
  410.        
  411.         # Полностью сосем магаз
  412.         self.checkHistory = False
  413.         # Чистим кэш скачанных URL'ов
  414.         BasePage.urls = set()
  415.  
  416.         resultXml = yield self.__coLoadParse()
  417.  
  418.         yield Return( resultXml )
Add Comment
Please, Sign In to add comment