Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- interacts with electrum daemon running on docker
- """
- import json
- from requests.exceptions import ConnectionError
- from jsonrpcclient.clients.http_client import HTTPClient
- def get_address_list():
- "Get list of wallet addresses"
- try:
- resp = client.request('listaddresses')
- except ConnectionError as e:
- resp = e
- return resp
- def get_balance():
- "Get balance of the whole wallet"
- try:
- resp = client.request('getbalance')
- except ConnectionError as e:
- resp = e
- return resp
- def is_mine(address):
- "Check if address is in hd wallet"
- try:
- resp = client.request('ismine', address)
- except ConnectionError as e:
- resp = e
- return resp
- def get_unused_addr():
- "Get an unused address, not a new address"
- try:
- resp = client.request('getunusedaddress')
- except ConnectionError as e:
- resp = e
- return resp
- def get_address_balance(address):
- "Get balance of any address"
- try:
- resp = client.request('getaddressbalance', address)
- except ConnectionError as e:
- resp = e
- return resp
- def get_address_history(address):
- try:
- resp = client.request('getaddresshistory', address)
- except ConnectionError as e:
- resp = e
- return resp
- def gettransaction(txn):
- try:
- resp = client.request('gettransaction', txn)
- except ConnectionError as e:
- resp = e
- return resp
- def decode_tnx_hex(txn_hex):
- try:
- resp = client.request('deserialize', txn_hex)
- except ConnectionError as e:
- resp = e
- return resp
- def txn_hash_greater_than_processed_height(address_histories, height=0):
- """returns the list of transaction hashes greater than the processed height and their height"""
- resp = []
- if type(address_histories) == list:
- for history in address_histories:
- if history['height'] > height:
- resp.append(history)
- else:
- if address_histories['height'] > height:
- resp.append(address_histories)
- return resp
- def total_received(address, height=0, force_return=True):
- """
- Return the total received by an address above a given block height
- >> if height is suplied, total received after
- the supplied height is returned
- >> if force return is False, the function will not bother to calculate
- the total received if max height in address transaction history is
- less than supplied height
- """
- # decode txn_hash
- # loop through outputs
- address_history = get_address_history(address)
- if isinstance(address_history, ConnectionError):
- heights = list(tx['height'] for tx in address_history)
- if heights:
- max_height = max(heights)
- else:
- return None # Address has not received anything anything above previous height
- elif type(address_history) == ConnectionError:
- return None # find a better way to handle connection error
- else:
- max_height = address_history['height']
- if height and not force_return:
- if max_height <= height:
- return None
- hash_list = txn_hash_greater_than_processed_height(
- address_history, height=height)
- received = 0
- for txn in hash_list:
- txn_hash = txn['tx_hash']
- try:
- hex_value_dict = gettransaction(txn_hash)
- if isinstance(hex_value_dict, ConnectionError):
- return None
- if hex_value_dict['complete']:
- hex_value = hex_value_dict['hex']
- txn_decoded = decode_tnx_hex(hex_value)
- if isinstance(txn_decoded, ConnectionError):
- return None
- except Exception as e:
- print(e)
- return None
- for _txn in txn_decoded['outputs']:
- if _txn['address'] == address:
- received += txn['value']
- return {'total_received': received / 10**8, 'height': max_height}
- def create_new_address():
- "generate new address from pubKey using pybitcointools, no need query daemon"
- pass
- def send(_to, amount):
- try:
- resp = client.request('payto', [_to, amount])
- except ConnectionError as e:
- resp = e
- return resp
- def broadcast(hx):
- try:
- resp = client.request('broadcast', [hx])
- except ConnectionError as e:
- resp = e
- return resp
- def main():
- '''
- """Tests"""
- address = 'mpXbEmKZNdiu8LoRvmAjNhdubh6bRcS1Sc'
- print(get_address_list())
- assert type(get_address_list()) == list
- assert type(get_balance()) == dict
- get_address_balance('mpXbEmKZNdiu8LoRvmAjNhdubh6bRcS1Sc')
- is_mine('mpXbEmKZNdiu8LoRvmAjNhdubh6bRcS1Sc')
- is_mine('mujekfctnb7kLFPp5xY3hPEk5bR9nuqNuZ')
- get_unused_addr()
- histories = get_address_history(address)
- print('type=========', type(histories))
- txn = gettransaction('64d6867629b3db43e67337d334ebd4d599b9a72d087224339b2b521cfa9952ff')
- txn_hex = txn['hex']
- decode_tnx_hex(txn_hex)
- print(txn)
- try:
- assert not isinstance(txn, ConnectionError)
- except AssertionError:
- print("Nework failure")
- '''
- #res = (send('mhtEbsosjxP1U2mWLTobG8Mt58mfrS3ZkU', 0.001))
- # print(json.loads(res.text)['result'])
- #hx = json.loads(res.text)['result']['hex']
- #res = broadcast(hx)
- # print(json.loads(res.text)['result'])
- # print(res.text)
- res = get_balance()
- print(res.text)
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment