import os
from typing import Dict, Optional

from pyspark.sql import SparkSession

# Archives shipped to the cluster: a self-contained Python env for the
# executors and the Spark jars, both staged on HDFS.
EXECUTOR_ENV = 'hdfs:///path/to/python/anaconda_2.4.4.tar.gz'  # tar.gz with python for executors
SPARK_ARCHIVE = 'hdfs:///path/to/lib/spark/sparkjars-2.4.4.zip'

# Local Hadoop/Java/Spark locations for the driver (HDP 2.6.5 layout).
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/hdp/2.6.5.0-292/usr/lib'
os.environ['HADOOP_HOME'] = '/usr/hdp/current/hadoop-client/'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64/'
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf/'
os.environ['SPARK_HOME'] = '/opt/conda/lib/python3.7/site-packages/pyspark'
# Executors resolve this path inside the unpacked EXECUTOR_ENV archive.
os.environ['PYSPARK_PYTHON'] = 'anaconda_2.4.4.tar.gz/bin/python3'
def start_spark(app_name: str,
                num_executors: int,  # required arg moved before defaults (was a SyntaxError)
                driver_memory: str = '12G',
                executor_memory: str = '6G',
                executor_cores: int = 2,
                queue: str = 'default',
                additional_params: Optional[Dict[str, str]] = None):
    spark_driver_host = os.getenv('HOST_IP')
    builder = (
        SparkSession
        .builder
        .appName(app_name)
        .master('yarn')
        .config('spark.driver.memory', driver_memory)
        .config('spark.driver.maxResultSize', driver_memory)
        .config('spark.driver.allowMultipleContexts', 'True')
        .config('spark.executor.cores', executor_cores)
        .config('spark.executor.memory', executor_memory)
        .config('spark.executor.memoryOverhead', '1G')
        # Dynamic allocation: scale between 20% and 100% of num_executors.
        .config('spark.dynamicAllocation.enabled', 'true')
        .config('spark.dynamicAllocation.maxExecutors', num_executors)
        .config('spark.dynamicAllocation.minExecutors', int(num_executors * 0.2))
        .config('spark.sql.broadcastTimeout', '36000')
        .config('spark.dynamicAllocation.cachedExecutorIdleTimeout', '1200s')
        .config('spark.driver.host', spark_driver_host)
        .config('spark.driver.bindAddress', '0.0.0.0')
        .config('spark.driver.extraLibraryPath', '/usr/hdp/2.6.5.0-292/hadoop/lib/native')
        .config('spark.driver.extraJavaOptions', '-Dhdp.version=current')
        .config('spark.debug.maxToStringFields', '50')
        # YARN: target queue plus the executor Python env and Spark jars from HDFS.
        .config('spark.yarn.queue', queue)
        .config('spark.yarn.dist.archives', EXECUTOR_ENV)
        .config('spark.yarn.archive', SPARK_ARCHIVE)
        .config('spark.yarn.am.extraJavaOptions', '-Dhdp.version=current')
        .config('spark.rpc.message.maxSize', '1024')
        .config('spark.sql.warehouse.dir', '/apps/hive/warehouse')
        .config('spark.sql.execution.pandas.respectSessionTimeZone', 'false')
        .config('spark.sql.orc.filterPushdown', 'true')
        .config('spark.sql.hive.convertMetastoreOrc', 'true')
        .config('spark.shuffle.service.enabled', 'true')
        .config('spark.hadoop.yarn.timeline-service.enabled', 'false')
        .config('spark.hadoop.yarn.client.failover-proxy-provider',
                'org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.kryoserializer.buffer.max', '1024m')
        .config('spark.executor.extraLibraryPath', '/usr/hdp/2.6.5.0-292/hadoop/lib/native')
        .config('spark.sql.shuffle.partitions', '1000')
        .config('spark.sql.parquet.writeLegacyFormat', 'true')
        # JDBC drivers (Postgres, Oracle) staged on HDFS.
        .config('spark.jars', 'hdfs:///share/lib/jar/postgresql-42.2.12.jar,hdfs:///share/lib/jar/ojdbc7.jar')
        .config('spark.port.maxRetries', '1000')
        .config('spark.shuffle.io.maxRetries', '1000')
    )
    # Caller-supplied options override or extend the defaults above.
    if additional_params:
        for key, value in additional_params.items():
            builder = builder.config(key, value)
    spark = (
        builder
        .enableHiveSupport()
        .getOrCreate()
    )
    sc = spark.sparkContext
    return sc, spark
# The function defined above is start_spark (not restart_spark), and
# num_executors is required; 10 is an illustrative value.
sc, spark = start_spark(app_name='test-app', num_executors=10)
spark.stop()
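# A minimal sketch of overriding defaults via additional_params; the executor
# count and the shuffle-partition override below are illustrative values, not
# part of the original paste.
sc, spark = start_spark(
    app_name='test-app',
    num_executors=8,
    additional_params={'spark.sql.shuffle.partitions': '200'},
)
spark.sql('SHOW DATABASES').show()  # quick sanity check against the Hive metastore
spark.stop()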