Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # pylint: disable=import-error, invalid-sequence-index
- import time
- import sys
- import asyncio
- sys.path.append("..")
- from coin import Coin
- class BTC(Coin):
- coin_name="BTC"
- friendly_name="Bitcoin"
- providers=["electrum","jsonrpcrequests"]
- def __init__(self,rpc_url,rpc_user,rpc_pass):
- super().__init__()
- self.rpc_url = rpc_url
- self.rpc_user = rpc_user
- self.rpc_pass = rpc_pass
- self.server = self.providers["jsonrpcrequests"].RPCProxy(self.rpc_url,self.rpc_user,self.rpc_pass)
- self.loop, self.stopping_fut, self.loop_thread = self.providers["electrum"].util.create_and_start_event_loop()
- self.network = self.providers["electrum"].Network()
- self.network.start()
- # wait until connected
- while not self.network.is_connected():
- time.sleep(1)
- def get_tx(self, tx: str) -> dict:
- fut = asyncio.run_coroutine_threadsafe(self.get_tx_async(tx), self.loop)
- return fut.result()
- def get_address(self, address: str) -> list:
- out = self.server.getaddresshistory(address)
- for i in out:
- i["tx"] = self.get_tx(i["tx_hash"])
- return out
- def balance(self) -> dict:
- data = self.server.getbalance()
- return {"confirmed": data.get("confirmed"),
- "unconfirmed": data.get("unconfirmed"),
- "unmatured": data.get("unmatured")}
- async def get_tx_async(self, tx):
- out = self.server.gettransaction(tx)
- out1 = self.server.deserialize(out)
- prevh = out1["inputs"][0]["prevout_hash"]
- prevn = out1["inputs"][0]["prevout_n"]
- out = self.server.gettransaction(prevh)
- out = self.server.deserialize(out)
- for i in out["outputs"]:
- if i["prevout_n"] == prevn:
- out1["input"] = i["address"]
- break
- if not out1["input"]:
- out1["input"] = "Not found"
- conf = await self.network.interface.session.send_request(
- "blockchain.transaction.get",
- [tx, True])
- conf = conf.get("confirmations", 0)
- out1["confirmations"] = conf
- return out1
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement