Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import logging

import aiohttp
import pandas as pd
# How many worker coroutines drain the row queue concurrently.
WORKERS_COUNT: int = 5

# Module-wide DataFrame shared by the coroutines; stays None until main()
# rebinds it with the loaded pickle.
df: "pd.DataFrame | None" = None
async def download(session, url):
    """Fetch *url* through *session* and return its HTTP status and body.

    Args:
        session: an open ``aiohttp.ClientSession`` (or any object whose
            ``get(url)`` returns an async context manager yielding a response
            with ``.status`` and an awaitable ``read()``).
        url: the address to request.

    Returns:
        A dict with ``'status_code'`` (the integer HTTP status) and
        ``'content'`` (the raw response body as bytes).

    Raises:
        RuntimeError: if the server answers with any status other than 200.
    """
    async with session.get(url) as response:
        # aiohttp exposes the HTTP status as ``.status``; ``.status_code`` is
        # the requests-library name and does not exist on ClientResponse.
        if response.status != 200:
            # Explicit raise instead of ``assert``: asserts vanish under -O.
            raise RuntimeError(f"GET {url} returned HTTP {response.status}")
        body = await response.read()
        # NOTE(review): the original left the remaining keys as a ``...``
        # placeholder; returning the body under 'content' is an assumption.
        return {
            'status_code': response.status,
            'content': body,
        }
async def process_data(row_index, session):
    """Download the URL held in row *row_index* of the global ``df`` and record the result.

    Args:
        row_index: index label (as used by ``df.loc``) of the row to process.
        session: open HTTP session passed straight through to ``download``.
    """
    # Fixed: the original read ``df.loc[index, ...]`` with an undefined name
    # ``index``; the row to process is the ``row_index`` argument.
    url = df.loc[row_index, 'url']
    data = await download(session, url)
    # Writes happen on the event-loop thread, so concurrent workers never
    # interleave inside this single .loc assignment.
    df.loc[row_index, 'status_code'] = data.get('status_code')
    # TODO: store any further fields from ``data`` (left as ``...`` in the original).
async def worker(q, session):
    """Drain *q* of row indices, running ``process_data`` on each one.

    Assumes *q* is fully populated before the workers start, so an empty
    queue means every row has already been claimed and the worker can return.
    Without that exit the coroutine would wait on ``q.get()`` forever and the
    caller awaiting the workers would never finish.

    Args:
        q: ``asyncio.Queue`` of row index labels.
        session: open HTTP session forwarded to ``process_data``.
    """
    while True:
        try:
            row_index = q.get_nowait()
        except asyncio.QueueEmpty:
            return
        try:
            # Fixed: the original omitted the required ``session`` argument.
            await process_data(row_index, session)
        except Exception:
            # One bad row must not kill the worker and strand the rest of the queue.
            logging.getLogger(__name__).exception("Failed to process row %r", row_index)
        finally:
            # Keep the queue's unfinished-task counter accurate so q.join() works.
            q.task_done()
async def main(loop=None):
    """Download every row's URL concurrently and pickle the updated DataFrame.

    Loads ``with_sent.pkl`` into the module-level ``df``, queues the index
    label of every row, and runs ``WORKERS_COUNT`` workers over one shared
    aiohttp session. It then writes the updated frame to ``output.pkl``.

    Args:
        loop: unused; accepted only for backward compatibility with callers
            that pass the event loop explicitly. aiohttp deprecated the
            ``loop`` argument of ``ClientSession``, and inside a coroutine the
            running loop is used automatically.
    """
    global df
    # NOTE(review): read_pickle can execute arbitrary code -- only load trusted files.
    df = pd.read_pickle('with_sent.pkl')
    # TODO: initialise any extra output columns here (placeholder in the original).

    q = asyncio.Queue()
    # Iterate the index directly; iterrows() would build a Series per row for nothing.
    for row_index in df.index:
        await q.put(row_index)

    # ClientSession is an async context manager; a plain ``with`` raises TypeError.
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(worker(q, session))
            for _ in range(WORKERS_COUNT)
        ]
        # gather() re-raises a worker failure instead of silently storing it
        # on the task the way asyncio.wait() does.
        await asyncio.gather(*tasks)

    # Save the results.
    df.to_pickle('output.pkl')


if __name__ == '__main__':
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement