Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from typing import List
- from typing import Tuple
- JEDI = 1
- SITH = 2
- class Plant:
- """Завод световых мечей."""
- def __init__(self, orders) -> None:
- self.loop = asyncio.get_event_loop()
- self.swords_jedi = asyncio.Queue(loop=self.loop)
- self.swords_sith = asyncio.Queue(loop=self.loop)
- self._orders = orders
- async def create_sword(self, orders: List[Tuple[int, int]]) -> None:
- """Метод производства мечей."""
- for order in orders:
- number_of_swords, customer = order
- for _ in range(number_of_swords):
- if customer is JEDI:
- self.swords_jedi.put_nowait(1)
- print('create sword jedi')
- else:
- self.swords_sith.put_nowait(2)
- print('create sword sith')
- await asyncio.sleep(0.1)
- async def sith_delivery(self) -> None:
- """Метод доставки мечей ситхам."""
- while True:
- await self.swords_sith.get()
- await asyncio.sleep(1)
- print('delivered sith sword')
- self.swords_sith.task_done()
- async def jedi_delivery(self, courier: int) -> None:
- """ Метод доставки мечей Джедаем."""
- while True:
- await self.swords_jedi.get()
- await asyncio.sleep(5)
- print(f'courier №{courier} - delivered jedi sword')
- self.swords_jedi.task_done()
- async def __get_tasks(self) -> None:
- """Задаем карутины."""
- tasks = [
- self.create_sword(orders),
- self.sith_delivery()
- ]
- for i in range(5):
- tasks.append(self.jedi_delivery(i))
- await asyncio.gather(*tasks)
- def run(self) -> None:
- """Запускаем завод."""
- print('start')
- self.loop.run_until_complete(self.__get_tasks())
- if __name__ == "__main__":
- orders = [
- (10, 1),
- (5, 2),
- (10, 1),
- (7, 1),
- (7, 2)
- ]
- plant = Plant(orders)
- plant.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement