Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ##Import statements
- ###################################################################################
- from flask import Flask, jsonify, request
- import unittest
- from flask_testing import TestCase
- import json, hashlib
- from collections import OrderedDict
- import time
- import logging,sys
- #################################################################################
- #Functions
- ####TO DO
def mine_block():
    """Mine the pending transactions into a new block on the chain.

    Steps:
      1. Create the coinbase (reward) transaction; ``new_transaction``
         appends it to the pending pool as a side effect.
      2. Search for the smallest ``proof`` (counting up from 0) such that
         ``hash_twice(f'{proof}{new_transactions}')`` starts with "00".
      3. Compute the Merkle root of the pending pool.
      4. Build and append the block through ``new_block`` and empty the pool.

    Returns:
        dict: the block that ``new_block`` created and appended to ``chain``.
    """
    global new_transactions

    # Called only for its side effect on the pending pool.
    coinbase_transaction()

    # Proof-of-work: keep incrementing until the candidate hash starts with "00".
    proof = 0
    while True:
        candidate = f'{proof}{new_transactions}'
        digest = hash_twice(candidate)
        if digest[:2] == "00":
            break
        proof += 1

    # Merkle root of the transactions that go into this block.
    tree = Merkle_Tree()
    tree.create_transaction_string(new_transactions)
    merkle_root = tree.create_tree()

    mined = new_block(
        new_transactions,
        proof,
        chain[-1]["block_hash"],
        block_hash=digest,
        block_header=candidate,
        root_hash=merkle_root,
    )

    # Start a fresh pool; the mined block keeps the old list object.
    new_transactions = []
    return mined
- #END TO DO
- ################################
- # 1. Declare the class tree
class Merkle_Tree:
    """Compute a SHA-256 Merkle root over a list of transactions.

    Typical use::

        tree = Merkle_Tree()
        tree.create_transaction_string(transactions)
        root = tree.create_tree()
    """

    def __init__(self, listoftransactions=None):
        """Create a tree, optionally seeded with ready-made leaf strings.

        Args:
            listoftransactions: list of strings to use as leaves, or None
                when the leaves will be set later through
                ``create_transaction_string``.
        """
        # Current tree level; starts as the leaves and is replaced by each
        # parent level while ``create_tree`` runs.
        self.listoftransactions = listoftransactions
        # Every string hashed so far -> its SHA-256 hex digest, in insertion
        # order.  The last entry is the root (see ``Get_Root_leaf``).
        self.past_transaction = OrderedDict()

    def create_transaction_string(self, transactions):
        """Turn transaction mappings into the leaf strings of the tree.

        Each transaction becomes the concatenation (no separator) of
        ``json.dumps(key) + ":" + json.dumps(value)`` for every key in the
        mapping.  An empty (or None) ``transactions`` is replaced by a single
        placeholder transaction whose fields are all 'NULL', so the tree
        always has at least one leaf.

        Args:
            transactions: list of dict-like transactions.
        """
        if not transactions:
            transactions = [{'sender': 'NULL', 'recipient': 'NULL', 'amount': 'NULL'}]
        self.listoftransactions = [
            "".join(json.dumps(key) + ":" + json.dumps(tx[key]) for key in tx)
            for tx in transactions
        ]

    def create_tree(self):
        """Reduce the current leaves to the Merkle root and return it.

        Level by level, every element is SHA-256 hashed and recorded in
        ``past_transaction``; adjacent hashes are concatenated to form the
        next level.  An element without a right sibling is carried up as its
        own hash.  The loop stops once a level holds a single element, whose
        hash is the root.

        If no leaves are set (None or an empty list), the same 'NULL'
        placeholder used by ``create_transaction_string`` is hashed instead;
        previously that case recursed without end (or raised TypeError for
        None).

        Returns:
            str: hex digest of the root.
        """
        if not self.listoftransactions:
            self.create_transaction_string([])

        history = self.past_transaction
        level = self.listoftransactions
        while True:
            parents = []
            for i in range(0, len(level), 2):
                left = level[i]
                left_hash = hashlib.sha256(left.encode('utf-8')).hexdigest()
                history[left] = left_hash

                # NOTE: an empty-string right sibling is treated exactly like
                # a missing one.  Kept as-is so roots of existing data do not
                # change.
                right = level[i + 1] if i + 1 < len(level) else ''
                if right != '':
                    right_hash = hashlib.sha256(right.encode('utf-8')).hexdigest()
                    history[right] = right_hash
                    parents.append(left_hash + right_hash)
                else:
                    parents.append(left_hash)

            if len(level) == 1:
                break
            level = parents
            self.listoftransactions = level

        root_key, root_hash = level[0], parents[0]
        # Re-insert the root so it is guaranteed to be the LAST entry even
        # when this instance was reused and the key already existed; that is
        # what ``Get_Root_leaf`` relies on.
        history.pop(root_key)
        history[root_key] = root_hash
        return root_hash

    def Get_past_transaction(self):
        """Return the ordered mapping of every hashed string to its digest."""
        return self.past_transaction

    def Get_Root_leaf(self):
        """Return the most recently computed root hash.

        Raises:
            IndexError: if nothing has been hashed yet.
        """
        return list(self.past_transaction.values())[-1]
