Advertisement
Guest User

Untitled

a guest
Jun 1st, 2017
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.18 KB | None | 0 0
  1. import time
  2. import binascii
  3. import struct
  4. import re
  5.  
  6. from twisted.internet import defer
  7.  
  8. from stratum.services import GenericService
  9. from stratum.pubsub import Pubsub, Subscription
  10. from stratum.custom_exceptions import ServiceException, RemoteServiceException
  11. from stratum.socket_transport import SocketTransportFactory, SocketTransportClientFactory
  12. from mining_libs import client_service
  13. from mining_libs.jobs import Job
  14.  
  15. import stratum.logger
  16. log = stratum.logger.get_logger('proxy')
  17.  
  18. def var_int(i):
  19.     if i <= 0xff:
  20.         return struct.pack('>B', i)
  21.     elif i <= 0xffff:
  22.         return struct.pack('>H', i)
  23.     raise Exception("number is too big")
  24.  
  25. class UpstreamServiceException(ServiceException):
  26.     code = -2
  27.  
  28. class SubmitException(ServiceException):
  29.     code = -2
  30.  
  31. class MiningSubscription(Subscription):
  32.     '''This subscription object implements
  33.    logic for broadcasting new jobs to the clients.'''
  34.    
  35.     event = 'job'
  36.     subscribers = {}
  37.    
  38.     @classmethod
  39.     def disconnect_all(cls):
  40.         StratumProxyService.registered_tails = []
  41.         for subs in cls.subscribers:
  42.             try:
  43.                 cls.subscribers[subs].connection_ref().transport.abortConnection()
  44.             except Exception:
  45.                 pass
  46.         #for subs in Pubsub.iterate_subscribers(cls.event):
  47.         #    if subs.connection_ref().transport != None:
  48.         #        subs.connection_ref().transport.loseConnection()
  49.  
  50.     @classmethod
  51.     def add_user_id(cls, subsc, user_id):
  52.         cls.subscribers[user_id] = subsc
  53.        
  54.     @classmethod
  55.     def on_template(cls, job_id, blob, target, user_id):
  56.         '''Push new job to subscribed clients'''
  57.         #cls.last_broadcast = (job_id, blob, target)
  58.         #if user_id:
  59.         #    cls.user_id = user_id
  60.         if cls.subscribers.has_key(user_id):
  61.             subscr = cls.subscribers[user_id]
  62.             subscr.emit_single({'job_id':job_id, 'blob':blob, 'target':target})
  63.        
  64.     def _finish_after_subscribe(self, result):
  65.         '''Send new job to newly subscribed client'''
  66.         #try:
  67.         #    (job_id, blob, target) = self.last_broadcast
  68.         #except Exception:
  69.         #    log.error("Template not ready yet")
  70.         #    return result
  71.        
  72.         #self.emit_single({'job_id':job_id, 'blob':blob, 'target':target})
  73.         return result
  74.              
  75.     def after_subscribe(self, *args):
  76.         '''This will send new job to the client *after* he receive subscription details.
  77.        on_finish callback solve the issue that job is broadcasted *during*
  78.        the subscription request and client receive messages in wrong order.'''
  79.         #self.add_user_id(self, user_id)
  80.         self.connection_ref().on_finish.addCallback(self._finish_after_subscribe)
  81.        
  82. class StratumProxyService(GenericService):
  83.     service_type = 'mining'
  84.     service_vendor = 'mining_proxy'
  85.     is_default = True
  86.    
  87.     _f = None # Factory of upstream Stratum connection
  88.     custom_user = None
  89.     custom_password = None
  90.     enable_worker_id = False
  91.     worker_id_from_ip = False
  92.     tail_iterator = 0
  93.     registered_tails = []
  94.    
  95.     @classmethod
  96.     def _set_upstream_factory(cls, f):
  97.         cls._f = f
  98.  
  99.     @classmethod
  100.     def _set_custom_user(cls, custom_user, custom_password, enable_worker_id, worker_id_from_ip):
  101.         cls.custom_user = custom_user
  102.         cls.custom_password = custom_password
  103.         cls.enable_worker_id = enable_worker_id
  104.         cls.worker_id_from_ip = worker_id_from_ip
  105.  
  106.     @classmethod
  107.     def _is_in_tail(cls, tail):
  108.         if tail in cls.registered_tails:
  109.             return True
  110.         return False
  111.  
  112.     @classmethod
  113.     def _get_unused_tail(cls):
  114.         '''Currently adds up to two bytes to extranonce1,
  115.        limiting proxy for up to 65535 connected clients.'''
  116.        
  117.         for _ in range(0, 0xffff):  # 0-65535
  118.             cls.tail_iterator += 1
  119.             cls.tail_iterator %= 0xffff
  120.  
  121.             # Zero extranonce is reserved for getwork connections
  122.             if cls.tail_iterator == 0:
  123.                 cls.tail_iterator += 1
  124.  
  125.             # var_int throws an exception when input is >= 0xffff
  126.             tail = var_int(cls.tail_iterator)
  127.  
  128.             if tail not in cls.registered_tails:
  129.                 cls.registered_tails.append(tail)
  130.                 return binascii.hexlify(tail)
  131.            
  132.         raise Exception("Extranonce slots are full, please disconnect some miners!")
  133.    
  134.     def _drop_tail(self, result, tail):
  135.         tail = binascii.unhexlify(tail)
  136.         if tail in self.registered_tails:
  137.             self.registered_tails.remove(tail)
  138.         else:
  139.             log.error("Given extranonce is not registered1")
  140.         return result
  141.    
  142.     @defer.inlineCallbacks
  143.     def login(self, params, *args):
  144.         if self._f.client == None or not self._f.client.connected:
  145.             yield self._f.on_connect
  146.  
  147.         if self._f.client == None or not self._f.client.connected:
  148.             raise UpstreamServiceException("Upstream not connected")
  149.  
  150.         tail = self._get_unused_tail()
  151.        
  152.         session = self.connection_ref().get_session()
  153.         session['tail'] = tail
  154.  
  155.         custom_password = self.custom_password
  156.         if self.enable_worker_id and params.has_key("password"):
  157.             if self.worker_id_from_ip:
  158.                 ip_login = self.connection_ref()._get_ip()
  159.                 ip_temp = ip_login.split('.')
  160.                 ip_int = int(ip_temp[0])*16777216 + int(ip_temp[1])*65536 + int(ip_temp[2])*256 + int(ip_temp[3])
  161.                 custom_password = "%s.%s" % (ip_int, custom_password)
  162.             else:
  163.                 params_password = re.sub(r'[^\d]', '', params["login"])
  164.                 if params_password and int(params_password)>0:
  165.                     custom_password = "%s:%s" % (params_password, custom_password)
  166.  
  167.         first_job = (yield self._f.rpc('login', {"login":custom_user, "pass":custom_password}))
  168.  
  169.         try:
  170.             self.connection_ref().on_disconnect.addCallback(self._drop_tail, tail)
  171.         except Exception:
  172.             pass
  173.         subs = Pubsub.subscribe(self.connection_ref(), MiningSubscription())[0]
  174.  
  175.         MiningSubscription.add_user_id(subs[2], first_job['id'])
  176.  
  177.         defer.returnValue(first_job)
  178.            
  179.     @defer.inlineCallbacks
  180.     def submit(self, params, *args):
  181.         if self._f.client == None or not self._f.client.connected:
  182.             self.connection_ref().transport.abortConnection()
  183.             raise SubmitException("Upstream not connected")
  184.  
  185.         session = self.connection_ref().get_session()
  186.         tail = session.get('tail')
  187.         if tail == None:
  188.             raise SubmitException("Connection is not subscribed")
  189.  
  190.         ip = self.connection_ref()._get_ip()
  191.         start = time.time()
  192.        
  193.         try:
  194.             result = (yield self._f.rpc('submit', params))
  195.         except RemoteServiceException as exc:
  196.             response_time = (time.time() - start) * 1000
  197.             log.info("[%dms] Share from '%s' REJECTED: %s" % (response_time, ip, str(exc)))
  198.             raise SubmitException(*exc.args)
  199.  
  200.         response_time = (time.time() - start) * 1000
  201.         log.info("[%dms] Share from '%s' accepted" % (response_time, ip))
  202.         defer.returnValue(result)
  203.  
  204.     @defer.inlineCallbacks
  205.     def get_job(self, params, *args):
  206.         if self._f.client == None or not self._f.client.connected:
  207.             raise SubmitException("Upstream not connected")
  208.  
  209.         session = self.connection_ref().get_session()
  210.         tail = session.get('tail')
  211.         if tail == None:
  212.             raise SubmitException("Connection is not subscribed")
  213.  
  214.         ip = self.connection_ref()._get_ip()
  215.         start = time.time()
  216.  
  217.         try:
  218.             result = (yield self._f.rpc('get_job', params))
  219.         except RemoteServiceException as exc:
  220.             response_time = (time.time() - start) * 1000
  221.             log.info("[%dms] GetJob to '%s' ERROR: %s" % (response_time, ip, str(exc)))
  222.             raise SubmitException(*exc.args)
  223.  
  224.         response_time = (time.time() - start) * 1000
  225.         log.info("[%dms] send GetJob to '%s'" % (response_time, ip))
  226.         defer.returnValue(result)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement