Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- __author__ = 'LSL'
- import csv
- import faker
- import json
- import multiprocessing as mp
- import sys
- import time
- import Queue
# Column kinds recorded in the generated metadata
_ATTR = 'ATTRIBUTE'
_FACT = 'FACT'

# Analysis metadata template. The "${...}" values are emitted verbatim into the
# ".meta" file; presumably they are placeholders filled in by whatever consumes
# that file -- confirm against the consumer.
_METADATA = dict(
    load=dict(
        stagingUrl="${stagingUrl}",
        fileName="${csvUploadUnique}",
        dataHeader=dict(
            headerRowIndex=0,
            columns=[]
        )))

# Maximum number of processes that should be generating random data.
# cpu_count() - 1 evaluates to 0 on a single-core host, which would start no
# generator at all, so the value is clamped to at least one.
_MAX_PROCESSES = max(1, mp.cpu_count() - 1)

# Queue to transfer generated data
_Q = mp.Queue()

# Event signalling that generators should end generating data and bail out
_ENDA = mp.Event()
def csv_file(csv_def_proto, rows):
    """
    Builds the name of the CSV file produced from a definition.

    The name is placed under 'generated/' and encodes the definition name, the number of
    columns in the definition and the requested number of rows.

    :param csv_def_proto: CSV file definition (its 'name' and 'cols' entries are read)
    :param rows: number of rows
    :return: string - file name
    """
    base_name = csv_def_proto['name']
    column_count = len(csv_def_proto['cols'])
    name_parts = (base_name, column_count, rows)
    return 'generated/%s_c%dr%d.csv' % name_parts
def fake_with_args(fn, d):
    """
    Binds keyword arguments to 'fn' and returns the result as a zero-argument callable.

    A copy of 'd' is taken when this function is called, so later changes to the caller's
    dict do not affect the returned callable.

    :param fn: function to call
    :param d: dict with parameters, passed to 'fn' as keyword arguments
    :return: function
    """
    bound_kwargs = d.copy()
    return lambda: fn(**bound_kwargs)
def _status(generated, rows):
    """
    Updates the single-line progress indicator on standard output.

    The line starts with a carriage return so that each call overwrites the previous one.

    :param generated: number of rows generated so far
    :param rows: total rows to generate
    :return: None
    """
    sys.stdout.write('\rGenerated %10d rows out of %10d' % (generated, rows))
    # The message carries no newline, so a buffered stdout would hold it back;
    # flush explicitly to make the progress visible as it changes.
    sys.stdout.flush()
def _generate_metadata(csv_def):
    """
    Generates metadata for the CSV file with the provided definition.

    The module-level _METADATA template is deep-copied before the column list is filled in.
    A shallow copy would share the nested 'load'/'dataHeader' dicts with the template, so
    every call would overwrite the template's own 'columns' entry.

    :param csv_def: CSV definition; every entry of its 'cols' list is unpacked as a
                    (name, generator, type) tuple
    :return: dict with metadata
    """
    # Local import: the module itself does not import copy.
    import copy

    meta = copy.deepcopy(_METADATA)
    meta['load']['dataHeader']['columns'] = [
        dict(column=dict(name=name, type=kind)) for (name, _, kind) in csv_def['cols']
    ]
    return meta
def _write_results(processes_to_finish, rows, writer):
    """
    Drains the shared queue into the CSV writer until every generator has reported completion.

    Each dequeued row is written with writer.writerow; each dequeued None counts as one finished
    generator. The status line is refreshed after every dequeued item. When the queue is
    empty the loop sleeps briefly and polls again. A KeyboardInterrupt stops the loop and sets
    the shared event that tells generators to bail out.

    :param processes_to_finish: number of processes generating the data
    :param rows: rows that will be generated (used for the status line only)
    :param writer: CSV writer
    :return: None
    """
    written = 0
    remaining = processes_to_finish
    try:
        while remaining > 0:
            try:
                item = _Q.get_nowait()
            except Queue.Empty:
                # Nothing available yet; back off for a moment before polling again
                time.sleep(0.0001)
                continue
            if item is None:
                # A generator signalled that it is done
                remaining -= 1
            else:
                written += 1
                writer.writerow(item)
            _status(written, rows)
    except KeyboardInterrupt:
        _ENDA.set()
def _generate_part(csv_def_factory, rows, seed):
    """
    Generate part of random data for the CSV file. Write generated data to shared queue.

    Every generated row is a dict keyed by column name. After each row the shared end event is
    checked and generation stops early when it is set. A final None is always put on the queue,
    even when the factory or a column generator raises: _write_results counts these None
    markers to know when all generators are done, and would otherwise wait forever.

    :param csv_def_factory: CSV factory to obtain CSV definition from
    :param rows: rows to generate
    :param seed: random number seed to use
    :return: None
    """
    try:
        csv_def = csv_def_factory()
        csv_def['fake'].seed(seed)
        columns = csv_def['cols']
        print('Starting generator. %d rows X %d columns' % (rows, len(columns)))
        for _ in range(0, rows):
            row = dict()
            for (field, generator, _kind) in columns:
                row[field] = generator()
            _Q.put(row)
            if _ENDA.is_set():
                break
        print('Generator finished.')
    finally:
        # Completion marker; must be sent on every exit path (see docstring)
        _Q.put(None)
