Not a member of Pastebin yet? Sign up; it unlocks many cool features!
- import asyncio
- import time
class human:
    """A named participant whose work is simulated by an asynchronous delay.

    Instance fields:
        name: label printed by ``execute`` and echoed in the result string.
        time: delay in seconds awaited by ``execute_async``; starts at 2.

    NOTE: the lowercase class name is kept so existing callers keep working.
    """

    def __init__(self, name):
        """Store *name* and set the default delay of 2 seconds."""
        self.name = name
        self.time = 2

    async def execute_async(self):
        """Main coroutine: print the name, wait ``self.time`` seconds, report.

        Returns:
            The string ``"Success! : <name>"``.
        """
        self.execute()
        # The sleep has to be awaited: a bare ``asyncio.sleep(...)`` call only
        # builds a coroutine object that never runs, so no delay would occur.
        await asyncio.sleep(self.time)
        return "Success! : {}".format(self.name)

    def execute(self):
        """Print this object's name to standard output."""
        print(self.name)
async def main_async(coros, loop):
    """Await *coros* concurrently, printing each result as it completes.

    As soon as the first result has been printed, one extra
    ``human("new object 5")`` coroutine is scheduled on *loop*. It joins the
    set of pending work, so its result is printed too and nothing is left
    running when this coroutine returns.

    Args:
        coros: iterable of awaitables (coroutines or futures) to run.
        loop: event loop used to schedule the awaitables and the extra task;
            expected to be the loop this coroutine is running on.

    Returns:
        None. Results are written to standard output.

    The caller's *coros* list is not modified.
    """
    pending = {asyncio.ensure_future(coro, loop=loop) for coro in coros}
    extra_spawned = False
    # asyncio.as_completed() works on a fixed snapshot of awaitables, so work
    # added mid-flight would never be awaited. Waiting on an explicit pending
    # set lets the set grow while results are being consumed.
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for finished in done:
            # result() re-raises the task's exception, if it failed.
            print(finished.result())
            if not extra_spawned:
                extra = loop.create_task(
                    human("new object 5").execute_async()
                )
                pending.add(extra)
                extra_spawned = True
def main():
    """Run five ``human`` coroutines to completion on a fresh event loop.

    Builds ``human("object 0")`` .. ``human("object 4")``, hands their
    ``execute_async`` coroutines to ``main_async``, and prints a banner before
    and after the run. The loop is always closed, even if the run fails.
    """
    people = [human("object {}".format(i)) for i in range(5)]
    print("\nTesting Asynchronous Execution:")
    coros = [person.execute_async() for person in people]
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated and is an error on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main_async(coros, loop))
        print("Testing Completed")
    finally:
        # Release the loop's resources whether or not the run succeeded.
        loop.close()
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Add Comment
Please sign in to add a comment.