- #######################
def new_block(new_transactions, proof, previous_hash, block_hash, block_header, root_hash):
    """Assemble a block, append it to the global ``chain`` and return it.

    Args:
        new_transactions: transactions to store in the block.  Ignored (an
            empty list is stored instead) when the chain is still empty,
            i.e. for the first block.
        proof: proof-of-work value stored under 'proof_of_work'.
        previous_hash: hash of the preceding block.
        block_hash: hash of this block.
        block_header: header string of this block.
        root_hash: Merkle root stored under 'transaction_hash_merkle_root'.

    Returns:
        dict: the block that was appended to ``chain``.
    """
    # The very first block never carries transactions.
    stored_transactions = [] if not chain else new_transactions

    block = {
        # 1-based position: number of blocks already on the chain + 1.
        'index1': len(chain) + 1,
        'time': time.time(),
        'proof_of_work': proof,
        'previous_hash': previous_hash,
        'transactions': stored_transactions,
        'transaction_hash_merkle_root': root_hash,
        'version': version,
        'block_hash': block_hash,
        'block_header': block_header,
    }

    chain.append(block)
    return block
def hash_twice(input_dictionary):
    """Return the double SHA-256 hex digest of a JSON-serialisable value.

    The value is serialised with ``json.dumps``, hashed with SHA-256, and the
    resulting hex digest (as text) is hashed with SHA-256 once more.

    Args:
        input_dictionary: any value ``json.dumps`` accepts.

    Returns:
        str: 64-character hex digest of the second hash.
    """
    serialized = json.dumps(input_dictionary).encode()
    first_pass = hashlib.sha256(serialized).hexdigest()
    second_pass = hashlib.sha256(first_pass.encode())
    return second_pass.hexdigest()
def coinbase_transaction():
    """Create the mining-reward transaction and return it.

    Delegates to ``new_transaction`` with sender "0", recipient 'self' and
    the module-level ``mining_reward`` as the amount.
    """
    return new_transaction("0", 'self', mining_reward)
def new_transaction(sender_public_key, recipient_public_key, amount_value):
    """Create a transaction, queue it in the pending pool and return it.

    Args:
        sender_public_key: identifier of the sender.
        recipient_public_key: identifier of the recipient.
        amount_value: amount being transferred.

    Returns:
        dict: the transaction, including a 'transaction_hash' entry holding
        the SHA-256 hex digest of the JSON form of the other three fields.
        The same dict object is what sits in ``new_transactions``.
    """
    global new_transactions

    transaction = dict(
        sender_public_key=sender_public_key,
        recipient_public_key=recipient_public_key,
        amount_value=amount_value,
    )

    # Digest is taken before the hash field itself is added to the dict.
    digest = hashlib.sha256(json.dumps(transaction).encode()).hexdigest()

    new_transactions.append(transaction)
    transaction['transaction_hash'] = digest
    return transaction
def get_chain():
    """Return the module-level ``chain`` list (the list itself, not a copy)."""
    return chain
- #####################################################################################
- ##Class Definition, to initiate our unit test
class TestBlockchainAPI(TestCase):
    """Flask-Testing test case for the mining endpoint."""

    def create_app(self):
        """Build the Flask app under test; it exposes a single ``GET /mine`` route."""
        flask_app = Flask(__name__)

        @flask_app.route('/mine', methods=['GET'])
        def mine():
            # Mine one block and hand it back as JSON.
            mined = mine_block()
            return jsonify(mined), 200

        return flask_app

    def test_mine_block(self):
        """GET /mine must answer with HTTP 200; the JSON payload is logged."""
        with self.client:
            reply = self.client.get('/mine')

            # Log the value returned by the API endpoint.
            logger = logging.getLogger("TestBlockchainAPI")
            logger.debug(' \r\r mining result is %r', reply.json)

            # Test #1: does the API return status code 200?
            self.assertEqual(reply.status_code, 200)
- #####################################################################################
- #Global variables
# Version string stored in every block by new_block().
version = '1.0'
# Amount used by coinbase_transaction() for the mining reward.
mining_reward = 50
# The blockchain itself: a list of block dicts, oldest first.
chain = []
# Pending transactions waiting to be mined into the next block.
new_transactions = []

# Add the genesis block; this is the first block of the chain.
new_block(
    new_transactions=[],
    proof=1,
    previous_hash='0',
    block_hash='',
    block_header='',
    root_hash='',
)
- #####################################################################################
- # Runs the tests.
if __name__ == '__main__':
    # Send log records to stderr and let the test logger emit DEBUG messages.
    logging.basicConfig(stream=sys.stderr)
    test_logger = logging.getLogger("TestBlockchainAPI")
    test_logger.setLevel(logging.DEBUG)

    # Collect the tests of TestBlockchainAPI and run them verbosely.
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(TestBlockchainAPI)
    runner = unittest.TextTestRunner(verbosity=4)
    runner.run(suite)
- ######################################################################################
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement