Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from steem.blockchain import Blockchain
- from steem import Steem
- from steem.post import Post
- import threading, time, sys, os, queue, json
- import matplotlib.pyplot as plt
- # minimal difference between two consecutive measurements needed to consider the solution as optimal
- # defaultly it's 5%,
- STOP_CONDITION = 1.05
- BLOCK_COUNT = 1000
- MAX_THREADS = 25
- class myThread (threading.Thread):
- def __init__(self, thread_id, start_block, end_block, n, blockchain, workQueue, queueLock):
- threading.Thread.__init__(self)
- self.thread_id = thread_id
- self.start_block = start_block
- self.end_block = end_block
- self.n = n
- self.blockchain = blockchain
- self.stream = self.blockchain.stream_from(start_block=start_block, end_block=end_block)
- self.current_block = self.start_block
- self.workQueue = workQueue
- self.queueLock = queueLock
- print (self.thread_id, self.start_block, self.end_block)
- def run(self):
- data = {}
- for post in self.stream:
- if post['block'] != self.current_block:
- percentage = (self.current_block-self.start_block)/self.n*100
- print("Thread {} is at block {}/{} {:.2f}%".format(self.thread_id,post['block'],self.end_block, percentage))
- self.current_block = post['block']
- operation = post['op'][0]
- if operation not in data:
- data[operation] = 1
- else:
- data[operation] += 1
- self.queueLock.acquire()
- self.workQueue.put(data)
- self.queueLock.release()
- def run():
- # each iteration calculates the same part of blockchain, to provide reliable results
- blockchain = Blockchain()
- head_block = blockchain.get_current_block_num()
- execution_times = []
- last_elapsed_time = sys.float_info.max
- i = 1
- while True:
- start_time = time.time()
- start = head_block-BLOCK_COUNT
- amount_of_threads = i
- n = int(BLOCK_COUNT/amount_of_threads)
- threads = []
- queueLock = threading.Lock()
- workQueue = queue.Queue(amount_of_threads)
- for x in range(0, amount_of_threads-1):
- thread = myThread(x, start, start+ n-1, n, blockchain, workQueue, queueLock)
- thread.start()
- threads.append(thread)
- start = start + n
- # last thread can calculate one more block
- thread = myThread(amount_of_threads-1, start, head_block, head_block - start, blockchain, workQueue, queueLock)
- thread.start()
- threads.append(thread)
- print()
- for t in threads:
- t.join()
- merged_data = {}
- while not workQueue.empty():
- data = workQueue.get()
- for key in data:
- if key not in merged_data:
- merged_data[key] = data[key]
- else:
- merged_data[key] += data[key]
- # uncomment following lines to display statistics
- #for operation in merged_data:
- # print (operation, merged_data[operation])
- elapsed_time = time.time() - start_time
- execution_times.append(elapsed_time)
- if not last_elapsed_time == sys.float_info.max:
- print("Adding this thread has decreased calculation time by {}%".format((last_elapsed_time/elapsed_time - 1.0)*100))
- if last_elapsed_time/elapsed_time < STOP_CONDITION:
- print("Optimal amount of threads for your CPU is {}".format(i-1))
- break
- if i == MAX_THREADS:
- print("it seems that execution time can't stop decreasing, that's interesting!")
- break
- i += 1
- last_elapsed_time = elapsed_time
- plt.plot(range(1,len(execution_times)+1),execution_times)
- plt.ylabel('execution time [s]')
- plt.xlabel('amount of threads')
- plt.show()
- if __name__ == '__main__':
- run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement