Advertisement
JudeAustin

P2PMining work.py

Sep 24th, 2013
330
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 18.52 KB | None | 0 0
  1. from __future__ import division
  2.  
  3. import base64
  4. import random
  5. import sys
  6. import time
  7.  
  8. from twisted.internet import defer
  9. from twisted.python import log
  10.  
  11. import bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
  12. from bitcoin import script, worker_interface
  13. from util import jsonrpc, variable, deferral, math, pack
  14. import p2pool, p2pool.data as p2pool_data
  15. import MySQLdb
  16.  
  17. class WorkerBridge(worker_interface.WorkerBridge):
  18.     def __init__(self, my_pubkey_hash, net, donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var):
  19.         worker_interface.WorkerBridge.__init__(self)
  20.         self.recent_shares_ts_work = []
  21.        
  22.         self.my_pubkey_hash = my_pubkey_hash
  23.         self.net = net
  24.         self.donation_percentage = donation_percentage
  25.         self.bitcoind_work = bitcoind_work
  26.         self.best_block_header = best_block_header
  27.         self.best_share_var = best_share_var
  28.         self.tracker = tracker
  29.         self.my_share_hashes = my_share_hashes
  30.         self.my_doa_share_hashes = my_doa_share_hashes
  31.         self.worker_fee = worker_fee
  32.         self.p2p_node = p2p_node
  33.         self.submit_block = submit_block
  34.         self.set_best_share = set_best_share
  35.         self.broadcast_share = broadcast_share
  36.         self.block_height_var = block_height_var
  37.        
  38.        
  39.         def _(share):
  40.             if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
  41.                 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
  42.                 self.removed_unstales_var.set((
  43.                     self.removed_unstales_var.value[0] + 1,
  44.                     self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
  45.                     self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
  46.                 ))
  47.             if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
  48.                 self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
  49.        
  50.         # MERGED WORK
  51.        
  52.         self.merged_work = variable.Variable({})
  53.        
  54.         @defer.inlineCallbacks
  55.         def set_merged_work(merged_url, merged_userpass):
  56.             merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
  57.             while True:
  58.                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 30)(merged_proxy.rpc_getauxblock)()
  59.                 self.merged_work.set(dict(self.merged_work.value, **{auxblock['chainid']: dict(
  60.                     hash=int(auxblock['hash'], 16),
  61.                     target='p2pool' if auxblock['target'] == 'p2pool' else pack.IntType(256).unpack(auxblock['target'].decode('hex')),
  62.                     merged_proxy=merged_proxy,
  63.                 )}))
  64.                 yield deferral.sleep(1)
  65.         for merged_url, merged_userpass in merged_urls:
  66.             set_merged_work(merged_url, merged_userpass)
  67.        
  68.         @self.merged_work.changed.watch
  69.         def _(new_merged_work):
  70.             print 'Got new merged mining work!'
  71.        
  72.         # COMBINE WORK
  73.        
  74.         self.current_work = variable.Variable(None)
  75.         def compute_work():
  76.             t = self.bitcoind_work.value
  77.             bb = self.best_block_header.value
  78.             if bb is not None and bb['previous_block'] == t['previous_block'] and net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target:
  79.                 print 'Skipping from block %x to block %x!' % (bb['previous_block'],
  80.                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)))
  81.                 t = dict(
  82.                     version=bb['version'],
  83.                     previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)),
  84.                     bits=bb['bits'], # not always true
  85.                     coinbaseflags='',
  86.                     height=t['height'] + 1,
  87.                     time=bb['timestamp'] + 600, # better way?
  88.                     transactions=[],
  89.                     merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
  90.                     subsidy=net.PARENT.SUBSIDY_FUNC(self.block_height_var.value),
  91.                     last_update=self.bitcoind_work.value['last_update'],
  92.                 )
  93.            
  94.             self.current_work.set(t)
  95.         self.bitcoind_work.changed.watch(lambda _: compute_work())
  96.         self.best_block_header.changed.watch(lambda _: compute_work())
  97.         compute_work()
  98.        
  99.         self.new_work_event = variable.Event()
  100.         @self.current_work.transitioned.watch
  101.         def _(before, after):
  102.             # trigger LP if version/previous_block/bits changed or transactions changed from nothing
  103.             if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']):
  104.                 self.new_work_event.happened()
  105.         self.merged_work.changed.watch(lambda _: self.new_work_event.happened())
  106.         self.best_share_var.changed.watch(lambda _: self.new_work_event.happened())
  107.    
  108.     def get_stale_counts(self):
  109.         '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
  110.         my_shares = len(self.my_share_hashes)
  111.         my_doa_shares = len(self.my_doa_share_hashes)
  112.         delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
  113.         my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
  114.         my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
  115.         orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
  116.         doas_recorded_in_chain = delta.my_dead_announce_count + self.removed_unstales_var.value[2]
  117.        
  118.         my_shares_not_in_chain = my_shares - my_shares_in_chain
  119.         my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
  120.        
  121.         return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
  122.    
  123.     def get_user_details(self, request):
  124.         user = request.getUser() if request.getUser() is not None else ''
  125.        
  126.         desired_pseudoshare_target = None
  127.         if '+' in user:
  128.             user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
  129.             try:
  130.                 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
  131.             except:
  132.                 pass
  133.        
  134.         desired_share_target = 2**256 - 1
  135.         if '/' in user:
  136.             user, min_diff_str = user.rsplit('/', 1)
  137.             try:
  138.                 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
  139.             except:
  140.                 pass
  141.        
  142.         if random.uniform(0, 100) < self.worker_fee:
  143.             pubkey_hash = self.my_pubkey_hash
  144.         else:
  145.             try:
  146.                 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
  147.             except: # XXX blah
  148.                 pubkey_hash = self.my_pubkey_hash
  149.        
  150.         return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
  151.    
  152.     def preprocess_request(self, request):
  153.         user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
  154.         return pubkey_hash, desired_share_target, desired_pseudoshare_target
  155.    
  156.     def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
  157.         if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
  158.             raise jsonrpc.Error_for_code(-12345)(u'p2pool is not connected to any peers')
  159.         if self.best_share_var.value is None and self.net.PERSIST:
  160.             raise jsonrpc.Error_for_code(-12345)(u'p2pool is downloading shares')
  161.         if time.time() > self.current_work.value['last_update'] + 60:
  162.             raise jsonrpc.Error_for_code(-12345)(u'lost contact with bitcoind')
  163.        
  164.         if self.merged_work.value:
  165.             tree, size = bitcoin_data.make_auxpow_tree(self.merged_work.value)
  166.             mm_hashes = [self.merged_work.value.get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
  167.             mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
  168.                 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
  169.                 size=size,
  170.                 nonce=0,
  171.             ))
  172.             mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.merged_work.value.iteritems()]
  173.         else:
  174.             mm_data = ''
  175.             mm_later = []
  176.        
  177.         tx_hashes = [bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in self.current_work.value['transactions']]
  178.         tx_map = dict(zip(tx_hashes, self.current_work.value['transactions']))
  179.        
  180.         if True:
  181.             share_info, gentx, other_transaction_hashes, get_share = p2pool_data.Share.generate_transaction(
  182.                 tracker=self.tracker,
  183.                 share_data=dict(
  184.                     previous_share_hash=self.best_share_var.value,
  185.                     coinbase=(script.create_push_script([
  186.                         self.current_work.value['height'],
  187.                         ] + ([mm_data] if mm_data else []) + [
  188.                     ]) + self.current_work.value['coinbaseflags'])[:100],
  189.                     nonce=random.randrange(2**32),
  190.                     pubkey_hash=pubkey_hash,
  191.                     subsidy=self.current_work.value['subsidy'],
  192.                     donation=math.perfect_round(65535*self.donation_percentage/100),
  193.                     stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
  194.                         'orphan' if orphans > orphans_recorded_in_chain else
  195.                         'doa' if doas > doas_recorded_in_chain else
  196.                         None
  197.                     )(*self.get_stale_counts()),
  198.                     desired_version=5,
  199.                 ),
  200.                 block_target=self.current_work.value['bits'].target,
  201.                 desired_timestamp=int(time.time() + 0.5),
  202.                 desired_target=desired_share_target,
  203.                 ref_merkle_link=dict(branch=[], index=0),
  204.                 desired_other_transaction_hashes=tx_hashes,
  205.                 net=self.net,
  206.             )
  207.        
  208.         transactions = [gentx] + [tx_map[tx_hash] for tx_hash in other_transaction_hashes]
  209.        
  210.         mm_later = [(dict(aux_work, target=aux_work['target'] if aux_work['target'] != 'p2pool' else share_info['bits'].target), index, hashes) for aux_work, index, hashes in mm_later]
  211.        
  212.         if desired_pseudoshare_target is None:
  213.             target = 2**256-1
  214.             if len(self.recent_shares_ts_work) == 50:
  215.                 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
  216.                 if hash_rate:
  217.                     target = min(target, int(2**256/hash_rate))
  218.         else:
  219.             target = desired_pseudoshare_target
  220.         target = max(target, share_info['bits'].target)
  221.         for aux_work, index, hashes in mm_later:
  222.             target = max(target, aux_work['target'])
  223.         target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
  224.        
  225.         getwork_time = time.time()
  226.         lp_count = self.new_work_event.times
  227.         merkle_link = bitcoin_data.calculate_merkle_link([bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in transactions], 0)
  228.        
  229.         print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
  230.             bitcoin_data.target_to_difficulty(target),
  231.             bitcoin_data.target_to_difficulty(share_info['bits'].target),
  232.             self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
  233.             len(self.current_work.value['transactions']),
  234.         )
  235.        
  236.         ba = bitcoin_getwork.BlockAttempt(
  237.             version=min(self.current_work.value['version'], 2),
  238.             previous_block=self.current_work.value['previous_block'],
  239.             merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(bitcoin_data.tx_type.pack(transactions[0])), merkle_link),
  240.             timestamp=self.current_work.value['time'],
  241.             bits=self.current_work.value['bits'],
  242.             share_target=target,
  243.         )
  244.        
  245.         received_header_hashes = set()
  246.        
  247.         def got_response(header, request):
  248.             header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
  249.             pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
  250.             try:
  251.                 if pow_hash <= header['bits'].target or p2pool.DEBUG:
  252.                     self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
  253.                     if pow_hash <= header['bits'].target:
  254.                         print
  255.                         print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
  256.                         print
  257.             except:
  258.                 log.err(None, 'Error while processing potential block:')
  259.            
  260.             user, _, _, _ = self.get_user_details(request)
  261.             assert header['previous_block'] == ba.previous_block
  262.             assert header['merkle_root'] == ba.merkle_root
  263.             assert header['bits'] == ba.bits
  264.            
  265.             on_time = self.new_work_event.times == lp_count
  266.            
  267.             for aux_work, index, hashes in mm_later:
  268.                 try:
  269.                     if pow_hash <= aux_work['target'] or p2pool.DEBUG:
  270.                         df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
  271.                             pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
  272.                             bitcoin_data.aux_pow_type.pack(dict(
  273.                                 merkle_tx=dict(
  274.                                     tx=transactions[0],
  275.                                     block_hash=header_hash,
  276.                                     merkle_link=merkle_link,
  277.                                 ),
  278.                                 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
  279.                                 parent_block_header=header,
  280.                             )).encode('hex'),
  281.                         )
  282.                         @df.addCallback
  283.                         def _(result, aux_work=aux_work):
  284.                             if result != (pow_hash <= aux_work['target']):
  285.                                 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
  286.                             else:
  287.                                 print 'Merged block submittal result: %s' % (result,)
  288.                         @df.addErrback
  289.                         def _(err):
  290.                             log.err(err, 'Error submitting merged block:')
  291.                 except:
  292.                     log.err(None, 'Error while processing merged mining POW:')
  293.            
  294.             if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
  295.                 share = get_share(header, transactions)
  296.                
  297.                 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
  298.                     request.getUser(),
  299.                     p2pool_data.format_hash(share.hash),
  300.                     p2pool_data.format_hash(share.previous_hash),
  301.                     time.time() - getwork_time,
  302.                     ' DEAD ON ARRIVAL' if not on_time else '',
  303.                 )
  304.                 self.my_share_hashes.add(share.hash)
  305.                 if not on_time:
  306.                     self.my_doa_share_hashes.add(share.hash)
  307.                
  308.                 self.tracker.add(share)
  309.                 if not p2pool.DEBUG:
  310.                     self.tracker.verified.add(share)
  311.                 self.set_best_share()
  312.                
  313.                 try:
  314.                     if pow_hash <= header['bits'].target or p2pool.DEBUG:
  315.                         self.broadcast_share(share.hash)
  316.                 except:
  317.                     log.err(None, 'Error forwarding block solution:')
  318.                
  319.                 self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
  320.            
  321.             if pow_hash > target:
  322.                 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
  323.                 print '    Hash:   %56x' % (pow_hash,)
  324.                 print '    Target: %56x' % (target,)
  325.             elif header_hash in received_header_hashes:
  326.                 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
  327.             else:
  328.                 received_header_hashes.add(header_hash)
  329.                 try:
  330.                     dbf_user = request.getUser()
  331.                     dbuser_items = dbf_user.split('+')
  332.                     db_diff = bitcoin_data.target_to_difficulty(target) * 1000000000
  333.                     proxy_db = MySQLdb.connect(host="localhost",user="jason",passwd="p2pool2012",db="p2pmining")
  334.                     pdb_c = proxy_db.cursor()
  335.                     pdb_c.execute("""INSERT INTO lminer_data (id,address,hashrate,timestamp,difficulty,ontime) VALUES (NULL, %s , %s , UNIX_TIMESTAMP() , %s, %s)""", (dbuser_items[0], db_diff * on_time, db_diff , on_time ) )
  336.                     proxy_db.close()
  337.                 except:
  338.                     log.err(None, 'Error with database:')
  339.                 ###
  340.                 self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
  341.                 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
  342.                 while len(self.recent_shares_ts_work) > 50:
  343.                     self.recent_shares_ts_work.pop(0)
  344.                 self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
  345.            
  346.             return on_time
  347.        
  348.         return ba, got_response
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement