from pyspark import SparkContext
from blockchain_parser.blockchain import Blockchain


# Unused helper for a tab-separated input format; nothing below calls it.
def map_op(line):
    line = line.replace('\n', '').split('\t')
    line[1] = line[1].replace(':', '')
    return line[0], line[2]
def parse_data(index):
    """Parse one raw block file into (block_hash, (tx_hash, inputs, outputs)) records."""
    path = "/Users/hero/Library/Application Support/Bitcoin/blocks/blk%05d.dat" % index
    block_chain = Blockchain(path)
    results = []
    for block in block_chain.get_unordered_blocks():
        for tx_item in block.transactions:
            inputs = []
            outputs = []
            # Coinbase transactions have no previous outputs to resolve.
            if not tx_item.is_coinbase():
                for tx_input in tx_item.inputs:
                    inputs.append((tx_input.transaction_hash, tx_input.transaction_index))
            for no, tx_output in enumerate(tx_item.outputs):
                outputs.append((no, tx_output.value))
            results.append((block.hash, (tx_item.hash, inputs, outputs)))
    return results
def parse_line(line):
    """Split one "block_hash,height" CSV line into a (block_hash, height) pair."""
    fields = line.split(",")
    return fields[0], int(fields[1])
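# Example line: 000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f,0
# (the genesis block at height 0).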
sc = SparkContext(appName="MyFirstApp")

# (block_hash, height) pairs from the pre-built index file.
block_hash_height_rdd = sc.textFile("block_hash_height.csv").map(parse_line)

# Parse only blk00000.dat here; widen the range to cover more block files.
block_tx_input_output_rdd = sc.parallelize(range(0, 1)).flatMap(parse_data)

# Join on block hash, then re-key each transaction record by block height:
# (height, tx_hash, inputs, outputs).
block_tx_input_output_rdd = block_hash_height_rdd.join(block_tx_input_output_rdd).map(
    lambda x: (x[1][0], x[1][1][0], x[1][1][1], x[1][1][2]))
# Every output, keyed by (tx_hash, output_index) so spending inputs can find it.
outputs_rdd = block_tx_input_output_rdd.flatMap(
    lambda x: [((x[1], item[0]), item[1]) for item in x[3]])

# Every input, keyed by the (prev_tx_hash, prev_output_index) it spends,
# carrying the (height, tx_hash) of the spending transaction.
tx_rdd = block_tx_input_output_rdd.flatMap(
    lambda x: [((item[0], item[1]), (x[0], x[1])) for item in x[2]])

# Resolve each input to the value of the output it spends, then sum input
# value per block height.
block_in = tx_rdd.join(outputs_rdd).map(
    lambda x: (x[1][0][0], x[1][1])).reduceByKey(lambda x, y: x + y)

# Sum output value per block height.
block_out = block_tx_input_output_rdd.flatMap(
    lambda x: [(x[0], item[1]) for item in x[3]]).reduceByKey(lambda x, y: x + y)
def reward(height):
    """Block subsidy in satoshi: 50 BTC at height 0, halving every 210,000 blocks."""
    return 5000000000 // (2 ** (height // 210000))
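# For example: reward(0) == 5000000000 (50 BTC), reward(210000) == 2500000000
# (25 BTC), reward(420000) == 1250000000 (12.5 BTC).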
# Per-height balance check: total inputs + block subsidy - total outputs.
# This comes out to zero for a block whose every input is resolved above,
# since the coinbase output claims the subsidy plus the fees.
block_in_out = block_in.join(block_out).map(
    lambda x: (x[0], x[1][0] + reward(x[0]) - x[1][1]))
block_in_out.sortByKey().repartition(1).saveAsTextFile("123123")
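# A minimal way to run this, assuming a local Spark installation and that
# block_hash_height.csv sits in the working directory (the script filename
# below is hypothetical):
#
#   spark-submit fee_balance.py
#
# With repartition(1), the output directory "123123" then holds a single
# part file of "(height, difference)" lines sorted by height.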