I find myself looking for an overview too often, so let's create a rough overview of commonly used config for Spark.
As a start, create a Spark Session with default config:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master(SPARK_MASTER) \
.appName("app name") \
.getOrCreate()
The Spark Context represents the connection to the cluster; it communicates with the lower-level APIs and RDDs.
Some resource settings for the driver and the executors:
...
.config("spark.driver.memory", "8g")
...
.config("spark.cores.max", "4")
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", "4")
...
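When the number of settings grows, it can help to keep them in a dict and apply them in a loop. A minimal sketch, assuming the same keys as above; `resource_conf` and `apply_conf` are hypothetical helper names, not part of PySpark:

```python
from typing import Dict


def resource_conf(driver_memory: str, executor_memory: str,
                  executor_cores: int, max_cores: int) -> Dict[str, str]:
    """Collect the resource settings shown above as Spark config key/value pairs."""
    return {
        "spark.driver.memory": driver_memory,
        "spark.cores.max": str(max_cores),
        "spark.executor.memory": executor_memory,
        "spark.executor.cores": str(executor_cores),
    }


def apply_conf(builder, conf: Dict[str, str]):
    """Call .config(key, value) on a SparkSession builder for every entry."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

Usage would look like `apply_conf(SparkSession.builder.master(SPARK_MASTER).appName("app name"), resource_conf("8g", "8g", 4, 4)).getOrCreate()`.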
The number of shuffle partitions (default is 200) should ideally be equal to the number of cores in the cluster, or a small multiple of it:
...
.config("spark.sql.shuffle.partitions", "1800")
...
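To avoid hardcoding the number, it can be derived from the cluster size. A rough sketch; the function name and the `tasks_per_core` knob are my own, and the 450 executors in the usage note are a made-up figure chosen to arrive at the 1800 used above:

```python
def shuffle_partitions(num_executors: int, cores_per_executor: int,
                       tasks_per_core: int = 1) -> int:
    """Return a shuffle partition count based on the total number of executor cores."""
    total_cores = num_executors * cores_per_executor
    return total_cores * tasks_per_core
```

For example, `shuffle_partitions(450, 4)` gives 1800, which can then be passed as `.config("spark.sql.shuffle.partitions", str(shuffle_partitions(450, 4)))`.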
Connect to S3 bucket
Receive the credentials with boto3, then:
...
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", ",".join([
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider", # Must be set before amazonaws
"com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
"com.amazonaws.auth.InstanceProfileCredentialsProvider", # Must be set explicitly
])) \
.config("spark.hadoop.fs.s3a.access.key", credentials['AccessKeyId']) \
.config("spark.hadoop.fs.s3a.secret.key", credentials['SecretAccessKey']) \
.config("spark.hadoop.fs.s3a.session.token", credentials['SessionToken']) \
...
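One possible way to get a `credentials` dict with these keys is an STS `assume_role` call; that this is how the credentials are obtained is an assumption on my side, and the role ARN is whatever applies in your account. A sketch with two hypothetical helpers:

```python
from typing import Any, Dict


def assume_role_credentials(role_arn: str, session_name: str = "spark-session") -> Dict[str, Any]:
    """Fetch temporary credentials via STS (assumption: a role that can be assumed exists)."""
    import boto3  # imported lazily so the mapping below works without boto3 installed

    response = boto3.client("sts").assume_role(
        RoleArn=role_arn, RoleSessionName=session_name
    )
    # Contains AccessKeyId, SecretAccessKey, SessionToken and Expiration.
    return response["Credentials"]


def s3a_credential_conf(credentials: Dict[str, Any]) -> Dict[str, str]:
    """Map the STS credentials onto the S3A config keys used above."""
    return {
        "spark.hadoop.fs.s3a.access.key": credentials["AccessKeyId"],
        "spark.hadoop.fs.s3a.secret.key": credentials["SecretAccessKey"],
        "spark.hadoop.fs.s3a.session.token": credentials["SessionToken"],
    }
```

Keep in mind that temporary credentials expire, so a long-running session may need fresh ones at some point.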
Alternatively, when you need to connect to multiple S3 buckets with different credentials per bucket:
...
.config(f"spark.hadoop.fs.s3a.bucket.{bucket}.access.key", credentials["AccessKeyId"])
.config(f"spark.hadoop.fs.s3a.bucket.{bucket}.secret.key", credentials["SecretAccessKey"])
.config(f"spark.hadoop.fs.s3a.bucket.{bucket}.session.token", credentials["SessionToken"])
...
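With more than a couple of buckets, generating these keys in a loop is less error-prone. A small sketch using the S3A per-bucket form `fs.s3a.bucket.<bucket>.<option>`; `per_bucket_conf` is a hypothetical helper and the bucket names are up to you:

```python
from typing import Any, Dict


def per_bucket_conf(credentials_by_bucket: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
    """Build per-bucket S3A credential settings from a bucket -> credentials mapping."""
    conf: Dict[str, str] = {}
    for bucket, credentials in credentials_by_bucket.items():
        prefix = f"spark.hadoop.fs.s3a.bucket.{bucket}"
        conf[f"{prefix}.access.key"] = credentials["AccessKeyId"]
        conf[f"{prefix}.secret.key"] = credentials["SecretAccessKey"]
        conf[f"{prefix}.session.token"] = credentials["SessionToken"]
    return conf
```

Each resulting key/value pair can be passed to `.config(key, value)` on the session builder.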
Give the bucket owner full control over the objects you write:
...
.config("spark.hadoop.fs.s3a.acl.default", "BucketOwnerFullControl")
.config("spark.hadoop.fs.s3a.canned.acl", "BucketOwnerFullControl")
.config("spark.hadoop.fs.s3.canned.acl", "BucketOwnerFullControl")
Enable Hive and point to Hive metastore URI:
...
.enableHiveSupport() \
.config("hive.metastore.uris", "thrift://metastore_uri:9083") \
Spark and timeseries
When working with timeseries:
.config("spark.sql.datetime.java8API.enabled", "true") \
Spark predates Java 8, and before Java 8 the built-in date/time library was terrible (everyone used Joda-Time instead). Spark didn't use Joda-Time internally; it used types like java.sql.Timestamp. Java 8 essentially adopted Joda-Time's design as the built-in java.time library, and everyone has been happily using that for date/time on the JVM, including the Scala community. Spark also adapted and moved from java.sql.Timestamp to java.time.Instant. But to keep things backward compatible, it didn't force this change upon users. Setting this config makes Spark use the Java 8 time library instead.
Kafka
The basics (start a consumer):
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host-address:9092") // hosts
.option("subscribe", "joosts-topic") // topic
.option("includeHeaders", true)
// all options go here...
.load()
.select(...)
.filter(...)
Configure the startingOffsets: either earliest, latest, or specified per partition, like:
.option("startingOffsets", """{"joosts-topic":{"0": 3106500, "1": 3106500, "2": 3106500, "3": 3106500, "4": 3106500, "5": 3106500}}""")
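Writing that JSON by hand gets tedious with many partitions. When working from PySpark, it can be generated instead; a minimal sketch, where `starting_offsets` is a hypothetical helper name:

```python
import json
from typing import Dict


def starting_offsets(topic: str, offsets_by_partition: Dict[int, int]) -> str:
    """Build the JSON string for the startingOffsets option of a single topic."""
    # Partition numbers have to be JSON object keys, so they become strings.
    partitions = {str(partition): offset
                  for partition, offset in offsets_by_partition.items()}
    return json.dumps({topic: partitions})
```

The example above would then be `.option("startingOffsets", starting_offsets("joosts-topic", {p: 3106500 for p in range(6)}))`. In this JSON, an offset of -2 refers to earliest and -1 to latest.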
Accept that data is lost when the retention time has passed:
.option("failOnDataLoss", false)
Authentication:
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.endpoint.identification.algorithm", "https")
.option("kafka.sasl.mechanism", "PLAIN")
.option(
"kafka.sasl.jaas.config",
s"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$kafkaUsername" password="$kafkaPassword";""")
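The same authentication options from PySpark, collected in a dict so they can be passed in one go. This is a sketch: `kafka_sasl_options` is a hypothetical helper, and the `kafkashaded.` prefix is copied from the snippet above. On a runtime that does not shade the Kafka client, the unprefixed class name is probably what you need, hence the parameter:

```python
from typing import Dict

SHADED_PLAIN_LOGIN_MODULE = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
)


def kafka_sasl_options(username: str, password: str,
                       login_module: str = SHADED_PLAIN_LOGIN_MODULE) -> Dict[str, str]:
    """Return the SASL_SSL/PLAIN options for the Kafka source as a dict."""
    jaas_config = (
        f'{login_module} required username="{username}" password="{password}";'
    )
    return {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.ssl.endpoint.identification.algorithm": "https",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
    }
```

Usage: `spark.readStream.format("kafka").options(**kafka_sasl_options(kafka_username, kafka_password))`, followed by the other options.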
Write to console (for experiments):
.writeStream
.trigger(Trigger.Once())
.queryName(...)
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
Write to Delta:
.writeStream
.partitionBy(...)
.trigger(Trigger.ProcessingTime(10.seconds))
.queryName(config.appName)
.format("delta")
.outputMode("append")
.option("mergeSchema", "true")
.option("checkpointLocation", config.checkpointPath)
.start(config.tableFullPath)
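For reference, a rough PySpark equivalent of the Delta write above. The function name and its parameters are hypothetical stand-ins for the `config` object in the Scala snippet, and the partition column is whatever your table is partitioned by:

```python
def write_to_delta(df, table_path: str, checkpoint_path: str,
                   query_name: str, partition_column: str):
    """Start an append-only streaming write of df to a Delta table and return the query."""
    return (
        df.writeStream
        .partitionBy(partition_column)
        .trigger(processingTime="10 seconds")  # same interval as the Scala version
        .queryName(query_name)
        .format("delta")
        .outputMode("append")
        .option("mergeSchema", "true")
        .option("checkpointLocation", checkpoint_path)
        .start(table_path)
    )
```

The returned streaming query can be blocked on with `.awaitTermination()`, just like in the console example.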
TODO
...
.config("spark.sql.crossJoin.enabled", "true")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.default.parallelism", "1800")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.files.ignoreMissingFiles", "true")
.config("spark.files.ignoreCorruptFiles", "true")
.config("spark.shuffle.io.retryWait", "60s")
.config("spark.shuffle.io.maxRetries", "10")
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
.config("spark.hadoop.hive.exec.compress.output", "true")
.config("spark.hadoop.hive.exec.parallel", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.io.compression.codec", "snappy")
.config("spark.shuffle.compress", "true")
.config("spark.scheduler.mode", "FAIR")
.config("spark.speculation", "false")
.config("spark.hadoop.fs.s3a.fast.upload", "true")
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
...