Advertisement
Guest User

Untitled

a guest
Apr 11th, 2018
550
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 44.21 KB | None | 0 0
  1. # Electrum - Lightweight Bitcoin Client
  2. # Copyright (c) 2011-2016 Thomas Voegtlin
  3. #
  4. # Permission is hereby granted, free of charge, to any person
  5. # obtaining a copy of this software and associated documentation files
  6. # (the "Software"), to deal in the Software without restriction,
  7. # including without limitation the rights to use, copy, modify, merge,
  8. # publish, distribute, sublicense, and/or sell copies of the Software,
  9. # and to permit persons to whom the Software is furnished to do so,
  10. # subject to the following conditions:
  11. #
  12. # The above copyright notice and this permission notice shall be
  13. # included in all copies or substantial portions of the Software.
  14. #
  15. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  16. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  17. # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  18. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  19. # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  20. # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  22. # SOFTWARE.
  23. import time
  24. import queue
  25. import os
  26. import stat
  27. import errno
  28. import random
  29. import re
  30. import select
  31. print("Line 30", select)
  32. from collections import defaultdict
  33. import threading
  34. import socket
  35. import json
  36.  
  37. import socks
  38. from . import util
  39. from . import bitcoin
  40. from .bitcoin import *
  41. from . import constants
  42. from .interface import Connection, Interface
  43. from . import blockchain
  44. from .version import ELECTRUM_VERSION, PROTOCOL_VERSION
  45. from .i18n import _
  46.  
  47. print("Line 46", select)
  48. NODES_RETRY_INTERVAL = 60
  49. SERVER_RETRY_INTERVAL = 10
  50.  
  51.  
  52. def parse_servers(result):
  53.     """ parse servers list into dict format"""
  54.     from .version import PROTOCOL_VERSION
  55.     servers = {}
  56.     for item in result:
  57.         host = item[1]
  58.         out = {}
  59.         version = None
  60.         pruning_level = '-'
  61.         if len(item) > 2:
  62.             for v in item[2]:
  63.                 if re.match("[st]\d*", v):
  64.                     protocol, port = v[0], v[1:]
  65.                     if port == '': port = constants.net.DEFAULT_PORTS[protocol]
  66.                     out[protocol] = port
  67.                 elif re.match("v(.?)+", v):
  68.                     version = v[1:]
  69.                 elif re.match("p\d*", v):
  70.                     pruning_level = v[1:]
  71.                 if pruning_level == '': pruning_level = '0'
  72.         if out:
  73.             out['pruning'] = pruning_level
  74.             out['version'] = version
  75.             servers[host] = out
  76.     return servers
  77.  
  78. def filter_version(servers):
  79.     def is_recent(version):
  80.         try:
  81.             return util.normalize_version(version) >= util.normalize_version(PROTOCOL_VERSION)
  82.         except Exception as e:
  83.             return False
  84.     return {k: v for k, v in servers.items() if is_recent(v.get('version'))}
  85.  
  86.  
  87. def filter_protocol(hostmap, protocol = 's'):
  88.     '''Filters the hostmap for those implementing protocol.
  89.    The result is a list in serialized form.'''
  90.     eligible = []
  91.     for host, portmap in hostmap.items():
  92.         port = portmap.get(protocol)
  93.         if port:
  94.             eligible.append(serialize_server(host, port, protocol))
  95.     return eligible
  96.  
  97. def pick_random_server(hostmap = None, protocol = 's', exclude_set = set()):
  98.     if hostmap is None:
  99.         hostmap = constants.net.DEFAULT_SERVERS
  100.     eligible = list(set(filter_protocol(hostmap, protocol)) - exclude_set)
  101.     return random.choice(eligible) if eligible else None
  102.  
  103. from .simple_config import SimpleConfig
  104.  
  105. proxy_modes = ['socks4', 'socks5', 'http']
  106.  
  107.  
  108. def serialize_proxy(p):
  109.     if not isinstance(p, dict):
  110.         return None
  111.     return ':'.join([p.get('mode'), p.get('host'), p.get('port'),
  112.                      p.get('user', ''), p.get('password', '')])
  113.  
  114.  
  115. def deserialize_proxy(s):
  116.     if not isinstance(s, str):
  117.         return None
  118.     if s.lower() == 'none':
  119.         return None
  120.     proxy = { "mode":"socks5", "host":"localhost" }
  121.     args = s.split(':')
  122.     n = 0
  123.     if proxy_modes.count(args[n]) == 1:
  124.         proxy["mode"] = args[n]
  125.         n += 1
  126.     if len(args) > n:
  127.         proxy["host"] = args[n]
  128.         n += 1
  129.     if len(args) > n:
  130.         proxy["port"] = args[n]
  131.         n += 1
  132.     else:
  133.         proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
  134.     if len(args) > n:
  135.         proxy["user"] = args[n]
  136.         n += 1
  137.     if len(args) > n:
  138.         proxy["password"] = args[n]
  139.     return proxy
  140.  
  141.  
  142. def deserialize_server(server_str):
  143.     host, port, protocol = str(server_str).rsplit(':', 2)
  144.     assert protocol in 'st'
  145.     int(port)    # Throw if cannot be converted to int
  146.     return host, port, protocol
  147.  
  148.  
  149. def serialize_server(host, port, protocol):
  150.     return str(':'.join([host, port, protocol]))
  151.  
  152.  
  153. class Network(util.DaemonThread):
  154.     """The Network class manages a set of connections to remote electrum
  155.    servers, each connected socket is handled by an Interface() object.
  156.    Connections are initiated by a Connection() thread which stops once
  157.    the connection succeeds or fails.
  158.  
  159.    Our external API:
  160.  
  161.    - Member functions get_header(), get_interfaces(), get_local_height(),
  162.          get_parameters(), get_server_height(), get_status_value(),
  163.          is_connected(), set_parameters(), stop()
  164.    """
  165.  
  166.     def __init__(self, config=None):
  167.         if config is None:
  168.             config = {}  # Do not use mutables as default values!
  169.         util.DaemonThread.__init__(self)
  170.         self.config = SimpleConfig(config) if isinstance(config, dict) else config
  171.         self.num_server = 10 if not self.config.get('oneserver') else 0
  172.         self.blockchains = blockchain.read_blockchains(self.config)
  173.         self.print_error("blockchains", self.blockchains.keys())
  174.         self.blockchain_index = config.get('blockchain_index', 0)
  175.         if self.blockchain_index not in self.blockchains.keys():
  176.             self.blockchain_index = 0
  177.         # Server for addresses and transactions
  178.         self.default_server = self.config.get('server', None)
  179.         # Sanitize default server
  180.         if self.default_server:
  181.             try:
  182.                 deserialize_server(self.default_server)
  183.             except:
  184.                 self.print_error('Warning: failed to parse server-string; falling back to random.')
  185.                 self.default_server = None
  186.         if not self.default_server:
  187.             self.default_server = pick_random_server()
  188.         self.lock = threading.Lock()
  189.         self.pending_sends = []
  190.         self.message_id = 0
  191.         self.debug = False
  192.         self.irc_servers = {} # returned by interface (list from irc)
  193.         self.recent_servers = self.read_recent_servers()
  194.  
  195.         self.banner = ''
  196.         self.donation_address = ''
  197.         self.relay_fee = None
  198.         # callbacks passed with subscriptions
  199.         self.subscriptions = defaultdict(list)
  200.         self.sub_cache = {}
  201.         # callbacks set by the GUI
  202.         self.callbacks = defaultdict(list)
  203.  
  204.         dir_path = os.path.join( self.config.path, 'certs')
  205.         if not os.path.exists(dir_path):
  206.             os.mkdir(dir_path)
  207.             os.chmod(dir_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
  208.  
  209.         # subscriptions and requests
  210.         self.subscribed_addresses = set()
  211.         self.h2addr = {}
  212.         # Requests from client we've not seen a response to
  213.         self.unanswered_requests = {}
  214.         # retry times
  215.         self.server_retry_time = time.time()
  216.         self.nodes_retry_time = time.time()
  217.         # kick off the network.  interface is the main server we are currently
  218.         # communicating with.  interfaces is the set of servers we are connecting
  219.         # to or have an ongoing connection with
  220.         self.interface = None
  221.         self.interfaces = {}
  222.         self.auto_connect = self.config.get('auto_connect', True)
  223.         self.connecting = set()
  224.         self.requested_chunks = set()
  225.         self.socket_queue = queue.Queue()
  226.         self.start_network(deserialize_server(self.default_server)[2],
  227.                            deserialize_proxy(self.config.get('proxy')))
  228.  
  229.     def register_callback(self, callback, events):
  230.         with self.lock:
  231.             for event in events:
  232.                 self.callbacks[event].append(callback)
  233.  
  234.     def unregister_callback(self, callback):
  235.         with self.lock:
  236.             for callbacks in self.callbacks.values():
  237.                 if callback in callbacks:
  238.                     callbacks.remove(callback)
  239.  
  240.     def trigger_callback(self, event, *args):
  241.         with self.lock:
  242.             callbacks = self.callbacks[event][:]
  243.         [callback(event, *args) for callback in callbacks]
  244.  
  245.     def read_recent_servers(self):
  246.         if not self.config.path:
  247.             return []
  248.         path = os.path.join(self.config.path, "recent_servers")
  249.         try:
  250.             with open(path, "r", encoding='utf-8') as f:
  251.                 data = f.read()
  252.                 return json.loads(data)
  253.         except:
  254.             return []
  255.  
  256.     def save_recent_servers(self):
  257.         if not self.config.path:
  258.             return
  259.         path = os.path.join(self.config.path, "recent_servers")
  260.         s = json.dumps(self.recent_servers, indent=4, sort_keys=True)
  261.         try:
  262.             with open(path, "w", encoding='utf-8') as f:
  263.                 f.write(s)
  264.         except:
  265.             pass
  266.  
  267.     def get_server_height(self):
  268.         return self.interface.tip if self.interface else 0
  269.  
  270.     def server_is_lagging(self):
  271.         sh = self.get_server_height()
  272.         if not sh:
  273.             self.print_error('no height for main interface')
  274.             return True
  275.         lh = self.get_local_height()
  276.         result = (lh - sh) > 1
  277.         if result:
  278.             self.print_error('%s is lagging (%d vs %d)' % (self.default_server, sh, lh))
  279.         return result
  280.  
  281.     def set_status(self, status):
  282.         self.connection_status = status
  283.         self.notify('status')
  284.  
  285.     def is_connected(self):
  286.         return self.interface is not None
  287.  
  288.     def is_connecting(self):
  289.         return self.connection_status == 'connecting'
  290.  
  291.     def is_up_to_date(self):
  292.         return self.unanswered_requests == {}
  293.  
  294.     def queue_request(self, method, params, interface=None):
  295.         # If you want to queue a request on any interface it must go
  296.         # through this function so message ids are properly tracked
  297.         if interface is None:
  298.             interface = self.interface
  299.         message_id = self.message_id
  300.         self.message_id += 1
  301.         if self.debug:
  302.             self.print_error(interface.host, "-->", method, params, message_id)
  303.         interface.queue_request(method, params, message_id)
  304.         return message_id
  305.  
  306.     def send_subscriptions(self):
  307.         self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
  308.         self.sub_cache.clear()
  309.         # Resend unanswered requests
  310.         requests = self.unanswered_requests.values()
  311.         self.unanswered_requests = {}
  312.         if self.interface.ping_required():
  313.             params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
  314.             self.queue_request('server.version', params, self.interface)
  315.         for request in requests:
  316.             message_id = self.queue_request(request[0], request[1])
  317.             self.unanswered_requests[message_id] = request
  318.         self.queue_request('server.banner', [])
  319.         self.queue_request('server.donation_address', [])
  320.         self.queue_request('server.peers.subscribe', [])
  321.         self.request_fee_estimates()
  322.         self.queue_request('blockchain.relayfee', [])
  323.         for h in list(self.subscribed_addresses):
  324.             self.queue_request('blockchain.scripthash.subscribe', [h])
  325.  
  326.     def request_fee_estimates(self):
  327.         from .simple_config import FEE_ETA_TARGETS
  328.         self.config.requested_fee_estimates()
  329.         self.queue_request('mempool.get_fee_histogram', [])
  330.         for i in FEE_ETA_TARGETS:
  331.             self.queue_request('blockchain.estimatefee', [i])
  332.  
  333.     def get_status_value(self, key):
  334.         if key == 'status':
  335.             value = self.connection_status
  336.         elif key == 'banner':
  337.             value = self.banner
  338.         elif key == 'fee':
  339.             value = self.config.fee_estimates
  340.         elif key == 'fee_histogram':
  341.             value = self.config.mempool_fees
  342.         elif key == 'updated':
  343.             value = (self.get_local_height(), self.get_server_height())
  344.         elif key == 'servers':
  345.             value = self.get_servers()
  346.         elif key == 'interfaces':
  347.             value = self.get_interfaces()
  348.         return value
  349.  
  350.     def notify(self, key):
  351.         if key in ['status', 'updated']:
  352.             self.trigger_callback(key)
  353.         else:
  354.             self.trigger_callback(key, self.get_status_value(key))
  355.  
  356.     def get_parameters(self):
  357.         host, port, protocol = deserialize_server(self.default_server)
  358.         return host, port, protocol, self.proxy, self.auto_connect
  359.  
  360.     def get_donation_address(self):
  361.         if self.is_connected():
  362.             return self.donation_address
  363.  
  364.     def get_interfaces(self):
  365.         '''The interfaces that are in connected state'''
  366.         return list(self.interfaces.keys())
  367.  
  368.     def get_servers(self):
  369.         out = constants.net.DEFAULT_SERVERS
  370.         if self.irc_servers:
  371.             out.update(filter_version(self.irc_servers.copy()))
  372.         else:
  373.             for s in self.recent_servers:
  374.                 try:
  375.                     host, port, protocol = deserialize_server(s)
  376.                 except:
  377.                     continue
  378.                 if host not in out:
  379.                     out[host] = { protocol:port }
  380.         return out
  381.  
  382.     def start_interface(self, server):
  383.         if (not server in self.interfaces and not server in self.connecting):
  384.             if server == self.default_server:
  385.                 self.print_error("connecting to %s as new interface" % server)
  386.                 self.set_status('connecting')
  387.             self.connecting.add(server)
  388.             c = Connection(server, self.socket_queue, self.config.path)
  389.  
  390.     def start_random_interface(self):
  391.         exclude_set = self.disconnected_servers.union(set(self.interfaces))
  392.         server = pick_random_server(self.get_servers(), self.protocol, exclude_set)
  393.         if server:
  394.             self.start_interface(server)
  395.  
  396.     def start_interfaces(self):
  397.         self.start_interface(self.default_server)
  398.         for i in range(self.num_server - 1):
  399.             self.start_random_interface()
  400.  
  401.     def set_proxy(self, proxy):
  402.         self.proxy = proxy
  403.         # Store these somewhere so we can un-monkey-patch
  404.         if not hasattr(socket, "_socketobject"):
  405.             socket._socketobject = socket.socket
  406.             socket._getaddrinfo = socket.getaddrinfo
  407.         if proxy:
  408.             self.print_error('setting proxy', proxy)
  409.             proxy_mode = proxy_modes.index(proxy["mode"]) + 1
  410.             socks.setdefaultproxy(proxy_mode,
  411.                                   proxy["host"],
  412.                                   int(proxy["port"]),
  413.                                   # socks.py seems to want either None or a non-empty string
  414.                                   username=(proxy.get("user", "") or None),
  415.                                   password=(proxy.get("password", "") or None))
  416.             socket.socket = socks.socksocket
  417.             # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy
  418.             socket.getaddrinfo = lambda *args: [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))]
  419.         else:
  420.             socket.socket = socket._socketobject
  421.             socket.getaddrinfo = socket._getaddrinfo
  422.  
  423.     def start_network(self, protocol, proxy):
  424.         assert not self.interface and not self.interfaces
  425.         assert not self.connecting and self.socket_queue.empty()
  426.         self.print_error('starting network')
  427.         self.disconnected_servers = set([])
  428.         self.protocol = protocol
  429.         self.set_proxy(proxy)
  430.         self.start_interfaces()
  431.  
  432.     def stop_network(self):
  433.         self.print_error("stopping network")
  434.         for interface in list(self.interfaces.values()):
  435.             self.close_interface(interface)
  436.         if self.interface:
  437.             self.close_interface(self.interface)
  438.         assert self.interface is None
  439.         assert not self.interfaces
  440.         self.connecting = set()
  441.         # Get a new queue - no old pending connections thanks!
  442.         self.socket_queue = queue.Queue()
  443.  
  444.     def set_parameters(self, host, port, protocol, proxy, auto_connect):
  445.         proxy_str = serialize_proxy(proxy)
  446.         server = serialize_server(host, port, protocol)
  447.         # sanitize parameters
  448.         try:
  449.             deserialize_server(serialize_server(host, port, protocol))
  450.             if proxy:
  451.                 proxy_modes.index(proxy["mode"]) + 1
  452.                 int(proxy['port'])
  453.         except:
  454.             return
  455.         self.config.set_key('auto_connect', auto_connect, False)
  456.         self.config.set_key("proxy", proxy_str, False)
  457.         self.config.set_key("server", server, True)
  458.         # abort if changes were not allowed by config
  459.         if self.config.get('server') != server or self.config.get('proxy') != proxy_str:
  460.             return
  461.         self.auto_connect = auto_connect
  462.         if self.proxy != proxy or self.protocol != protocol:
  463.             # Restart the network defaulting to the given server
  464.             self.stop_network()
  465.             self.default_server = server
  466.             self.start_network(protocol, proxy)
  467.         elif self.default_server != server:
  468.             self.switch_to_interface(server)
  469.         else:
  470.             self.switch_lagging_interface()
  471.             self.notify('updated')
  472.  
  473.     def switch_to_random_interface(self):
  474.         '''Switch to a random connected server other than the current one'''
  475.         servers = self.get_interfaces()    # Those in connected state
  476.         if self.default_server in servers:
  477.             servers.remove(self.default_server)
  478.         if servers:
  479.             self.switch_to_interface(random.choice(servers))
  480.  
  481.     def switch_lagging_interface(self):
  482.         '''If auto_connect and lagging, switch interface'''
  483.         if self.server_is_lagging() and self.auto_connect:
  484.             # switch to one that has the correct header (not height)
  485.             header = self.blockchain().read_header(self.get_local_height())
  486.             filtered = list(map(lambda x:x[0], filter(lambda x: x[1].tip_header==header, self.interfaces.items())))
  487.             if filtered:
  488.                 choice = random.choice(filtered)
  489.                 self.switch_to_interface(choice)
  490.  
  491.     def switch_to_interface(self, server):
  492.         '''Switch to server as our interface.  If no connection exists nor
  493.        being opened, start a thread to connect.  The actual switch will
  494.        happen on receipt of the connection notification.  Do nothing
  495.        if server already is our interface.'''
  496.         self.default_server = server
  497.         if server not in self.interfaces:
  498.             self.interface = None
  499.             self.start_interface(server)
  500.             return
  501.         i = self.interfaces[server]
  502.         if self.interface != i:
  503.             self.print_error("switching to", server)
  504.             # stop any current interface in order to terminate subscriptions
  505.             # fixme: we don't want to close headers sub
  506.             #self.close_interface(self.interface)
  507.             self.interface = i
  508.             self.send_subscriptions()
  509.             self.set_status('connected')
  510.             self.notify('updated')
  511.  
  512.     def close_interface(self, interface):
  513.         if interface:
  514.             if interface.server in self.interfaces:
  515.                 self.interfaces.pop(interface.server)
  516.             if interface.server == self.default_server:
  517.                 self.interface = None
  518.             interface.close()
  519.  
  520.     def add_recent_server(self, server):
  521.         # list is ordered
  522.         if server in self.recent_servers:
  523.             self.recent_servers.remove(server)
  524.         self.recent_servers.insert(0, server)
  525.         self.recent_servers = self.recent_servers[0:20]
  526.         self.save_recent_servers()
  527.  
  528.     def process_response(self, interface, response, callbacks):
  529.         if self.debug:
  530.             self.print_error("<--", response)
  531.         error = response.get('error')
  532.         result = response.get('result')
  533.         method = response.get('method')
  534.         params = response.get('params')
  535.  
  536.         # We handle some responses; return the rest to the client.
  537.         if method == 'server.version':
  538.             interface.server_version = result
  539.         elif method == 'blockchain.headers.subscribe':
  540.             if error is None:
  541.                 self.on_notify_header(interface, result)
  542.         elif method == 'server.peers.subscribe':
  543.             if error is None:
  544.                 self.irc_servers = parse_servers(result)
  545.                 self.notify('servers')
  546.         elif method == 'server.banner':
  547.             if error is None:
  548.                 self.banner = result
  549.                 self.notify('banner')
  550.         elif method == 'server.donation_address':
  551.             if error is None:
  552.                 self.donation_address = result
  553.         elif method == 'mempool.get_fee_histogram':
  554.             if error is None:
  555.                 self.print_error('fee_histogram', result)
  556.                 self.config.mempool_fees = result
  557.                 self.notify('fee_histogram')
  558.         elif method == 'blockchain.estimatefee':
  559.             if error is None and result > 0:
  560.                 i = params[0]
  561.                 fee = int(result*COIN)
  562.                 self.config.update_fee_estimates(i, fee)
  563.                 self.print_error("fee_estimates[%d]" % i, fee)
  564.                 self.notify('fee')
  565.         elif method == 'blockchain.relayfee':
  566.             if error is None:
  567.                 self.relay_fee = int(result * COIN) if result is not None else None
  568.                 self.print_error("relayfee", self.relay_fee)
  569.         elif method == 'blockchain.block.get_chunk':
  570.             self.on_get_chunk(interface, response)
  571.         elif method == 'blockchain.block.get_header':
  572.             self.on_get_header(interface, response)
  573.  
  574.         for callback in callbacks:
  575.             callback(response)
  576.  
  577.     def get_index(self, method, params):
  578.         """ hashable index for subscriptions and cache"""
  579.         return str(method) + (':' + str(params[0]) if params else '')
  580.  
  581.     def process_responses(self, interface):
  582.         responses = interface.get_responses()
  583.         for request, response in responses:
  584.             if request:
  585.                 method, params, message_id = request
  586.                 k = self.get_index(method, params)
  587.                 # client requests go through self.send() with a
  588.                 # callback, are only sent to the current interface,
  589.                 # and are placed in the unanswered_requests dictionary
  590.                 client_req = self.unanswered_requests.pop(message_id, None)
  591.                 if client_req:
  592.                     assert interface == self.interface
  593.                     callbacks = [client_req[2]]
  594.                 else:
  595.                     # fixme: will only work for subscriptions
  596.                     k = self.get_index(method, params)
  597.                     callbacks = self.subscriptions.get(k, [])
  598.  
  599.                 # Copy the request method and params to the response
  600.                 response['method'] = method
  601.                 response['params'] = params
  602.                 # Only once we've received a response to an addr subscription
  603.                 # add it to the list; avoids double-sends on reconnection
  604.                 if method == 'blockchain.scripthash.subscribe':
  605.                     self.subscribed_addresses.add(params[0])
  606.             else:
  607.                 if not response:  # Closed remotely / misbehaving
  608.                     self.connection_down(interface.server)
  609.                     break
  610.                 # Rewrite response shape to match subscription request response
  611.                 method = response.get('method')
  612.                 params = response.get('params')
  613.                 k = self.get_index(method, params)
  614.                 if method == 'blockchain.headers.subscribe':
  615.                     response['result'] = params[0]
  616.                     response['params'] = []
  617.                 elif method == 'blockchain.scripthash.subscribe':
  618.                     response['params'] = [params[0]]  # addr
  619.                     response['result'] = params[1]
  620.                 callbacks = self.subscriptions.get(k, [])
  621.  
  622.             # update cache if it's a subscription
  623.             if method.endswith('.subscribe'):
  624.                 self.sub_cache[k] = response
  625.             # Response is now in canonical form
  626.             self.process_response(interface, response, callbacks)
  627.  
  628.     def addr_to_scripthash(self, addr):
  629.         h = bitcoin.address_to_scripthash(addr)
  630.         if h not in self.h2addr:
  631.             self.h2addr[h] = addr
  632.         return h
  633.  
  634.     def overload_cb(self, callback):
  635.         def cb2(x):
  636.             x2 = x.copy()
  637.             p = x2.pop('params')
  638.             addr = self.h2addr[p[0]]
  639.             x2['params'] = [addr]
  640.             callback(x2)
  641.         return cb2
  642.  
  643.     def subscribe_to_addresses(self, addresses, callback):
  644.         hashes = [self.addr_to_scripthash(addr) for addr in addresses]
  645.         msgs = [('blockchain.scripthash.subscribe', [x]) for x in hashes]
  646.         self.send(msgs, self.overload_cb(callback))
  647.  
  648.     def request_address_history(self, address, callback):
  649.         h = self.addr_to_scripthash(address)
  650.         self.send([('blockchain.scripthash.get_history', [h])], self.overload_cb(callback))
  651.  
  652.     def send(self, messages, callback):
  653.         '''Messages is a list of (method, params) tuples'''
  654.         messages = list(messages)
  655.         with self.lock:
  656.             self.pending_sends.append((messages, callback))
  657.  
  658.     def process_pending_sends(self):
  659.         # Requests needs connectivity.  If we don't have an interface,
  660.         # we cannot process them.
  661.         if not self.interface:
  662.             return
  663.  
  664.         with self.lock:
  665.             sends = self.pending_sends
  666.             self.pending_sends = []
  667.  
  668.         for messages, callback in sends:
  669.             for method, params in messages:
  670.                 r = None
  671.                 if method.endswith('.subscribe'):
  672.                     k = self.get_index(method, params)
  673.                     # add callback to list
  674.                     l = self.subscriptions.get(k, [])
  675.                     if callback not in l:
  676.                         l.append(callback)
  677.                     self.subscriptions[k] = l
  678.                     # check cached response for subscriptions
  679.                     r = self.sub_cache.get(k)
  680.                 if r is not None:
  681.                     self.print_error("cache hit", k)
  682.                     callback(r)
  683.                 else:
  684.                     message_id = self.queue_request(method, params)
  685.                     self.unanswered_requests[message_id] = method, params, callback
  686.  
  687.     def unsubscribe(self, callback):
  688.         '''Unsubscribe a callback to free object references to enable GC.'''
  689.         # Note: we can't unsubscribe from the server, so if we receive
  690.         # subsequent notifications process_response() will emit a harmless
  691.         # "received unexpected notification" warning
  692.         with self.lock:
  693.             for v in self.subscriptions.values():
  694.                 if callback in v:
  695.                     v.remove(callback)
  696.  
  697.     def connection_down(self, server):
  698.         '''A connection to server either went down, or was never made.
  699.        We distinguish by whether it is in self.interfaces.'''
  700.         self.disconnected_servers.add(server)
  701.         if server == self.default_server:
  702.             self.set_status('disconnected')
  703.         if server in self.interfaces:
  704.             self.close_interface(self.interfaces[server])
  705.             self.notify('interfaces')
  706.         for b in self.blockchains.values():
  707.             if b.catch_up == server:
  708.                 b.catch_up = None
  709.  
  710.     def new_interface(self, server, socket):
  711.         # todo: get tip first, then decide which checkpoint to use.
  712.         self.add_recent_server(server)
  713.         interface = Interface(server, socket)
  714.         interface.blockchain = None
  715.         interface.tip_header = None
  716.         interface.tip = 0
  717.         interface.mode = 'default'
  718.         interface.request = None
  719.         self.interfaces[server] = interface
  720.         self.queue_request('blockchain.headers.subscribe', [], interface)
  721.         if server == self.default_server:
  722.             self.switch_to_interface(server)
  723.         #self.notify('interfaces')
  724.  
  725.     def maintain_sockets(self):
  726.         '''Socket maintenance.'''
  727.         # Responses to connection attempts?
  728.         while not self.socket_queue.empty():
  729.             server, socket = self.socket_queue.get()
  730.             if server in self.connecting:
  731.                 self.connecting.remove(server)
  732.             if socket:
  733.                 self.new_interface(server, socket)
  734.             else:
  735.                 self.connection_down(server)
  736.  
  737.         # Send pings and shut down stale interfaces
  738.         # must use copy of values
  739.         for interface in list(self.interfaces.values()):
  740.             if interface.has_timed_out():
  741.                 self.connection_down(interface.server)
  742.             elif interface.ping_required():
  743.                 params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
  744.                 self.queue_request('server.version', params, interface)
  745.  
  746.         now = time.time()
  747.         # nodes
  748.         if len(self.interfaces) + len(self.connecting) < self.num_server:
  749.             self.start_random_interface()
  750.             if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
  751.                 self.print_error('network: retrying connections')
  752.                 self.disconnected_servers = set([])
  753.                 self.nodes_retry_time = now
  754.  
  755.         # main interface
  756.         if not self.is_connected():
  757.             if self.auto_connect:
  758.                 if not self.is_connecting():
  759.                     self.switch_to_random_interface()
  760.             else:
  761.                 if self.default_server in self.disconnected_servers:
  762.                     if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
  763.                         self.disconnected_servers.remove(self.default_server)
  764.                         self.server_retry_time = now
  765.                 else:
  766.                     self.switch_to_interface(self.default_server)
  767.         else:
  768.             if self.config.is_fee_estimates_update_required():
  769.                 self.request_fee_estimates()
  770.  
  771.     def request_chunk(self, interface, index):
  772.         if index in self.requested_chunks:
  773.             return
  774.         interface.print_error("requesting chunk %d" % index)
  775.         self.requested_chunks.add(index)
  776.         self.queue_request('blockchain.block.get_chunk', [index], interface)
  777.  
  778.     def on_get_chunk(self, interface, response):
  779.         '''Handle receiving a chunk of block headers'''
  780.         error = response.get('error')
  781.         result = response.get('result')
  782.         params = response.get('params')
  783.         blockchain = interface.blockchain
  784.         if result is None or params is None or error is not None:
  785.             interface.print_error(error or 'bad response')
  786.             return
  787.         index = params[0]
  788.         # Ignore unsolicited chunks
  789.         if index not in self.requested_chunks:
  790.             interface.print_error("received chunk %d (unsolicited)" % index)
  791.             return
  792.         else:
  793.             interface.print_error("received chunk %d" % index)
  794.         self.requested_chunks.remove(index)
  795.         connect = blockchain.connect_chunk(index, result)
  796.         if not connect:
  797.             self.connection_down(interface.server)
  798.             return
  799.         # If not finished, get the next chunk
  800.         if index >= len(blockchain.checkpoints) and blockchain.height() < interface.tip:
  801.             self.request_chunk(interface, index+1)
  802.         else:
  803.             interface.mode = 'default'
  804.             interface.print_error('catch up done', blockchain.height())
  805.             blockchain.catch_up = None
  806.         self.notify('updated')
  807.  
  808.     def request_header(self, interface, height):
  809.         #interface.print_error("requesting header %d" % height)
  810.         self.queue_request('blockchain.block.get_header', [height], interface)
  811.         interface.request = height
  812.         interface.req_time = time.time()
  813.  
  814.     def on_get_header(self, interface, response):
  815.         '''Handle receiving a single block header'''
  816.         header = response.get('result')
  817.         if not header:
  818.             interface.print_error(response)
  819.             self.connection_down(interface.server)
  820.             return
  821.         height = header.get('block_height')
  822.         if interface.request != height:
  823.             interface.print_error("unsolicited header",interface.request, height)
  824.             self.connection_down(interface.server)
  825.             return
  826.         chain = blockchain.check_header(header)
  827.         if interface.mode == 'backward':
  828.             can_connect = blockchain.can_connect(header)
  829.             if can_connect and can_connect.catch_up is None:
  830.                 interface.mode = 'catch_up'
  831.                 interface.blockchain = can_connect
  832.                 interface.blockchain.save_header(header)
  833.                 next_height = height + 1
  834.                 interface.blockchain.catch_up = interface.server
  835.             elif chain:
  836.                 interface.print_error("binary search")
  837.                 interface.mode = 'binary'
  838.                 interface.blockchain = chain
  839.                 interface.good = height
  840.                 next_height = (interface.bad + interface.good) // 2
  841.                 assert next_height >= self.max_checkpoint(), (interface.bad, interface.good)
  842.             else:
  843.                 if height == 0:
  844.                     self.connection_down(interface.server)
  845.                     next_height = None
  846.                 else:
  847.                     interface.bad = height
  848.                     interface.bad_header = header
  849.                     delta = interface.tip - height
  850.                     next_height = max(self.max_checkpoint(), interface.tip - 2 * delta)
  851.  
  852.         elif interface.mode == 'binary':
  853.             if chain:
  854.                 interface.good = height
  855.                 interface.blockchain = chain
  856.             else:
  857.                 interface.bad = height
  858.                 interface.bad_header = header
  859.             if interface.bad != interface.good + 1:
  860.                 next_height = (interface.bad + interface.good) // 2
  861.                 assert next_height >= self.max_checkpoint()
  862.             elif not interface.blockchain.can_connect(interface.bad_header, check_height=False):
  863.                 self.connection_down(interface.server)
  864.                 next_height = None
  865.             else:
  866.                 branch = self.blockchains.get(interface.bad)
  867.                 if branch is not None:
  868.                     if branch.check_header(interface.bad_header):
  869.                         interface.print_error('joining chain', interface.bad)
  870.                         next_height = None
  871.                     elif branch.parent().check_header(header):
  872.                         interface.print_error('reorg', interface.bad, interface.tip)
  873.                         interface.blockchain = branch.parent()
  874.                         next_height = None
  875.                     else:
  876.                         interface.print_error('checkpoint conflicts with existing fork', branch.path())
  877.                         branch.write('', 0)
  878.                         branch.save_header(interface.bad_header)
  879.                         interface.mode = 'catch_up'
  880.                         interface.blockchain = branch
  881.                         next_height = interface.bad + 1
  882.                         interface.blockchain.catch_up = interface.server
  883.                 else:
  884.                     bh = interface.blockchain.height()
  885.                     next_height = None
  886.                     if bh > interface.good:
  887.                         if not interface.blockchain.check_header(interface.bad_header):
  888.                             b = interface.blockchain.fork(interface.bad_header)
  889.                             self.blockchains[interface.bad] = b
  890.                             interface.blockchain = b
  891.                             interface.print_error("new chain", b.checkpoint)
  892.                             interface.mode = 'catch_up'
  893.                             next_height = interface.bad + 1
  894.                             interface.blockchain.catch_up = interface.server
  895.                     else:
  896.                         assert bh == interface.good
  897.                         if interface.blockchain.catch_up is None and bh < interface.tip:
  898.                             interface.print_error("catching up from %d"% (bh + 1))
  899.                             interface.mode = 'catch_up'
  900.                             next_height = bh + 1
  901.                             interface.blockchain.catch_up = interface.server
  902.  
  903.                 self.notify('updated')
  904.  
  905.         elif interface.mode == 'catch_up':
  906.             can_connect = interface.blockchain.can_connect(header)
  907.             if can_connect:
  908.                 interface.blockchain.save_header(header)
  909.                 next_height = height + 1 if height < interface.tip else None
  910.             else:
  911.                 # go back
  912.                 interface.print_error("cannot connect", height)
  913.                 interface.mode = 'backward'
  914.                 interface.bad = height
  915.                 interface.bad_header = header
  916.                 next_height = height - 1
  917.  
  918.             if next_height is None:
  919.                 # exit catch_up state
  920.                 interface.print_error('catch up done', interface.blockchain.height())
  921.                 interface.blockchain.catch_up = None
  922.                 self.switch_lagging_interface()
  923.                 self.notify('updated')
  924.  
  925.         else:
  926.             raise BaseException(interface.mode)
  927.         # If not finished, get the next header
  928.         if next_height:
  929.             if interface.mode == 'catch_up' and interface.tip > next_height + 50:
  930.                 self.request_chunk(interface, next_height // 2016)
  931.             else:
  932.                 self.request_header(interface, next_height)
  933.         else:
  934.             interface.mode = 'default'
  935.             interface.request = None
  936.             self.notify('updated')
  937.         # refresh network dialog
  938.         self.notify('interfaces')
  939.  
  940.     def maintain_requests(self):
  941.         for interface in list(self.interfaces.values()):
  942.             if interface.request and time.time() - interface.request_time > 20:
  943.                 interface.print_error("blockchain request timed out")
  944.                 self.connection_down(interface.server)
  945.                 continue
  946.  
  947.     def wait_on_sockets(self):
  948.         # Python docs say Windows doesn't like empty selects.
  949.         # Sleep to prevent busy looping
  950.         if not self.interfaces:
  951.             time.sleep(0.1)
  952.             return
  953.         rin = [i for i in self.interfaces.values()]
  954.         win = [i for i in self.interfaces.values() if i.num_requests()]
  955.         try:
  956.             print("Line 956", select)
  957.             rout, wout, xout = select.select(rin, win, [], 0.1)
  958.         except socket.error as e:
  959.             # TODO: py3, get code from e
  960.             code = None
  961.             if code == errno.EINTR:
  962.                 return
  963.             raise
  964.         assert not xout
  965.         for interface in wout:
  966.             interface.send_requests()
  967.         for interface in rout:
  968.             self.process_responses(interface)
  969.  
  970.     def init_headers_file(self):
  971.         b = self.blockchains[0]
  972.         filename = b.path()
  973.         length = 80 * len(constants.net.CHECKPOINTS) * 2016
  974.         if not os.path.exists(filename) or os.path.getsize(filename) < length:
  975.             with open(filename, 'wb') as f:
  976.                 if length>0:
  977.                     f.seek(length-1)
  978.                     f.write(b'\x00')
  979.         with b.lock:
  980.             b.update_size()
  981.  
  982.     def run(self):
  983.         self.init_headers_file()
  984.         while self.is_running():
  985.             self.maintain_sockets()
  986.             self.wait_on_sockets()
  987.             self.maintain_requests()
  988.             self.run_jobs()    # Synchronizer and Verifier
  989.             self.process_pending_sends()
  990.         self.stop_network()
  991.         self.on_stop()
  992.  
  993.     def on_notify_header(self, interface, header):
  994.         height = header.get('block_height')
  995.         if not height:
  996.             return
  997.         if height < self.max_checkpoint():
  998.             self.connection_down(interface.server)
  999.             return
  1000.         interface.tip_header = header
  1001.         interface.tip = height
  1002.         if interface.mode != 'default':
  1003.             return
  1004.         b = blockchain.check_header(header)
  1005.         if b:
  1006.             interface.blockchain = b
  1007.             self.switch_lagging_interface()
  1008.             self.notify('updated')
  1009.             self.notify('interfaces')
  1010.             return
  1011.         b = blockchain.can_connect(header)
  1012.         if b:
  1013.             interface.blockchain = b
  1014.             b.save_header(header)
  1015.             self.switch_lagging_interface()
  1016.             self.notify('updated')
  1017.             self.notify('interfaces')
  1018.             return
  1019.         tip = max([x.height() for x in self.blockchains.values()])
  1020.         if tip >=0:
  1021.             interface.mode = 'backward'
  1022.             interface.bad = height
  1023.             interface.bad_header = header
  1024.             self.request_header(interface, min(tip +1, height - 1))
  1025.         else:
  1026.             chain = self.blockchains[0]
  1027.             if chain.catch_up is None:
  1028.                 chain.catch_up = interface
  1029.                 interface.mode = 'catch_up'
  1030.                 interface.blockchain = chain
  1031.                 self.print_error("switching to catchup mode", tip,  self.blockchains)
  1032.                 self.request_header(interface, 0)
  1033.             else:
  1034.                 self.print_error("chain already catching up with", chain.catch_up.server)
  1035.  
  1036.     def blockchain(self):
  1037.         if self.interface and self.interface.blockchain is not None:
  1038.             self.blockchain_index = self.interface.blockchain.checkpoint
  1039.         return self.blockchains[self.blockchain_index]
  1040.  
  1041.     def get_blockchains(self):
  1042.         out = {}
  1043.         for k, b in self.blockchains.items():
  1044.             r = list(filter(lambda i: i.blockchain==b, list(self.interfaces.values())))
  1045.             if r:
  1046.                 out[k] = r
  1047.         return out
  1048.  
  1049.     def follow_chain(self, index):
  1050.         blockchain = self.blockchains.get(index)
  1051.         if blockchain:
  1052.             self.blockchain_index = index
  1053.             self.config.set_key('blockchain_index', index)
  1054.             for i in self.interfaces.values():
  1055.                 if i.blockchain == blockchain:
  1056.                     self.switch_to_interface(i.server)
  1057.                     break
  1058.         else:
  1059.             raise BaseException('blockchain not found', index)
  1060.  
  1061.         if self.interface:
  1062.             server = self.interface.server
  1063.             host, port, protocol, proxy, auto_connect = self.get_parameters()
  1064.             host, port, protocol = server.split(':')
  1065.             self.set_parameters(host, port, protocol, proxy, auto_connect)
  1066.  
  1067.     def get_local_height(self):
  1068.         return self.blockchain().height()
  1069.  
  1070.     def synchronous_get(self, request, timeout=30):
  1071.         q = queue.Queue()
  1072.         self.send([request], q.put)
  1073.         try:
  1074.             r = q.get(True, timeout)
  1075.         except queue.Empty:
  1076.             raise util.TimeoutException(_('Server did not answer'))
  1077.         if r.get('error'):
  1078.             raise BaseException(r.get('error'))
  1079.         return r.get('result')
  1080.  
  1081.     def broadcast(self, tx, timeout=30):
  1082.         tx_hash = tx.txid()
  1083.         try:
  1084.             out = self.synchronous_get(('blockchain.transaction.broadcast', [str(tx)]), timeout)
  1085.         except BaseException as e:
  1086.             return False, "error: " + str(e)
  1087.         if out != tx_hash:
  1088.             return False, "error: " + out
  1089.         return True, out
  1090.  
  1091.     def export_checkpoints(self, path):
  1092.         # run manually from the console to generate checkpoints
  1093.         cp = self.blockchain().get_checkpoints()
  1094.         with open(path, 'w', encoding='utf-8') as f:
  1095.             f.write(json.dumps(cp, indent=4))
  1096.  
  1097.     def max_checkpoint(self):
  1098.         return max(0, len(constants.net.CHECKPOINTS) * 2016 - 1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement