Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # Date: 02-07-18 (Feb 7th, 2018) | Synchronocy
- # Project: Pooling test
- # Tired as hell cbf fixing, just demonstrating that it works
- # IDLE Python 3.6. 64-bit
- #import requests
- import random
- import string
- from threading import Thread
- from queue import Queue
class Worker(Thread):
    """Daemon thread that executes tasks pulled from a shared queue.

    Every queue item is expected to be a ``(func, args, kargs)`` tuple.
    The worker starts itself on construction and then loops forever; as a
    daemon thread it does not keep the interpreter alive on exit.
    """

    def __init__(self, tasks):
        """Remember the task queue and start the thread immediately.

        Args:
            tasks: Queue-like object providing ``get()`` and ``task_done()``.
        """
        super().__init__()
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Execute queued tasks forever, marking each one done afterwards."""
        import logging  # local import so the block does not rely on a file-level one

        log = logging.getLogger(__name__)
        while True:
            task = self.tasks.get()
            try:
                # Unpack inside the try so a malformed item is reported and
                # still marked done instead of killing this worker.
                func, args, kargs = task
                func(*args, **kargs)
            except Exception:
                # A failing task must not stop the worker, but the failure
                # is logged (with traceback) rather than silently dropped.
                log.exception("Task %r raised an exception", task)
            finally:
                # Always acknowledge the item so Queue.join() can return.
                self.tasks.task_done()
class ThreadPool:
    """Fixed-size pool of ``Worker`` threads fed from one bounded queue."""

    def __init__(self, num_threads):
        """Create the task queue and start ``num_threads`` workers.

        Args:
            num_threads: Number of worker threads; also used as the queue's
                maximum size.

        Raises:
            ValueError: If ``num_threads`` is less than 1. Without any
                workers, queued tasks would never run and
                ``wait_completion`` would block forever.
        """
        if num_threads < 1:
            raise ValueError(
                "num_threads must be at least 1, got {!r}".format(num_threads)
            )
        # The queue is bounded, so add_task() blocks while it is full.
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)`` for execution by a worker.

        Args:
            func: Callable to run.
            *args: Positional arguments forwarded to ``func``.
            **kargs: Keyword arguments forwarded to ``func``.
        """
        self.tasks.put((func, args, kargs))

    def map(self, func, args_list):
        """Queue one call of ``func`` per element of ``args_list``.

        Each element is passed to ``func`` as a single positional argument.

        Args:
            func: Callable to run for every element.
            args_list: Iterable of arguments, one per task.
        """
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()
# Module-level counters: ``count`` is evaluated once, as counter + 1.
counter, count = 0, 1
def random_string_generator(size=10, chars=string.ascii_lowercase + string.digits,
                            path="b1g.txt"):
    """Generate a random string, append it to a file, and return it.

    Args:
        size: Number of characters to generate.
        chars: Characters to draw from (with replacement).
        path: File the string is appended to, followed by a newline.
            Defaults to ``"b1g.txt"``.

    Returns:
        The generated string (empty when ``size`` is 0).
    """
    token = "".join(random.choice(chars) for _ in range(size))
    # The with-block closes the file; no explicit close() is needed.
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(token + "\n")
    return token
def apples():
    """Endlessly print a growing number pair and emit random strings.

    Starting from (1, 1), every iteration prints both numbers, adds the
    second to the first, adds the updated first to the second, and then
    calls ``random_string_generator()``. The loop has no exit condition,
    so this function never returns.
    """
    first, second = 1, 1
    while True:
        print(first)
        print(second)
        first += second
        second += first  # uses the already-updated ``first``
        random_string_generator()
def main():
    """Run ``apples`` on a 500-thread pool and wait on it.

    ``apples`` loops forever, so ``wait_completion`` is expected to block
    until the process is interrupted.
    """
    # apples() must not be called directly here first: it never returns,
    # which would block the main thread and leave the pool code unreachable.
    pool = ThreadPool(500)
    pool.add_task(apples)
    pool.wait_completion()


if __name__ == '__main__':
    main()
Add Comment
Please, Sign In to add comment