Spark

Overview of Spark configurations

cover photo: Emma Döbken July 2021.

I find myself looking for an overview too often, so let's create a rough overview of commonly used Spark config.

As a start, create a Spark Session with default config:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master(SPARK_MASTER) \
    .appName("app name") \
    .getOrCreate()

The SparkContext represents the connection to the cluster; it communicates with the lower-level APIs and RDDs.
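
It is available on the session directly; a quick way to drop down to the RDD level:

sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4])
print(rdd.sum())  # 10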

Some resource settings for the driver and the executors:

...
    .config("spark.driver.memory", "8g")     # memory for the driver process
...
    .config("spark.cores.max", "4")          # max total cores for the application (standalone/Mesos)
    .config("spark.executor.memory", "8g")   # memory per executor
    .config("spark.executor.cores", "4")     # cores per executor
...
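
To check what the session actually ended up with, you can read the values back at runtime with spark.conf.get and SparkConf.getAll:

print(spark.conf.get("spark.executor.memory"))

# or dump everything that was set
for key, value in spark.sparkContext.getConf().getAll():
    print(key, value)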

The number of shuffle partitions (default 200) should ideally match the number of cores in the cluster:

...
    .config("spark.sql.shuffle.partitions", "1800")
...
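
Rather than hard-coding the number, one option is to derive it from the cluster at runtime; spark.sparkContext.defaultParallelism typically reflects the total number of cores, and the setting can also be changed on a live session:

cores = spark.sparkContext.defaultParallelism
spark.conf.set("spark.sql.shuffle.partitions", str(cores))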

Connect to an S3 bucket

Retrieve the credentials with boto3 first.
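
A minimal sketch, assuming an IAM role you are allowed to assume via STS (the role ARN and session name below are placeholders):

import boto3

sts = boto3.client("sts")
response = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/my-spark-role",  # placeholder
    RoleSessionName="spark-app",                             # placeholder
)
credentials = response["Credentials"]  # contains AccessKeyId, SecretAccessKey, SessionToken

Then pass the credentials to the builder: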

...
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  # route s3:// paths through S3A
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", ",".join([
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",  # must come before the com.amazonaws providers
        "com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
        "com.amazonaws.auth.InstanceProfileCredentialsProvider",  # must be set explicitly
    ]))
    .config("spark.hadoop.fs.s3a.access.key", credentials["AccessKeyId"])
    .config("spark.hadoop.fs.s3a.secret.key", credentials["SecretAccessKey"])
    .config("spark.hadoop.fs.s3a.session.token", credentials["SessionToken"])
...
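
After that, reads and writes against the bucket work as usual (the bucket and paths below are placeholders):

df = spark.read.parquet("s3a://my-bucket/input/")
df.write.parquet("s3a://my-bucket/output/")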

Alternatively, when you need to connect to multiple S3 buckets with different credentials per bucket:

...
    .config(f"spark.hadoop.fs.s3a.bucket.{bucket}.access.key", credentials["AccessKeyId"])
    .config(f"spark.hadoop.fs.s3a.bucket.{bucket}.secret.key", credentials["SecretAccessKey"])
    .config(f"spark.hadoop.fs.s3a.bucket.{bucket}.session.token", credentials["SessionToken"])
...
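
With several buckets, a small loop keeps the builder readable; the bucket names and the get_credentials helper below are hypothetical:

builder = SparkSession.builder.appName("app name")
for bucket in ["bucket-a", "bucket-b"]:      # placeholder bucket names
    credentials = get_credentials(bucket)    # hypothetical helper returning an STS credentials dict
    builder = builder \
        .config(f"spark.hadoop.fs.s3a.bucket.{bucket}.access.key", credentials["AccessKeyId"]) \
        .config(f"spark.hadoop.fs.s3a.bucket.{bucket}.secret.key", credentials["SecretAccessKey"]) \
        .config(f"spark.hadoop.fs.s3a.bucket.{bucket}.session.token", credentials["SessionToken"])
spark = builder.getOrCreate()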

Give the bucket owner full control over the objects you write (useful when writing to a bucket owned by another account):

...
    .config("spark.hadoop.fs.s3a.acl.default", "BucketOwnerFullControl")
    .config("spark.hadoop.fs.s3a.canned.acl", "BucketOwnerFullControl")
    .config("spark.hadoop.fs.s3.canned.acl", "BucketOwnerFullControl")
...

Enable Hive support and point to the Hive metastore URI:

...
    .enableHiveSupport()
    .config("hive.metastore.uris", "thrift://metastore_uri:9083")
...
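
A quick way to verify that the metastore is reachable:

spark.sql("SHOW DATABASES").show()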

TODO: document the settings below.

...
    .config("spark.sql.crossJoin.enabled", "true")            # allow cross joins
    .config("spark.sql.adaptive.enabled", "true")             # adaptive query execution
    .config("spark.default.parallelism", "1800")              # default partition count for RDD operations
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")  # only overwrite partitions present in the data
    .config("spark.files.ignoreMissingFiles", "true")         # skip missing files instead of failing
    .config("spark.files.ignoreCorruptFiles", "true")         # skip corrupt files instead of failing
    .config("spark.shuffle.io.retryWait", "60s")              # wait between shuffle fetch retries
    .config("spark.shuffle.io.maxRetries", "10")              # max shuffle fetch retries
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")  # ignore temp-dir cleanup failures
    .config("spark.hadoop.hive.exec.compress.output", "true")  # compress Hive output
    .config("spark.hadoop.hive.exec.parallel", "true")         # run independent Hive stages in parallel
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # faster than the default Java serializer
    .config("spark.io.compression.codec", "snappy")           # codec for shuffle and broadcast data
    .config("spark.shuffle.compress", "true")                 # compress shuffle output
    .config("spark.scheduler.mode", "FAIR")                   # fair scheduling between concurrent jobs
    .config("spark.speculation", "false")                     # no speculative re-execution of slow tasks
    .config("spark.hadoop.fs.s3a.fast.upload", "true")        # upload to S3 in blocks while writing
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")  # faster (v2) output commit
...
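
As an example of the dynamic partition overwrite mode above: with it enabled, an overwrite only replaces the partitions present in the new DataFrame instead of wiping the whole table path (the partition column and path are placeholders):

df.write \
    .mode("overwrite") \
    .partitionBy("dt") \
    .parquet("s3a://my-bucket/tables/events/")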

Kafka