Advertisement
Guest User

Untitled

a guest
Mar 23rd, 2020
194
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.25 KB | None | 0 0
  1. import asyncio
  2. from typing import List
  3. from typing import Tuple
  4.  
  5.  
  6. JEDI = 1
  7. SITH = 2
  8.  
  9.  
  10. class Plant:
  11.     """Завод световых мечей."""
  12.  
  13.     def __init__(self, orders) -> None:
  14.         self.loop = asyncio.get_event_loop()
  15.         self.swords_jedi = asyncio.Queue(loop=self.loop)
  16.         self.swords_sith = asyncio.Queue(loop=self.loop)
  17.         self._orders = orders
  18.  
  19.     async def create_sword(self, orders: List[Tuple[int, int]]) -> None:
  20.         """Метод производства мечей."""
  21.         for order in orders:
  22.             number_of_swords, customer = order
  23.             for _ in range(number_of_swords):
  24.                 if customer is JEDI:
  25.                     self.swords_jedi.put_nowait(1)
  26.                     print('create sword jedi')
  27.                 else:
  28.                     self.swords_sith.put_nowait(2)            
  29.                     print('create sword sith')
  30.                 await asyncio.sleep(0.1)
  31.  
  32.     async def sith_delivery(self) -> None:      
  33.         """Метод доставки мечей ситхам."""  
  34.         while True:
  35.             await self.swords_sith.get()
  36.             await asyncio.sleep(1)
  37.             print('delivered sith sword')
  38.             self.swords_sith.task_done()
  39.                    
  40.     async def jedi_delivery(self, courier: int) -> None:
  41.         """ Метод доставки мечей Джедаем."""
  42.         while True:
  43.             await self.swords_jedi.get()
  44.             await asyncio.sleep(5)
  45.             print(f'courier №{courier} - delivered jedi sword')
  46.             self.swords_jedi.task_done()
  47.  
  48.  
  49.     async def __get_tasks(self) -> None:
  50.         """Задаем карутины."""
  51.         tasks = [
  52.             self.create_sword(orders),
  53.             self.sith_delivery()            
  54.         ]
  55.         for i in range(5):
  56.             tasks.append(self.jedi_delivery(i))
  57.         await asyncio.gather(*tasks)
  58.        
  59.     def run(self) -> None:
  60.         """Запускаем завод."""
  61.         print('start')
  62.         self.loop.run_until_complete(self.__get_tasks())
  63.        
  64.  
  65. if __name__ == "__main__":
  66.     orders = [
  67.     (10, 1),
  68.     (5, 2),
  69.     (10, 1),
  70.     (7, 1),
  71.     (7, 2)
  72.     ]    
  73.     plant = Plant(orders)
  74.     plant.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement