Sample input (LibSVM format: a class label followed by index:value feature pairs):

1 4:22 6:22 7:44 8:12312
1 4:44 7:44
0 1:33 9:0.44
-1 1:55 4:0 8:12132

Expected output: each feature index paired with the number of lines it appears in, sorted by decreasing count (index 4 appears in three of the four lines; indexes 7, 8 and 1 in two; 6 and 9 in one):

[(4, 3), (7, 2), (8, 2), (1, 2), (6, 1), (9, 1)]

The script below computes this by splitting the input file into chunks and counting feature occurrences in parallel with multiprocessing:
import argparse
import multiprocessing as mp
import os
from operator import itemgetter
from collections import Counter
import functools
import json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", action='store_true', default=False)
    parser.add_argument("--no-stdout", action='store_true', default=False)
    parser.add_argument("--cores", type=int, default=None)

    return parser.parse_args()
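# Example (hypothetical values): "--input data.libsvm --cores 4" parses to
# Namespace(cores=4, input='data.libsvm', no_stdout=False, output=False);
# note that argparse exposes "--no-stdout" as args.no_stdout.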


def parse_libsvm_line(line: str) -> list:
    """
    Parses a line in a LibSVM file to return the indexes of non-zero features.
    :param line: A line in LibSVM format: "1 5:22 7:44 99:0.88"
    :return: A list of ints, one for each index appearing in the line
    """
    features = line.split()[1:]  # Get rid of the class value
    indexes = [int(pair.split(":")[0]) for pair in features]
    return indexes
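# Example: for the docstring's sample line,
#   parse_libsvm_line("1 5:22 7:44 99:0.88")  ->  [5, 7, 99]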


def process_wrapper(arg_tuple):
    """
    Applies the process function to every line in a chunk of a file, to determine the frequency
    of features in the chunk.
    :param arg_tuple: A tuple that contains: line_process_fun, filename, chunk_start, chunk_size
    :return: A Counter object that counts the frequency of each feature in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    counter = Counter()
    with open(filename) as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).splitlines()
        for line in lines:
            indexes = line_process_fun(line)
            for index in indexes:
                counter[index] += 1

    return counter
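# Example: for a chunk containing the four sample lines at the top of this paste,
# process_wrapper returns a Counter mapping 4 -> 3, 7 -> 2, 8 -> 2, 1 -> 2, 6 -> 1, 9 -> 1.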


def chunkify(fname, size=1024*1024):
    """
    Creates a generator that indicates how to chunk a file into parts.
    :param fname: The name of the file to be chunked
    :param size: The approximate size of each chunk, in bytes
    :return: A generator of (chunk_start, chunk_size) tuples for the file
    """
    file_end = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunk_end = f.tell()
        while True:
            chunk_start = chunk_end
            # Jump roughly `size` bytes ahead (text-mode files only allow absolute seeks),
            # then read to the end of the current line so that no chunk splits a line.
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start
            if chunk_end >= file_end:
                break
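# Illustration (hypothetical file size): with a ~2.5 MB input and the default 1 MiB
# chunk size, chunkify yields three (chunk_start, chunk_size) tuples that together
# cover the whole file, each boundary pushed forward to the next newline so that no
# line is split between two workers.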


if __name__ == '__main__':
    args = parse_args()
    pool = mp.Pool(args.cores)
    jobs = []

    # Create one job argument tuple for each chunk of the file
    for chunk_start, chunk_size in chunkify(args.input):
        jobs.append((parse_libsvm_line, args.input, chunk_start, chunk_size))

    # Process chunks in parallel. The result is a list of Counter objects
    res_list = pool.map(process_wrapper, jobs)

    # Aggregate the chunk counters and sort by decreasing count
    aggregated_count = sorted(functools.reduce(lambda a, b: a + b, res_list).items(),
                              key=itemgetter(1), reverse=True)

    # Print the result
    if not args.no_stdout:
        print(aggregated_count)

    # Write the result to a file as json (sorted list of [index, count] lists)
    if args.output:
        with open(args.input + "_frequencies.json", 'w') as out:
            json.dump(aggregated_count, out)

    # Close the pool workers
    pool.close()
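A minimal way to run the script, assuming it is saved as count_features.py (the filename is an assumption, not part of the paste) and that the sample lines above are stored in data.libsvm:

    python count_features.py --input data.libsvm --cores 4 --output

This prints the sorted (index, count) list to stdout unless --no-stdout is passed, and because --output is set it also writes the same list to data.libsvm_frequencies.json.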