Guest User

Untitled

a guest
Nov 27th, 2015
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.40 KB | None | 0 0
  1. __author__ = 'LSL'
  2.  
  3. import csv
  4. import faker
  5. import json
  6. import multiprocessing as mp
  7. import sys
  8. import time
  9. import Queue
  10.  
  11. _ATTR = 'ATTRIBUTE'
  12. _FACT = 'FACT'
  13.  
  14. # Analysis metadata template
  15. _METADATA = dict(
  16. load=dict(
  17. stagingUrl="${stagingUrl}",
  18. fileName="${csvUploadUnique}",
  19. dataHeader=dict(
  20. headerRowIndex=0,
  21. columns=[]
  22. )))
  23.  
  24. # Maximum number of processes that should be generating random data
  25. _MAX_PROCESSES = mp.cpu_count() - 1
  26.  
  27. # Queue to transfer generated data
  28. _Q = mp.Queue()
  29.  
  30. # Event signalling that generators should end generating data and bail out
  31. _ENDA = mp.Event()
  32.  
  33.  
  34. def csv_file(csv_def_proto, rows):
  35. """
  36. Generates CSV file name for CSV file that will be generated from the provided definition and will contain
  37. provided number of rows.
  38.  
  39. :param csv_def_proto: CSV file definition
  40. :param rows: number of rows
  41. :return: string - file name
  42. """
  43. return 'generated/%s_c%dr%d.csv' % (csv_def_proto['name'], len(csv_def_proto['cols']), rows)
  44.  
  45.  
  46. def fake_with_args(fn, d):
  47. """
  48. Returns argument-less function which when called will execute the provided 'fn' with the provided 'd'.
  49.  
  50. :param fn: function to call
  51. :param d: dict with parameters
  52. :return: function
  53. """
  54. params = d.copy()
  55.  
  56. def fun():
  57. return fn(**params)
  58.  
  59. return fun
  60.  
  61.  
  62. def _status(generated, rows):
  63. """
  64. Updates status
  65. :param generated: number of rows generated so far
  66. :param rows: total rows to generate
  67. :return: None
  68. """
  69. sys.stdout.write('\rGenerated %10d rows out of %10d' % (generated, rows))
  70.  
  71.  
  72. def _generate_metadata(csv_def):
  73. """
  74. Generates metadata for the CSV file with the provided definition
  75.  
  76. :param csv_def: CSV definition
  77. :return: dict with metadata
  78. """
  79. meta = _METADATA.copy()
  80. meta['load']['dataHeader']['columns'] = [dict(column=dict(name=n, type=t)) for (n, _, t) in csv_def['cols']]
  81.  
  82. return meta
  83.  
  84.  
  85. def _write_results(processes_to_finish, rows, writer):
  86. """
  87. Gathers results that generators put into shared queue and writes them to file.
  88.  
  89. :param processes_to_finish: number of processes generating the data
  90. :param rows: rows that will be generated
  91. :param writer: CSV writer
  92. :return: None
  93. """
  94. generated = 0
  95.  
  96. try:
  97. while processes_to_finish > 0:
  98. try:
  99. row = _Q.get_nowait()
  100.  
  101. if row is not None:
  102. generated += 1
  103. writer.writerow(row)
  104. else:
  105. processes_to_finish -= 1
  106.  
  107. _status(generated, rows)
  108. except Queue.Empty:
  109. time.sleep(0.0001)
  110. except KeyboardInterrupt:
  111. _ENDA.set()
  112.  
  113.  
  114. def _generate_part(csv_def_factory, rows, seed):
  115. """
  116. Generate part of random data for the CSV file. Write generated data to shared queue.
  117.  
  118. :param csv_def_factory: CSV factory to obtain CSV definition from
  119. :param rows: rows to generate
  120. :param seed: random number seed to use
  121. :return: None
  122. """
  123. csv_def = csv_def_factory()
  124. csv_def['fake'].seed(seed)
  125.  
  126. print ('Starting generator. %d rows X %d columns' % (rows, len(csv_def['cols'])))
  127.  
  128. for _ in range(0, rows):
  129. row = dict()
  130.  
  131. for (field, generator, _) in csv_def['cols']:
  132. row[field] = generator()
  133. _Q.put(row)
  134.  
  135. if _ENDA.is_set():
  136. break
  137.  
  138. print ('Generator finished.')
  139. _Q.put(None)
  140.  
  141.  
  142. def _start_generator_processes(csv_def_factory, rows):
  143. """
  144. Starts processes that will generate CSV.
  145.  
  146. :param csv_def_factory: factory function to obtain CSV definition from
  147. :param rows: number of rows the generated CSV should have
  148. :return: number of processes that were started and are generating data
  149. """
  150. for i in range(0, _MAX_PROCESSES):
  151. generate = rows / _MAX_PROCESSES
  152.  
  153. # Last thread generating data deals with the remainder
  154. if i == _MAX_PROCESSES - 1:
  155. generate += rows % _MAX_PROCESSES
  156.  
  157. p = mp.Process(target=_generate_part, kwargs=dict(csv_def_factory=csv_def_factory,
  158. rows=generate, seed=i))
  159. p.daemon = True
  160. p.start()
  161.  
  162. return _MAX_PROCESSES
  163.  
  164.  
  165. def _generate(csv_def_factory, rows):
  166. """
  167. Generates CSV file with provided number of rows. The format and data of the CSV file is specified
  168. in the CSV definition which can be obtained via the provided csv_def_factory function.
  169.  
  170. The CSV definition is obtained from factory function to ensure that everyone who gets the definition
  171. has their own unique copy of the definition. This is necessary in multi-threaded environments as an instance
  172. of definition is bound with an instance of faker.
  173.  
  174. :param csv_def_factory: factory function to obtain CSV definition
  175. :param rows: number of rows to generate
  176. :return: None
  177. """
  178. csv_def_proto = csv_def_factory()
  179. csv_cols = csv_def_proto['cols']
  180.  
  181. fn = csv_file(csv_def_proto, rows)
  182.  
  183. with open(fn + '.meta', 'wt') as fw:
  184. json.dump(_generate_metadata(csv_def_proto), fw, indent=4)
  185.  
  186. with open(fn, 'wt') as fw:
  187. fw.write(','.join([item[0] for item in csv_cols]))
  188. fw.write('\n')
  189. writer = csv.DictWriter(fw, [item[0] for item in csv_cols])
  190.  
  191. # Start generator threads, each thread will have its own instance of csv def which it will
  192. # obtain from factory (this is to prevent locking)
  193. threads_to_finish = _start_generator_processes(csv_def_factory, rows)
  194.  
  195. # Write results from queue and into the CSV writer
  196. _write_results(threads_to_finish, rows, writer)
  197.  
  198.  
  199. def narrow():
  200. """
  201. Returns definition of a narrow CSV file.
  202.  
  203. :return: dict
  204. """
  205. fake = faker.Factory.create()
  206.  
  207. return dict(fake=fake, name='narrow', cols=[
  208. ('name', fake.name, _ATTR),
  209. ('country', fake.country, _ATTR),
  210. ('credit_card', fake_with_args(fake.credit_card_number, dict(card_type='visa')), _ATTR),
  211. ('sec', fake_with_args(fake.credit_card_security_code, dict(card_type='visa')), _ATTR),
  212. ('stolen', fake_with_args(fake.random_number, dict(digits=5)), _FACT)
  213. ])
  214.  
  215.  
  216. def wide():
  217. """
  218. Returns definition of a wide CSV file.
  219.  
  220. :return: dict
  221. """
  222. fake = faker.Factory.create()
  223.  
  224. return dict(fake=fake, name='wide', cols=[
  225. ('name', fake.name, _ATTR),
  226. ('email', fake.email, _ATTR),
  227. ('company', fake.company, _ATTR),
  228. ('country', fake.country, _ATTR),
  229. ('when', fake.date_time, _ATTR),
  230. ('referer', fake.domain_name, _ATTR),
  231. ('secret_code', fake.color_name, _ATTR),
  232. ('ship_to', fake.address, _ATTR),
  233. ('amount', fake.random_number, _FACT),
  234. ])
  235.  
  236.  
  237. def extra_wide():
  238. """
  239. Returns definition of an extra wide CSV file.
  240.  
  241. :return: dict
  242. """
  243. fake = faker.Factory.create()
  244.  
  245. return dict(fake=fake, name='extra_wide', cols=[
  246. ('name', fake.name, _ATTR),
  247. ('email', fake.email, _ATTR),
  248. ('company', fake.company, _ATTR),
  249. ('country', fake.country, _ATTR),
  250. ('when', fake.date_time, _ATTR),
  251. ('referer', fake.domain_name, _ATTR),
  252. ('secret_code', fake.color_name, _ATTR),
  253. ('ship_to', fake.address, _ATTR),
  254. ('city', fake.city, _ATTR),
  255. ('firefox', fake.firefox, _ATTR),
  256. ('ip', fake.ipv4, _ATTR),
  257. ('ssn', fake.ssn, _ATTR),
  258. ('amount', fake.random_number, _FACT),
  259. ])
  260.  
  261.  
  262. if __name__ == "__main__":
  263. _generate(narrow, 10000)
Advertisement
Add Comment
Please, Sign In to add comment