def _start_generator_processes(csv_def_factory, rows):
    """
    Starts daemon processes that will generate CSV data.

    The rows are split evenly between the processes; the last process additionally takes the
    remainder. Each process is seeded with its index. At least one process is always started,
    even if _MAX_PROCESSES is smaller than one (e.g. cpu_count() - 1 on a single-core host),
    because with zero processes nothing would ever be generated.

    :param csv_def_factory: factory function to obtain CSV definition from
    :param rows: number of rows the generated CSV should have
    :return: number of processes that were started and are generating data
    """
    workers = max(1, _MAX_PROCESSES)
    # divmod keeps the split integral regardless of how '/' behaves for ints
    share, remainder = divmod(rows, workers)
    for i in range(0, workers):
        # Last process generating data deals with the remainder
        generate = share + remainder if i == workers - 1 else share
        p = mp.Process(target=_generate_part,
                       kwargs=dict(csv_def_factory=csv_def_factory, rows=generate, seed=i))
        p.daemon = True
        p.start()
    return workers
def _generate(csv_def_factory, rows):
    """
    Generates CSV file with provided number of rows, plus a '.meta' JSON file next to it.

    The format and data of the CSV file is specified in the CSV definition which can be obtained
    via the provided csv_def_factory function. The definition is obtained from a factory function
    so that every generator process can build its own copy; an instance of the definition is
    bound to an instance of faker.

    The header row is written through the CSV writer itself so that it uses the same quoting
    and line terminator as the data rows.

    :param csv_def_factory: factory function to obtain CSV definition
    :param rows: number of rows to generate
    :return: None
    """
    csv_def_proto = csv_def_factory()
    field_names = [item[0] for item in csv_def_proto['cols']]
    fn = csv_file(csv_def_proto, rows)
    with open(fn + '.meta', 'wt') as fw:
        json.dump(_generate_metadata(csv_def_proto), fw, indent=4)
    with open(fn, 'wt') as fw:
        writer = csv.DictWriter(fw, field_names)
        writer.writeheader()
        # Start generator processes, each process will obtain its own instance of csv def
        # from the factory
        processes_to_finish = _start_generator_processes(csv_def_factory, rows)
        # Write results from queue and into the CSV writer
        _write_results(processes_to_finish, rows, writer)
def narrow():
    """
    Builds the definition of the 'narrow' CSV file (five columns).

    :return: dict holding the faker instance ('fake'), the definition name ('name') and the
             column list ('cols'); each column is a (name, generator, type) tuple
    """
    fake = faker.Factory.create()
    columns = [
        ('name', fake.name, _ATTR),
        ('country', fake.country, _ATTR),
        ('credit_card', fake_with_args(fake.credit_card_number, {'card_type': 'visa'}), _ATTR),
        ('sec', fake_with_args(fake.credit_card_security_code, {'card_type': 'visa'}), _ATTR),
        ('stolen', fake_with_args(fake.random_number, {'digits': 5}), _FACT),
    ]
    return {'fake': fake, 'name': 'narrow', 'cols': columns}
def wide():
    """
    Builds the definition of the 'wide' CSV file (nine columns).

    :return: dict holding the faker instance ('fake'), the definition name ('name') and the
             column list ('cols'); each column is a (name, generator, type) tuple
    """
    fake = faker.Factory.create()
    columns = [
        ('name', fake.name, _ATTR),
        ('email', fake.email, _ATTR),
        ('company', fake.company, _ATTR),
        ('country', fake.country, _ATTR),
        ('when', fake.date_time, _ATTR),
        ('referer', fake.domain_name, _ATTR),
        ('secret_code', fake.color_name, _ATTR),
        ('ship_to', fake.address, _ATTR),
        ('amount', fake.random_number, _FACT),
    ]
    return {'fake': fake, 'name': 'wide', 'cols': columns}
def extra_wide():
    """
    Builds the definition of the 'extra_wide' CSV file (thirteen columns).

    :return: dict holding the faker instance ('fake'), the definition name ('name') and the
             column list ('cols'); each column is a (name, generator, type) tuple
    """
    fake = faker.Factory.create()
    columns = [
        ('name', fake.name, _ATTR),
        ('email', fake.email, _ATTR),
        ('company', fake.company, _ATTR),
        ('country', fake.country, _ATTR),
        ('when', fake.date_time, _ATTR),
        ('referer', fake.domain_name, _ATTR),
        ('secret_code', fake.color_name, _ATTR),
        ('ship_to', fake.address, _ATTR),
        ('city', fake.city, _ATTR),
        ('firefox', fake.firefox, _ATTR),
        ('ip', fake.ipv4, _ATTR),
        ('ssn', fake.ssn, _ATTR),
        ('amount', fake.random_number, _FACT),
    ]
    return {'fake': fake, 'name': 'extra_wide', 'cols': columns}
if __name__ == "__main__":
    # Script entry point: generate the 'narrow' CSV file with 10000 rows
    _generate(csv_def_factory=narrow, rows=10000)
Advertisement
Add Comment
Please, Sign In to add comment