Advertisement
Guest User

Untitled

a guest
Jan 13th, 2017
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.07 KB | None | 0 0
  1. import copy
  2. import datetime
  3. import tempfile
  4.  
  5. from perfkitbenchmarker import configs
  6. from perfkitbenchmarker import dpb_service
  7. from perfkitbenchmarker import errors
  8. from perfkitbenchmarker import flags
  9. from perfkitbenchmarker import sample
  10.  
  11. BENCHMARK_NAME = 'dpb_beam_cassandra_io_benchmark'
  12.  
  13. BENCHMARK_CONFIG = """
  14. dpb_beam_cassandra_io_benchmark:
  15. description: Run Cassandra IO Benchmark against Beam services.
  16. dpb_service:
  17. service_type: dataproc # These are defaults and can be
  18. runner_type: spark # overridden on the command line.
  19. worker_group:
  20. vm_spec:
  21. GCP:
  22. machine_type: n1-standard-1
  23. boot_disk_size: 500
  24. AWS:
  25. machine_type: m3.medium
  26. disk_spec:
  27. GCP:
  28. disk_type: nodisk
  29. AWS:
  30. disk_size: 500
  31. disk_type: gp2
  32. worker_count: 2
  33. """
  34.  
  35. CASSANDRA_IT_CLASSNAME = 'org.apache.beam.io.CassandraIOIT'
  36. CASSANDRA_IT_DEFAULT_INPUT = 'path/to/cassandra/default/input'
  37. CASSANDRA_CONFIG_LOCATION = '$BEAM_DIR/io_config/cassandra.config'
  38. CASSANDRA_USERNAME = 'beam_user'
  39. CASSANDRA_PASSWORD = 'hunter2'
  40.  
  41. flags.DEFINE_string('dpb_cassandra_input', CASSANDRA_IT_DEFAULT_INPUT, 'Input for Cassandra Benchmark')
  42.  
  43. FLAGS = flags.FLAGS
  44.  
  45.  
  46. def GetConfig(user_config):
  47. return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  48.  
  49.  
  50. def CheckPrerequisites():
  51. """Verifies that the required resources are present.
  52.  
  53. Raises:
  54. perfkitbenchmarker.data.ResourceNotFound: On missing resource.
  55. """
  56. if config.runner_type is not 'dataflow' and config.service_type is 'dataflow':
  57. raise errors.Config.InvalidValue('Only DataflowRunner can run on Dataflow Service.')
  58. elif config.runner_type is 'DataflowRunner' and config.service_type is not 'dataflow':
  59. raise errors.Config.InvalidValue('DataflowRunner can only run on Dataflow Service.')
  60.  
  61.  
  62. def Prepare(benchmark_spec):
  63. orchestration_service.start(CASSANDRA_CONFIG_LOCATION, benchmark_spec)
  64. orchestration_service.load_data()
  65. pass
  66.  
  67.  
  68. def Run(benchmark_spec):
  69. # Get handle to the dpb service.
  70. dpb_service_instance = benchmark_spec.dpb_service
  71. # Get the data store spun up by the orchestration service.
  72. data_store_instance = benchmark_spec.data_store
  73.  
  74. # Create a file handle to contain the response from running the job on
  75. # the dpb service
  76. stdout_file = tempfile.NamedTemporaryFile(suffix='.stdout',
  77. prefix='dpb_wordcount_benchmark',
  78. delete=False)
  79. stdout_file.close()
  80.  
  81. # Set job submission parameters
  82. job_arguments = []
  83. job_arguments.append('--inputLocation={}'.format(FLAGS.dpb_cassandra_input))
  84. job_arguments.append('--cassandraIP={}'.format(data_store_instance.ip_address))
  85. job_arguments.append('--cassandraUsername={}'.format(CASSANDRA_USERNAME))
  86. job_arguments.append('--cassandraPassword={}'.format(CASSANDRA_PASSWORD))
  87.  
  88. if dpb_service_instance.RUNNER_TYPE == dpb_service.APEX:
  89. # Set apex-specific parameters (if any) here.
  90. elif dpb_service_instance.RUNNER_TYPE == dpb_service.DATAFLOW:
  91. # Set dataflow-specific parameters (if any) here.
  92. elif dpb_service_instance.RUNNER_TYPE == dpb_service.FLINK:
  93. # Set flink-specific parameters (if any) here.
  94. elif dpb_service_instance.RUNNER_TYPE == dpb_service.SPARK:
  95. # Set spark-specific parameters (if any) here.
  96.  
  97. results = []
  98. metadata = copy.copy(dpb_service_instance.GetMetadata())
  99. metadata.update({'input_location': FLAGS.dpb_wordcount_input})
  100.  
  101. start = datetime.datetime.now()
  102. dpb_service_instance.SubmitJob(CASSANDRA_IT_CLASSNAME,
  103. job_arguments=job_arguments,
  104. job_stdout_file=stdout_file)
  105. end_time = datetime.datetime.now()
  106. run_time = (end_time - start).total_seconds()
  107. results.append(sample.Sample('run_time', run_time, 'seconds', metadata))
  108. job_metrics = dpb_service_instance.get_metrics()
  109. results.append(sample.Sample('bytes_processed', job_metrics['bytes_processed'],
  110. 'bytes', metadata))
  111. return results
  112.  
  113.  
  114. def Cleanup(benchmark_spec):
  115. orchestration_service.tear_down()
  116. pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement