Artemii_Kravtsov

Untitled

Sep 23rd, 2021
699
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1.     def balancing(self, info_pack):
  2.         splash_concurrency = self.splash_concurrency   # потолок одновременных запросов в splash
  3.         splash_balance = splash_concurrency + 20       # сколько одновременных запросов буду поддерживать
  4.                                                        # (с запасом в 20 запросов)
  5.        
  6.         # передаёт воркеру, на какое время засыпать, если в очереди закончились элементы
  7.         self.worker_instance.time_until_future_is_done = info_pack['time_outside']
  8.  
  9.         # Если из info_pack следует, что почтальону запрещено делать более 500 запросов одновременно,
  10.         # но при этом к моменту, когда эти запросы начнут возвращаться с ответом, вычислитель успеет
  11.         # обработать 2000 запросов, будем считать, что вычислитель работает быстрее почтальона
  12.         if info_pack['limit_to_simultaneous_queries'] > info_pack['when_its_time_to_awake_postman']:
  13.             worker_is_faster = False
  14.         else:
  15.             worker_is_faster = True
  16.  
  17.         # если воркер работает быстрее, то задача заключается в том, чтобы постоянно поддерживать
  18.         # в Splash столько запросов, сколько Splash может обрабатывать одновременно. Привязываться
  19.         # к воркеру в этом случае не нужно, очередь будет рассасываться быстрее, чем накапливаться.
  20.         # Было бы плохим решением рассылать запросы тогда, когда кончается очередь к воркеру - в этом
  21.         # случае программа постоянно простаивала бы n секунд до появления первых ответов.
  22.         if worker_is_faster:
  23.             # количество необработанных вычислителем ответов в случае, когда воркер работает быстрее,
  24.             # можно оценить так:
  25.             in_queue_cnt = len(self.worker_instance.url_future_associations)
  26.  
  27.             # тех запросов, которые ложатся в очередь, должно быть столько, чтобы время ожидания
  28.             # последнего из них не превышало max_delay. чтобы рассчитать их количество,
  29.             # найдём скорость движения очереди до отправки запросов на сервер:
  30.             # за время, равное средней скорости ответа от сервера, от сервера будет возвращаться
  31.             # количество ответов, равное количеству потоков в Splash. А за 1 секунду - по формуле:
  32.             queue_speed = splash_concurrency / info_pack['time_until_response']
  33.  
  34.             # если мы стараемся поддерживать splash_balance на одном уровне, то из баланса надо вычесть
  35.             # число элементов, уже находящихся в очереди (чтобы получить представление, как сильно
  36.             # мы отклонились от баланса). Если почтальон спит, то делается оценка того,
  37.             # сколько запросов останется в очереди, когда он проснётся
  38.             time_until_postman_is_awake = time() - self.postman_instance.next_time_postman_is_awake
  39.             if time_until_postman_is_awake > 0:
  40.                 in_queue_cnt = in_queue_cnt - (time_until_postman_is_awake * queue_speed)
  41.             balance_deviation = splash_balance - in_queue_cnt
  42.            
  43.             # если в очереди 20 запросов, а splash работает в 60 потоков, значит,
  44.             # 40 следующих запросов сразу же будут отправлены на сервер, а остальные лягут
  45.             # в очередь. unused_concurrency показывает, сколько значений сразу же будут отправлены
  46.             unused_concurrency = max(0, splash_concurrency - in_queue_cnt)
  47.  
  48.             # если в секунду из очереди выходит 35 готовых фьючерсов, а максимальное время ожидания
  49.             # в очереди для запроса было установлено на уровне в 50 секунд, то почтальону
  50.             # можно отправить 35 * 50 запросов, и в следующий раз проснуться через 50 секунд
  51.             cnt_send_regularly = info_pack['max_delay'] * queue_speed
  52.  
  53.             # balance_deviation может быть отрицательным. Если к моменту, когда проснётся почтальон,
  54.             # в очереди, будет 150 урлов при балансе в 100, то balance_deviation будет равен -50,
  55.             # и почтальон отправит меньше запросов, чем
  56.             cnt_send_once = cnt_send_regularly + unused_concurrency + balance_deviation
  57.  
  58.             #  35 * 50 = 1750. Если в какой-то момент в очереди, откуда почтальон
  59.             # берёт урлы, урлов меньше, чем 1750, то проснутся нужно раньше, чем через max_delay -
  60.             # нужно на месте рассчитать, через какое время будет достигнут 'splash_balance'.
  61.             # а если урлов в наличии даже меньше, чем должно быть фьючерсов на балансе, то
  62.             # время сна выйдет отрицательным числом - в этом случае его можно просто поменять на 0.1
  63.             timeout_calculator = lambda urls_cnt: max(0.1, (urls_cnt - balance_deviation) / queue_speed)
  64.            
  65.             # Возвращает словарь, которым будет пользоваться почтальон - будет отправлять cnt_send_once
  66.             # запросов за одну проходку и засыпать на время, равное max_delay. А если в очереди меньше
  67.             # урлов, чем cnt_send_once, то отправит все, и заснёт на время, которое сможет
  68.             # рассчитать, если вызовет функцию timeout_calculator и подставит в неё количество
  69.             # имеющихся урлов
  70.             return {'cnt_send_once': cnt_send_once, 'timeout_calculator': timeout_calculator}
  71.         else:
  72.             pass
RAW Paste Data