import os
from typing import Dict, Optional, Tuple

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Archives shipped to the cluster: a self-contained Python environment
# for the executors and the Spark jars themselves.
EXECUTOR_ENV = 'hdfs:///path/to/python/anaconda_2.4.4.tar.gz'  # tar.gz with python for executors
SPARK_ARCHIVE = 'hdfs:///path/to/lib/spark/sparkjars-2.4.4.zip'

# Local paths to the JVM, the HDP Hadoop client, and the driver-side
# PySpark installation; PYSPARK_PYTHON points inside the unpacked archive.
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/hdp/2.6.5.0-292/usr/lib'
os.environ['HADOOP_HOME'] = '/usr/hdp/current/hadoop-client/'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64/'
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf/'
os.environ['SPARK_HOME'] = '/opt/conda/lib/python3.7/site-packages/pyspark'
os.environ['PYSPARK_PYTHON'] = 'anaconda_2.4.4.tar.gz/bin/python3'


def start_spark(app_name: str,
                num_executors: int,
                driver_memory='12G',
                executor_memory='6G',
                executor_cores=2,
                queue='default',
                additional_params: Optional[Dict[str, str]] = None
                ) -> Tuple[SparkContext, SparkSession]:
    """Start a SparkSession on YARN and return its SparkContext and session."""
    # The driver runs inside a container, so advertise the host IP
    # to the cluster rather than the container-local address.
    spark_driver_host = os.getenv('HOST_IP')

    spark_session = (
        SparkSession
        .builder
        .appName(app_name)
        .master('yarn')
        # Driver resources.
        .config('spark.driver.memory', driver_memory)
        .config('spark.driver.maxResultSize', driver_memory)
        .config('spark.driver.allowMultipleContexts', 'True')
        # Executor resources.
        .config('spark.executor.cores', executor_cores)
        .config('spark.executor.memory', executor_memory)
        .config('spark.executor.memoryOverhead', '1G')
        # Dynamic allocation: scale between 20% and 100% of num_executors.
        .config('spark.dynamicAllocation.enabled', 'true')
        .config('spark.dynamicAllocation.maxExecutors', num_executors)
        .config('spark.dynamicAllocation.minExecutors', int(num_executors * 0.2))
        .config('spark.sql.broadcastTimeout', '36000')
        .config('spark.dynamicAllocation.cachedExecutorIdleTimeout', '1200s')
        # Networking: bind to all interfaces, advertise the host IP.
        .config('spark.driver.host', spark_driver_host)
        .config('spark.driver.bindAddress', '0.0.0.0')
        .config('spark.driver.extraLibraryPath', '/usr/hdp/2.6.5.0-292/hadoop/lib/native')
        .config('spark.driver.extraJavaOptions', '-Dhdp.version=current')
        .config('spark.debug.maxToStringFields', '50')
        # YARN: queue, the shipped Python environment, and the Spark jars archive.
        .config('spark.yarn.queue', queue)
        .config('spark.yarn.dist.archives', EXECUTOR_ENV)
        .config('spark.yarn.archive', SPARK_ARCHIVE)
        .config('spark.yarn.am.extraJavaOptions', '-Dhdp.version=current')
        .config('spark.rpc.message.maxSize', '1024')
        # Hive metastore and SQL behaviour.
        .config('spark.sql.warehouse.dir', '/apps/hive/warehouse')
        .config('spark.sql.execution.pandas.respectSessionTimeZone', 'false')
        .config('spark.sql.orc.filterPushdown', 'true')
        .config('spark.sql.hive.convertMetastoreOrc', 'true')
        .config('spark.shuffle.service.enabled', 'true')
        .config('spark.hadoop.yarn.timeline-service.enabled', 'false')
        .config('spark.hadoop.yarn.client.failover-proxy-provider',
                'org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider')
        # Kryo serialization with a larger buffer for big serialized objects.
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.kryoserializer.buffer.max', '1024m')
        .config('spark.executor.extraLibraryPath', '/usr/hdp/2.6.5.0-292/hadoop/lib/native')
        .config('spark.sql.shuffle.partitions', '1000')
        .config('spark.sql.parquet.writeLegacyFormat', 'true')
        # JDBC drivers (Postgres, Oracle) distributed from HDFS.
        .config('spark.jars', 'hdfs:///share/lib/jar/postgresql-42.2.12.jar,hdfs:///share/lib/jar/ojdbc7.jar')
        .config('spark.port.maxRetries', '1000')
        .config('spark.shuffle.io.maxRetries', '1000')
    )

    # Apply caller-supplied overrides on top of the defaults above.
    if additional_params:
        for key, value in additional_params.items():
            spark_session = spark_session.config(key, value)

    spark = (
        spark_session
        .enableHiveSupport()
        .getOrCreate()
    )
    sc = spark.sparkContext

    return sc, spark


sc, spark = start_spark(app_name='test-app', num_executors=10)  # num_executors is required; 10 is an example value
spark.stop()
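
# A minimal usage sketch (not in the original paste): additional_params lets the
# caller override any of the defaults set above. The keys and values here are
# illustrative assumptions, not settings this cluster requires.
sc, spark = start_spark(
    app_name='test-app-with-overrides',
    num_executors=20,
    additional_params={
        'spark.sql.shuffle.partitions': '200',    # smaller shuffle for modest data volumes
        'spark.executor.memoryOverhead': '2G',    # extra overhead, e.g. for pandas UDFs
    },
)
spark.stop()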