from pyspark import SparkContext
from blockchain_parser.blockchain import Blockchain


# Unused helper kept from the original paste: splits a tab-separated line,
# strips colons from the second field, and returns the first and third fields.
def map_op(line):
    line = line.replace('\n', '').split('\t')
    line[1] = line[1].replace(':', '')
    return line[0], line[2]


def parse_data(index):
    # Read one raw blk*.dat file with blockchain_parser and emit one record per
    # transaction: (block_hash, (tx_hash, inputs, outputs)), where
    #   inputs  = [(spent_tx_hash, spent_output_index), ...]  (empty for coinbase)
    #   outputs = [(output_index, value_in_satoshis), ...]
    path = "/Users/hero/Library/Application Support/Bitcoin/blocks/blk%05d.dat" % index
    block_chain = Blockchain(path)

    results = []
    for block in block_chain.get_unordered_blocks():
        for tx_no, tx_item in enumerate(block.transactions):
            outputs = []
            inputs = []
            for no, tx_input in enumerate(tx_item.inputs):
                # Coinbase inputs reference no real previous output, so skip them.
                if not tx_item.is_coinbase():
                    inputs.append((tx_input.transaction_hash, tx_input.transaction_index))
            for no, tx_output in enumerate(tx_item.outputs):
                outputs.append((no, tx_output.value))

            results.append((block.hash, (tx_item.hash, inputs, outputs)))

    return results


def parse_line(line):
    # One CSV row "block_hash,height" -> (block_hash, height_as_int).
    fields = line.split(",")
    return fields[0], int(fields[1])

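# Example (hash taken from the genesis-block comment near the end of this paste):
#   parse_line("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f,0")
#   -> ("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f", 0)
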
sc = SparkContext(appName="MyFirstApp")

# start = time.time()
# (block_hash, height) pairs from the CSV, and per-transaction records from the
# raw block files; range(0, 1) parses only blk00000.dat, so widen the range to
# cover more files.
block_hash_height_rdd = sc.textFile("block_hash_height.csv").map(parse_line)
block_tx_input_output_rdd = sc.parallelize(range(0, 1)).flatMap(parse_data)

# print(block_hash_height_rdd.join(block_tx_input_output_rdd).take(1))

# Join on block hash, then re-key every transaction by block height:
# (height, tx_hash, inputs, outputs).
block_tx_input_output_rdd = block_hash_height_rdd.join(block_tx_input_output_rdd).map(
    lambda x: (x[1][0], x[1][1][0], x[1][1][1], x[1][1][2]))

# print(block_tx_input_output_rdd.take(1))

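# Shape of one element at this point (illustrative values only, not from a run):
#   (170,                      # block height
#    'f4184f...e9e16',         # tx hash
#    [('0437cd...597c9', 0)],  # inputs: (spent tx hash, spent output index)
#    [(0, 1000000000), (1, 4000000000)])  # outputs: (index, value in satoshis)
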
# Every output, keyed by (tx_hash, output_index) so the input that spends it
# can look up its value.
outputs_rdd = block_tx_input_output_rdd.flatMap(lambda x: [((x[1], item[0]), item[1]) for item in x[3]])

# Every input, keyed by the (tx_hash, output_index) it spends; the value is
# (height, tx_hash) of the spending transaction.
tx_rdd = block_tx_input_output_rdd.flatMap(lambda x: [((item[0], item[1]), (x[0], x[1])) for item in x[2]])

# Join each input with the output it spends and sum the spent values per block
# height: the total value flowing into each block's transactions.
block_in = tx_rdd.join(outputs_rdd).map(lambda x: (x[1][0][0], x[1][1])).reduceByKey(lambda x, y: x + y)

# Total output value per block height.
block_out = block_tx_input_output_rdd.flatMap(lambda x: [(x[0], item[1]) for item in x[3]]).reduceByKey(
    lambda x, y: x + y)

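# Note: tx_rdd.join(outputs_rdd) is an inner join, so inputs that spend outputs
# from blk*.dat files outside the parsed range find no match and are dropped,
# which understates block_in for those blocks.
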

def reward(height):
    # Block subsidy in satoshis: 50 BTC, halved every 210,000 blocks.
    # Integer division keeps the value exact (the original used /, which is
    # float division on Python 3).
    return 5000000000 // (2 ** (height // 210000))

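# Sanity check of the halving schedule (illustrative, not part of the job):
#   reward(0)      -> 5000000000   # 50 BTC
#   reward(209999) -> 5000000000
#   reward(210000) -> 2500000000   # 25 BTC after the first halving
#   reward(420000) -> 1250000000
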

# Fees collected per block: value in + block subsidy - value out.
block_in_out = block_in.join(block_out).map(lambda x: (x[0], x[1][0] + reward(x[0]) - x[1][1]))

# Sort by height and write everything as a single part file under "123123".
block_in_out.sortByKey().repartition(1).saveAsTextFile("123123")

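# Caveat: blocks whose only transaction is the coinbase never get a block_in
# entry, so the inner join above silently drops them from the output.
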
# ---- Earlier experiments from the paste, left commented out ----

# print(outputs_rdd.take(10))

# block_tx_input_rdd = block_tx_input_output_rdd.flatMap(
#     lambda x: ",".join([(x[0], x[1], item[0], str(item[1])) for item in x[2]]))
# block_tx_output_rdd = block_tx_input_output_rdd.flatMap(
#     lambda x: ",".join([(x[0], x[1], item[0], str(item[1])) for item in x[3]]))
#
# print(block_tx_input_rdd.saveAsTextFile("123"))
# print(block_tx_output_rdd.saveAsTextFile("1234"))

# block_hash_height_rdd = sc.textFile("block_hash_height.csv").map(
#     parse_line)  # (000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f,0)
#
# # print(rdd2.sample(False, 0.1, 1).collect())
#
# rdd3 = rdd2.join(rdd1)
# rdd4 = rdd3.map(lambda x: x[1])
# rdd5 = rdd4.sortByKey()
# rdd6 = rdd5.map(lambda x: "%d,%s,%d,%d" % (x[0], x[1][0], x[1][1], x[1][2]))
#
# rdd6.repartition(1).saveAsTextFile("out")

# print(rdd.count())
# # rdd2 = rdd.distinct()
# print(rdd2.collect())
# # rdd.saveAsTextFile("distinct")
# print(rdd2.count())
# print(time.time() - start)