Tuning Spark Jobs – A step-by-step approach

While Spark offers many tuning configurations, this article covers a few of the most significant and frequently tweaked ones. The sections below walk through them step by step.

Spark configuration 

There are three ways you can get and set Spark properties. The first is through a set of configuration files. In your deployment’s $SPARK_HOME directory (where you installed Spark), there are a number of config files:

  • conf/spark-defaults.conf.template,
  • conf/log4j.properties.template, and
  • conf/spark-env.sh.template

Three ways to set Spark configuration

  • Spark Default Configuration file
    • conf/spark-defaults.conf – applies to the Spark cluster and all Spark applications submitted to the cluster.

  • Spark-submit – on the command line when submitting the application, using the --conf flag
      • spark-submit --conf spark.sql.shuffle.partitions=5 --conf "spark.executor.memory=2g" --class main.scala.chapter7.SparkConfig_7_1 jars/mainscala-chapter7_2.12-1.0.jar
    • Spark application via application code
      • def main(args: Array[String]) {
          // Create a session
          val spark = SparkSession.builder
            .config("spark.sql.shuffle.partitions", 5)
            .config("spark.executor.memory", "2g")
            .master("local[*]")
            .appName("SparkConfig")
            .getOrCreate()
  • Spark Shell – The third option is through a programmatic interface via the Spark shell. For example, the Scala code below prints the Spark configuration of a session launched from the shell:
      • scala> val mconf = spark.conf.getAll
        scala> for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
        spark.driver.host -> 10.13.200.101
        spark.driver.port -> 65204
        spark.repl.class.uri -> spark://10.13.200.101:65204/classes
        spark.jars ->
        spark.repl.class.outputDir -> /private/var/folders/my/qg062ynx5v39wwmfxmph5nn…
        spark.app.name -> Spark shell
        spark.submit.pyFiles ->
        spark.ui.showConsoleProgress -> true
        spark.executor.id -> driver
        spark.submit.deployMode -> client
        spark.master -> local[*]
        spark.home -> /Users/user1/spark/spark-3.0.0-preview2-bin-hadoop2.7
        spark.sql.catalogImplementation -> hive
        spark.app.id -> local-1580144503745
  • Other Options
    • Spark SQL (view only)
      • // In Scala
        spark.sql("SET -v").select("key", "value").show(5, false)
        # In Python
        spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
    • Spark UI
      • Alternatively, you can access Spark’s current configuration through the Spark UI’s Environment tab

How to Set or Modify existing Configuration?

To programmatically set or modify an existing configuration, first determine whether the property is modifiable: spark.conf.isModifiable("<config_name>") will return true or false. All modifiable configs can be set to new values using the API:

  • // In Scala
    scala> spark.conf.get("spark.sql.shuffle.partitions")      // get configuration
    res26: String = 200
    scala> spark.conf.set("spark.sql.shuffle.partitions", 5)   // set configuration
    scala> spark.conf.get("spark.sql.shuffle.partitions")      // verify using get
    res28: String = 5
  • # In Python
    >>> spark.conf.get("spark.sql.shuffle.partitions")         # get configuration
    '200'
    >>> spark.conf.set("spark.sql.shuffle.partitions", 5)      # set configuration
    >>> spark.conf.get("spark.sql.shuffle.partitions")         # verify using get
    '5'
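
You can also check whether a given property is modifiable at runtime. The two properties queried below are just examples; in a typical session the first is modifiable and the second, a cluster-level setting, is not:

  • // In Scala
    scala> spark.conf.isModifiable("spark.sql.shuffle.partitions")
    res0: Boolean = true
    scala> spark.conf.isModifiable("spark.executor.memory")
    res1: Boolean = false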

Order of Precedence of configuration 

Which values take effect when the same property is set in more than one place? Spark reads settings in this order: first any values in spark-defaults.conf, then any flags passed on the command line with spark-submit, and finally any values set via SparkSession in the Spark application itself. All of these properties are merged, and duplicates are resolved in favor of the value set in the Spark application. Similarly, unless it is overridden in the application, a value supplied on the command line replaces the corresponding setting in the configuration file.
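
As a minimal sketch of this precedence (the application name and values are arbitrary), assume spark-defaults.conf sets spark.sql.shuffle.partitions to 100 and spark-submit passes 50 on the command line; a value set in the application itself still wins:

  • // In Scala
    // spark-defaults.conf:  spark.sql.shuffle.partitions  100
    // spark-submit:         --conf spark.sql.shuffle.partitions=50
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder
      .appName("PrecedenceDemo")
      .config("spark.sql.shuffle.partitions", 5)  // set in application: highest precedence
      .getOrCreate()
    spark.conf.get("spark.sql.shuffle.partitions")  // returns "5"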

Static Vs Dynamic resource allocation

You set the limit when you specify compute resources as command-line arguments to spark-submit. This means that if more resources are required later as tasks queue up in the driver due to a higher-than-expected workload, Spark will be unable to accommodate or allocate additional resources.

If you use Spark’s dynamic resource allocation configuration instead, the Spark driver can request more or fewer compute resources as the demand for large workloads fluctuates. When your workloads are dynamic—that is, their demand for compute capacity varies—using dynamic allocation helps to accommodate sudden peaks.

Example scenarios include Spark Streaming, where the data flow volume may be uneven, and on-demand data analytics, where query load can spike during peak hours.

Enable Dynamic Resource allocation 

To enable and configure dynamic allocation, you can use settings like the following. Note that the numbers here are arbitrary; the appropriate settings will depend on the nature of your workload and should be adjusted accordingly. Some of these configs cannot be set inside a Spark REPL, so you will have to set them programmatically:
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min

By default spark.dynamicAllocation.enabled is set to false. When enabled with the settings shown here, the Spark driver will request that the cluster manager create two executors to start with, as a minimum (spark.dynamicAllocation.minExecutors). As the task queue backlog increases, new executors will be requested each time the backlog timeout (spark.dynamicAllocation.schedulerBacklogTimeout) is exceeded. In this case, whenever there are pending tasks that have not been scheduled for over 1 minute, the driver will request that a new executor be launched to schedule backlogged tasks, up to a maximum of 20 (spark.dynamicAllocation.maxExecutors). By contrast, if an executor finishes a task and is idle for 2 minutes (spark.dynamicAllocation.executorIdleTimeout), the Spark driver will terminate it.
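
Because these properties must be in place before the SparkContext starts, one way to apply them outside a REPL is at session creation time. Below is a minimal sketch; the application name is hypothetical and the values simply mirror the arbitrary numbers above:

  • // In Scala
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder
      .appName("DynamicAllocationDemo")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.minExecutors", "2")
      .config("spark.dynamicAllocation.maxExecutors", "20")
      .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1m")
      .config("spark.dynamicAllocation.executorIdleTimeout", "2min")
      // Dynamic allocation also expects an external shuffle service
      // (spark.shuffle.service.enabled=true) or, on Spark 3.x, shuffle tracking
      // (spark.dynamicAllocation.shuffleTracking.enabled=true).
      .getOrCreate()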

Configure Spark executor memory and shuffle service

It is not enough to simply enable dynamic resource allocation. You must also understand how Spark allocates and uses executor memory so that executors are not starved of memory or hampered by JVM garbage collection.

spark.executor.memory governs the amount of memory available to each executor. This is divided into three sections: execution memory, storage memory, and reserved memory. After allowing 300 MB for reserved memory, the default split is 60% for execution memory and 40% for storage, to protect against OOM errors. The Spark documentation advises that this will work for most cases, but you can adjust what fraction of spark.executor.memory you want either section to use as a baseline. When storage memory is not being used, Spark can acquire it for execution purposes, and vice versa.
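
A hedged sketch of adjusting this split (the values are illustrative, not recommendations; spark.memory.fraction and spark.memory.storageFraction are the knobs that control the division):

  • // In Scala
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder
      .appName("ExecutorMemoryDemo")                  // hypothetical application name
      .config("spark.executor.memory", "4g")          // JVM heap per executor
      .config("spark.memory.fraction", "0.6")         // share of (heap - 300 MB) for execution + storage
      .config("spark.memory.storageFraction", "0.5")  // portion of that region reserved for storage
      .getOrCreate()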

Spark configurations to tweak for I/O during map and shuffle operations

Note: Recommendations for these configurations won't work for all situations, but they should give you an idea of how to adjust them based on your workload. As with everything else in performance tuning, you have to experiment until you find the right balance.

A partition is a way of organizing data on disk into a subset of configurable and readable chunks or blocks of contiguous data. These data subsets can be read or processed independently and in parallel, by more than one thread in a process if necessary. This independence matters because it enables massive parallelism in data processing. Spark is embarrassingly efficient at processing its tasks in parallel. As previously discussed, for large-scale workloads a Spark job will have many stages, and within each stage there will be many tasks. Spark will, at most, schedule one thread per task per core, and each task will process a distinct partition. To optimize resource utilization and maximize parallelism, the ideal is at least as many partitions as there are cores on the executor: if there are more partitions than cores on each executor, all the cores are kept busy. You can think of partitions as atomic units of parallelism: a single thread running on a single core can work on a single partition.
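
For example, you can inspect how many partitions a DataFrame has and redistribute it to match the cores available (the data set and the target count of 16 are arbitrary, and an active SparkSession named spark is assumed):

  • // In Scala
    val df = spark.range(0, 10000000)
    println(df.rdd.getNumPartitions)              // how many partitions Spark created
    val repartitionedDF = df.repartition(16)      // redistribute into 16 partitions
    println(repartitionedDF.rdd.getNumPartitions) // 16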

Size of Partitions

The size of a partition in Spark is dictated by spark.sql.files.maxPartitionBytes. The default is 128 MB. You can decrease the size, but that may result in what's known as the "small file problem": many small partition files introducing an inordinate amount of disk I/O and performance degradation thanks to filesystem operations such as opening, closing, and listing directories, which on a distributed filesystem can be slow.

Shuffle partitions are created during the shuffle stage. By default, the number of shuffle partitions is set to 200 in spark.sql.shuffle.partitions. You can adjust this number depending on the size of the data set you have, to reduce the amount of small partitions being sent across the network to executors’ tasks.

There is no magic formula for the number of shuffle partitions to set for the shuffle stage; the number may vary depending on your use case, data set, number of cores, and the amount of executor memory available—it’s a trial-and-error approach.
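
A minimal sketch of adjusting both settings at runtime (the values are illustrative and should be tuned per workload):

  • // In Scala
    // Maximum bytes packed into a single partition when reading files (default 128 MB)
    spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728L)
    // Number of partitions produced by wide transformations such as joins and aggregations
    spark.conf.set("spark.sql.shuffle.partitions", 64)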

Caching and Persistence of Data
What exactly is the distinction between caching and persistence? In Spark, they are interchangeable. These features are provided by two API calls: cache() and persist(). The latter gives you more control over where and how your data is stored: in memory and on disk, serialized or unserialized. Both help to improve performance for frequently accessed DataFrames or tables.

DataFrame.cache()

cache() will store as many of the partitions read in memory across Spark executors as memory allows. While a DataFrame may be fractionally cached, partitions cannot be (e.g., if you have 8 partitions but only 4.5 partitions can fit in memory, only 4 will be cached). However, if not all of your partitions are cached, when you access the data again the partitions that are not cached must be recomputed, which slows down your Spark job.
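
A minimal sketch: cache() is lazy, so the first action materializes the cached partitions (the DataFrame used here is illustrative):

  • // In Scala
    val df = spark.range(1, 10000000).toDF("id")
    df.cache()    // mark the DataFrame for caching (lazy)
    df.count()    // first action materializes the cache
    df.count()    // now served from the cached partitions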

DataFrame.persist()
persist(StorageLevel.LEVEL) is nuanced, providing control over how your data is cached via its StorageLevel argument. Data on disk is always serialized, using either Java or Kryo serialization.
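
A sketch of persisting with an explicit storage level (DISK_ONLY is just one of the available levels):

  • // In Scala
    import org.apache.spark.storage.StorageLevel
    val df = spark.range(1, 10000000).toDF("id")
    df.persist(StorageLevel.DISK_ONLY)  // store serialized partitions on disk only
    df.count()                          // materialize the persisted data
    df.unpersist()                      // release the storage when no longer needed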

When to Cache and Persist?
Common use cases for caching are scenarios where you will want to access a large data set repeatedly for queries or transformations. Some examples include:
• DataFrames commonly used during iterative machine learning training
• DataFrames accessed commonly for doing frequent transformations during ETL or building data pipelines
When Not to Cache and Persist?
Not all use cases dictate the need to cache. Some scenarios that may not warrant caching your DataFrames include:
• DataFrames that are too big to fit in memory
• An inexpensive transformation on a DataFrame not requiring frequent use, regardless of size

Spark Joins

Join operations are a common type of transformation in big data analytics in which two data sets are merged over a common matching key. The Spark DataFrame and Dataset APIs, as well as Spark SQL, provide a set of join transformations similar to relational databases: inner joins, outer joins, left joins, right joins, and so on. All of these operations cause a significant amount of data to be moved across Spark executors.

Spark has five distinct join strategies by which it exchanges, moves, sorts, groups, and merges data across executors:

1) the broadcast hash join (BHJ),
2) shuffle hash join (SHJ),
3) shuffle sort merge join (SMJ),
4) broadcast nested loop join (BNLJ), and
5) shuffle-and-replicated nested loop join (a.k.a. Cartesian product join).

This blog focuses on only two of these (BHJ and SMJ), because they're the most common ones you'll encounter.

Broadcast Hash Join

The broadcast hash join, also known as a map-side-only join, is used when two data sets, one small (fitting in the driver’s and executor’s memory) and the other large enough to ideally be spared from movement, need to be joined over certain conditions or columns. The BHJ is the simplest and fastest join Spark provides because it does not involve any data shuffle; all data is available locally to the executor after a broadcast. Simply ensure that you have enough memory on both the Spark driver and the executors to hold the smaller data set in memory.
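
A minimal sketch of nudging Spark toward a broadcast hash join with the broadcast() hint (the DataFrames and join key are made up for illustration):

  • // In Scala
    import org.apache.spark.sql.functions.broadcast
    val largeDF = spark.range(1, 100000000).toDF("id")
    val smallDF = spark.range(1, 100).toDF("id")
    // Hint Spark to ship the smaller DataFrame to every executor
    val joinedDF = largeDF.join(broadcast(smallDF), "id")
    joinedDF.explain()   // the physical plan should show BroadcastHashJoin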

When to use a broadcast hash join?
• When Spark hashes each key in the smaller and larger data sets to the same partition
• When one data set is significantly smaller than the other (and within the default configuration of 10 MB, or more if you have enough memory)
• When you are not worried about excessive network bandwidth usage or OOM errors, because the smaller data set will be broadcast to all Spark executors.

Shuffle Sort Merge Join
The sort-merge algorithm is an efficient way to merge two large data sets over a common key that is sortable, unique, and can be assigned to or stored in the same partition; that is, two data sets with a common hashable key that end up on the same partition. From Spark's perspective, this means that all rows within each data set with the same key are hashed to the same partition on the same executor.
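
A sketch of a join that falls back to a shuffle sort merge join once broadcasting is disabled (the threshold change and DataFrames are purely illustrative):

  • // In Scala
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  // disable broadcast joins
    val usersDF  = spark.range(1, 10000000).toDF("id")
    val ordersDF = spark.range(1, 10000000).toDF("users_id")
    val joinedDF = ordersDF.join(usersDF, ordersDF("users_id") === usersDF("id"))
    joinedDF.explain()   // the physical plan should show SortMergeJoin with an Exchange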

When to use a shuffle sort merge join?
• When Spark can sort and hash each key in two large data sets to the same partition
• When you want to combine two data sets using only equi-joins based on matching sorted keys.
• When you want to prevent Exchange and Sort operations and save large shuffles across the network

Jobs and Stages
In the Spark UI, you can navigate through the Jobs and Stages tabs and drill down to a granular level to examine the details of individual tasks. You can check their completion status and review metrics such as I/O, memory consumption, execution time, and so on. The Jobs tab has an expanded Event Timeline, which shows when executors were added to or removed from the cluster. It also displays a table of all completed jobs in the cluster. The Duration column shows how long each job (identified by the Job Id in the first column) took to complete. If this time is long, it's a good indication that you should look into the stages of that job to see what tasks are causing delays.

The Stages tab displays a summary of the current status of all job stages in the application. You can also access a details page for each stage, which includes a DAG as well as metrics on its tasks.

Executors
The Executors tab displays information about the executors created for the application. You can drill down into the nitty-gritty of resource usage (disk, memory, cores), time spent in GC, the amount of data written and read during shuffle, and so on.

Storage
The Storage tab displays information about any tables or DataFrames cached by the application via the cache() or persist() methods, for example managed tables cached after bucketing them for a shuffle sort merge join.

Environment
The Environment tab is equally important. Knowing about the environment in which your Spark application runs provides many useful clues for troubleshooting.

Takeaway

As you have seen, you can improve scaling for large workloads, increase parallelism, and reduce memory starvation among Spark executors by adjusting some of Spark's default configurations. You also saw how caching and persisting strategies with appropriate storage levels can speed up access to frequently used data sets, and we looked at two common join strategies Spark uses during complex aggregations and how bucketing DataFrames by sorted keys can help avoid expensive shuffle operations.
