Data patterns change over time, so tuning activities have to be continuously performed to keep up with them. In this blog, let’s look at the following topics for managing performance with Delta.
- Common strategies for improving performance
- Optimizing with Delta
- Is it always true that cost is inversely proportional to performance?
- Best practices for managing performance
Common strategies for improving performance:
A pipeline’s performance refers to how quickly a data load can be processed, while throughput is the volume of data that can be processed in a given time. Both are important scalability metrics in a big data system. Let’s take a look at some ways to boost performance.
Increase the parallelism level:
Parallelism is the ability to divide a large chunk of work into smaller independent chunks that can be executed in parallel.
Write efficient code:
Efficient algorithms and code help to crunch through the same business transformations faster.
Workflow that captures task dependencies:
There are inherent dependencies between tasks. Pipelining, or orchestration, refers to chaining these dependencies as DAGs, where the lineage determines which tasks can run concurrently and which must wait until all the stages they depend on have completed successfully. Even better is the ability to share compute across some of these tasks, and to repair and rerun only the affected section of the pipeline after a localised failure. Using external time-based schedulers to stitch the tasks together invariably fails, because an unexpected spike in data volume or a temporary glitch in resource availability can cause timelines to slip and tasks to stampede on one another.
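As a rough illustration of how a dependency graph decides what can run concurrently, the sketch below groups a small pipeline into "waves" using Python's standard `graphlib` module. The task names and the `execution_waves` helper are hypothetical and only stand in for what an orchestrator does; real orchestrators also handle retries, shared compute, and partial reruns.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on.
pipeline = {
    "ingest_orders": set(),
    "ingest_customers": set(),
    "clean_orders": {"ingest_orders"},
    "join_orders_customers": {"clean_orders", "ingest_customers"},
    "publish_report": {"join_orders_customers"},
}


def execution_waves(dag):
    """Group tasks into waves; tasks in the same wave can run concurrently."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()  # raises CycleError if the dependencies are circular
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every dependency is satisfied
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished to unlock successors
    return waves


waves = execution_waves(pipeline)
for number, tasks in enumerate(waves, start=1):
    print(f"wave {number}: {tasks}")
```

The two ingestion tasks land in the same wave because neither depends on the other, while every later task waits for its upstream wave to finish. No wall-clock schedule is involved, so a slow upstream task delays its successors instead of colliding with them.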
When attempting to tune a workload, there are a few standard things to keep an eye out for:
- Which of the three (CPU, memory, or I/O) is the true bottleneck?
- What type of node or horsepower is best for the task?
- Does it need to be scaled vertically or horizontally?
- What is the acceptable range for auto-scaling?
- Will GPUs be advantageous?
- Is there caching?
A pipeline has several distinct stages, and it is critical that they are all performant. Among these stages are the following:
- ETL aspects
  - Ingesting data.
  - Processing and transformations to model the data for the use case at hand in the lake.
  - Enriching data for relevant stakeholders.
  - The primary persona involved here is that of a data engineer.
- Consumption aspects
  - Getting data ready for consumption by BI and AI personas.
  - Ingestion strategies may not match the consumption patterns, and this may require different partitioning strategies, file layouts, and so on. For example, while designing a pipeline for ingestion, you may be concerned about how quickly to bring in the data, so your partitioning could be by processing date, whereas a consumption pattern may care about the event date. So, it is important to understand what patterns are most common and cater to those first.
  - The primary personas involved here are those of a business analyst or an ML practitioner.
- Delta aspects
  - OPTIMIZE helps with file compaction to avoid the small file problem.
  - ANALYZE generates additional statistics metadata that is stored in the metastore and helps operations such as Adaptive Query Execution (AQE) modify and refine the query plan on the fly.
  - VACUUM removes data files (not log files) that are no longer referenced by the Delta table and are older than the configured retention threshold, which by default is 7 days.
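To make the VACUUM rule concrete, here is a simplified model of the selection logic: a file is a candidate only if it is both unreferenced and older than the retention threshold. The file listing, the `referenced` set, and the `vacuum_candidates` helper are hypothetical; the real command works from the Delta transaction log rather than from a dictionary like this.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)  # default retention threshold named in the text


def vacuum_candidates(files, referenced, now, retention=RETENTION):
    """Return paths that are unreferenced and older than the retention window.

    files: dict of path -> last-modified datetime (hypothetical listing).
    referenced: set of paths still referenced by the current table version.
    """
    cutoff = now - retention
    return sorted(
        path
        for path, modified in files.items()
        if path not in referenced and modified < cutoff
    )


now = datetime(2021, 11, 18)
files = {
    "part-0001.parquet": datetime(2021, 11, 1),   # old and unreferenced
    "part-0002.parquet": datetime(2021, 11, 1),   # old but still referenced
    "part-0003.parquet": datetime(2021, 11, 15),  # unreferenced but too recent
}
referenced = {"part-0002.parquet"}
to_delete = vacuum_candidates(files, referenced, now)
print(to_delete)
```

Only the first file qualifies: the second is still part of the table, and the third is inside the retention window, which is what keeps recent versions available for time travel.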
What to look for and where to look
The Spark UI provides various views of the underlying query plan, and with their help you can identify trouble spots and address them appropriately by adding more compute, memory, or I/O. Using the explain command, you can check the plan and confirm that the generated statistics are being used to optimise queries. The following symptoms can affect big data pipelines:
- Spill
  - When a dataset is too large to fit in memory, it spills to disk, which is more expensive, so spills should be avoided.
- Shuffle
  - When data operations necessitate data movement across workers, internode communication may slow down operations, which is why shuffles should be minimised. Shuffles are produced by operations such as groupBy, orderBy, and sort.
- Skew and stragglers
  - When the mapping of tasks to cores is not well laid out, the last few tasks run on a few worker nodes while the rest of the nodes wait passively, burning compute without participating in the computation.
- Slow functions
  - These are usually unoptimized user-defined functions that need to be reviewed and optimized. The thread dump of the executors will reveal slow functions.
- Small files
  - Continuous ingestion of micro-batches of data leads to small files, which affect query performance and hence need coalescing and compaction.
Optimizing with Delta
Delta’s support for ACID transactions and its quality guarantees underpin data reliability, which minimises the need for superfluous validation steps and accelerates end-to-end processing. This means less downtime and shorter triage cycles.
Changing the data layout in storage
There are two ways to optimise the layout of the data to help speed up query performance:
- Compaction, also known as bin-packing
  - Here, lots of smaller files are combined into fewer large ones.
  - It is recommended to run it during off-peak hours.
  - The operation is idempotent, so there is no harm in running it more frequently.
  - It can be run on a table, on a file path, or on a smaller subset of the table by specifying a WHERE clause.
  - The default target file size is 1 GB, which you can alter by setting optimize.maxFileSize.
  - The maximum number of threads that can run can be changed by tweaking optimize.maxThreads.
- Data skipping
  - When you write data into a Delta Lake table, data skipping information is automatically collected. At query time, Delta Lake uses this information (minimum and maximum values for each column) to provide faster queries. There is no need to configure data skipping; the feature is enabled whenever it is applicable. Its effectiveness, however, depends on the layout of your data.
  - Collecting statistics on a column containing long values, such as strings or binary, is an expensive operation. To avoid collecting statistics on such columns, you can configure the table property delta.dataSkippingNumIndexedCols. This property indicates the position index of a column in the table’s schema. All columns with a position index less than the delta.dataSkippingNumIndexedCols property will have statistics collected. For the purposes of collecting statistics, each field within a nested column is considered an individual column. To avoid collecting statistics on columns containing long values, either set the delta.dataSkippingNumIndexedCols property so that the long value columns come after this index in the table’s schema, or move columns containing long strings to an index position greater than the delta.dataSkippingNumIndexedCols property by using ALTER TABLE ALTER COLUMN.
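The data skipping behaviour described above can be pictured with a small model: per-file minimum and maximum values are recorded for the first N columns only, and a query reads a file only if the value could fall inside that file's range. The helper names, columns, and file contents below are hypothetical, and this is a sketch of the idea, not Delta Lake's implementation.

```python
def collect_stats(rows, columns, num_indexed_cols):
    """Record (min, max) for the first num_indexed_cols columns of one file."""
    stats = {}
    for position, name in enumerate(columns):
        if position >= num_indexed_cols:
            break  # columns at or past this position get no statistics
        values = [row[position] for row in rows]
        stats[name] = (min(values), max(values))
    return stats


def files_to_read(file_stats, column, value):
    """Keep files whose [min, max] range could contain the value."""
    selected = []
    for path, stats in file_stats.items():
        if column not in stats:
            selected.append(path)  # no statistics: the file cannot be skipped
            continue
        low, high = stats[column]
        if low <= value <= high:
            selected.append(path)
    return selected


columns = ["event_id", "event_date", "payload"]  # payload holds long strings
files = {
    "file_a": [(1, "2021-11-17", "x" * 50), (2, "2021-11-17", "y" * 50)],
    "file_b": [(3, "2021-11-18", "z" * 50), (4, "2021-11-19", "w" * 50)],
}
# Index only the first two columns, leaving the long string column out.
file_stats = {path: collect_stats(rows, columns, 2) for path, rows in files.items()}
by_date = files_to_read(file_stats, "event_date", "2021-11-18")
by_payload = files_to_read(file_stats, "payload", "x" * 50)
print(by_date, by_payload)
```

The filter on `event_date` skips `file_a` entirely, while the filter on the unindexed `payload` column has to read both files. That is the trade-off behind placing long-valued columns after the indexed positions: cheaper writes, but no skipping on those columns.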
Other Platform Optimizations
Z-Ordering (multi-dimensional clustering):
Z-Ordering is a method for colocating related data in the same set of files. Data-skipping algorithms in Delta Lake use this co-locality automatically, which significantly decreases the amount of data that Delta Lake on Apache Spark must read. To Z-order data, specify the columns to order on in the ZORDER BY clause:
OPTIMIZE events ZORDER BY (eventType)
If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate by using WHERE:
OPTIMIZE events WHERE date = '2021-11-18' ZORDER BY (eventType)
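To build intuition for why Z-Ordering helps data skipping on more than one column, the sketch below uses the textbook Z-order (Morton) curve, which interleaves the bits of two integer keys. This is an illustration of the general idea under simplifying assumptions (two small integer columns, four rows per file); it does not claim to reproduce how Delta Lake computes its ordering internally.

```python
def z_value(x, y, bits=4):
    """Interleave the low `bits` bits of x and y into one Morton code."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)      # x bits go to even positions
        code |= ((y >> i) & 1) << (2 * i + 1)  # y bits go to odd positions
    return code


# Hypothetical rows keyed by two small integer columns.
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: z_value(*p))

# Split the sorted rows into four "files" of four rows each.
files = [ordered[i:i + 4] for i in range(0, len(ordered), 4)]
for number, rows in enumerate(files):
    xs = [x for x, _ in rows]
    ys = [y for _, y in rows]
    print(f"file {number}: x in [{min(xs)}, {max(xs)}], y in [{min(ys)}, {max(ys)}]")
```

Each file ends up covering a narrow range of both columns, so a filter on either `x` or `y` can skip half of the files using min/max statistics. Sorting by `x` alone would give tight ranges for `x` but leave every file spanning the full range of `y`.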
Auto optimize:
Auto optimize is an optional set of features that automatically compact small files during individual writes to a Delta table. Paying a small cost during writes offers significant benefits for tables that are queried actively. Auto optimize is particularly useful in the following scenarios:
- Streaming use cases where latency in the order of minutes is acceptable
- MERGE INTO is the preferred method of writing into Delta Lake
- CREATE TABLE AS SELECT or INSERT INTO are commonly used operations
Dynamic file pruning:
Dynamic file pruning can significantly improve the performance of many queries on Delta Lake tables. Dynamic file pruning is especially efficient for non-partitioned tables, or for joins on non-partitioned columns. The performance impact of dynamic file pruning is often correlated to the clustering of data, so consider using Z-Ordering to maximize the benefit.
Bloom filter indexes:
A Bloom filter index is a space-efficient data structure that enables data skipping on chosen columns, particularly for fields containing arbitrary text.
The Bloom filter operates by either stating that data is definitively not in the file, or that it is probably in the file, with a defined false positive probability (FPP).
Databricks supports file-level Bloom filters; each data file can have a single Bloom filter index file associated with it. Before reading a file, Databricks checks the index file, and the file is read only if the index indicates that the file might match a data filter. Databricks always reads the data file if an index does not exist or if a Bloom filter is not defined for a queried column.
The size of a Bloom filter depends on the number of elements in the set for which the Bloom filter has been created and the required FPP. The lower the FPP, the higher the number of used bits per element and the more accurate it will be, at the cost of more disk space and slower downloads. For example, an FPP of 10% requires 5 bits per element.
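The relationship between FPP and size can be checked with the standard sizing formula for an optimally configured Bloom filter, bits per element = -ln(p) / (ln 2)^2. The sketch below assumes that textbook formula; the exact sizing used by any particular implementation may differ, although the result for 10% agrees with the 5-bit figure quoted above.

```python
import math


def bits_per_element(fpp):
    """Bits per element for an optimally sized Bloom filter: -ln(p) / (ln 2)^2."""
    return -math.log(fpp) / (math.log(2) ** 2)


for fpp in (0.1, 0.01, 0.001):
    bits = bits_per_element(fpp)
    print(f"FPP {fpp}: {bits:.2f} bits per element, {math.ceil(bits)} rounded up")
```

Each tenfold reduction in the false positive probability costs roughly 4.8 extra bits per element, which is why very low FPP targets make the index files noticeably larger.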
Low shuffle merge on Databricks:
The MERGE command is used to perform simultaneous updates, insertions, and deletions from a Delta Lake table. Databricks has an optimized implementation of MERGE that improves performance substantially for common workloads by reducing the number of shuffle operations.
Databricks low shuffle merge provides better performance by processing unmodified rows in a separate, more streamlined processing mode, instead of processing them together with the modified rows. As a result, the amount of shuffled data is reduced significantly, leading to improved performance. Low shuffle merge also reduces the need for users to re-run the OPTIMIZE ZORDER BY command after performing a MERGE operation.
Disk caching:
Databricks uses disk caching to accelerate data reads by creating copies of remote Parquet data files in nodes’ local storage using a fast intermediate data format. The data is cached automatically whenever a file has to be fetched from a remote location. Successive reads of the same data are then performed locally, which results in significantly improved reading speed. The cache works for all Parquet data files (including Delta Lake tables).
AQE and range join optimization:
Keep Adaptive Query Execution (AQE) enabled, because it performs many optimizations on the user’s behalf. In some cases, you may still need to supply hints. The range join optimization, for example, takes a bin size hint. The bin size is a numeric tuning parameter that divides the range condition’s values domain into multiple bins of equal size. For example, with a bin size of 10, the optimisation divides the domain into bins of length 10.
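The effect of a bin size can be sketched with a toy range join: intervals are registered in every equal-width bin they overlap, and each point is compared only against the intervals in its own bin. The function names and sample data are hypothetical, and the code illustrates the binning idea only; it is not how the engine implements the optimization.

```python
def bin_index(value, bin_size):
    """Map a value to the index of the equal-width bin that contains it."""
    return value // bin_size


def bins_for_range(start, end, bin_size):
    """Indexes of every bin that the closed interval [start, end] overlaps."""
    return list(range(bin_index(start, bin_size), bin_index(end, bin_size) + 1))


def range_join(points, intervals, bin_size):
    """Join points to intervals with start <= point <= end, comparing per bin."""
    by_bin = {}
    for start, end in intervals:
        for b in bins_for_range(start, end, bin_size):
            by_bin.setdefault(b, []).append((start, end))
    matches = []
    for p in points:
        for start, end in by_bin.get(bin_index(p, bin_size), []):
            if start <= p <= end:
                matches.append((p, (start, end)))
    return matches


intervals = [(0, 14), (25, 29), (95, 99)]  # hypothetical range table
points = [3, 12, 27, 50, 97]               # hypothetical point table
first_bins = bins_for_range(0, 14, 10)
joined = range_join(points, intervals, bin_size=10)
print(first_bins)
print(joined)
```

With a bin size of 10, the interval (0, 14) is registered in bins 0 and 1, and the point 50 is never compared with anything because its bin is empty. A bin size that is too small duplicates long intervals across many bins, while one that is too large puts most rows in the same bin, which is why the value is left as a tuning hint.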
Data skew:
Data skew is a condition in which a table’s data is unevenly distributed across partitions in the cluster, which can severely degrade query performance, particularly for join queries.
Optimize data transformation:
Higher-order functions for operating on arrays and complex nested data type transformations can be very expensive. Using Spark’s built-in functions and primitives is recommended in array manipulation operations.
Is cost always inversely proportional to performance?
Higher performance is typically associated with higher costs. Spark offers tuneable performance and cost options. At a high level, if your end-to-end latency requirement is stringent, your cost will be higher.
However, using Delta to unify all of your workloads on a single platform brings scalability benefits through automation and standardization, resulting in cost savings by reducing the number of hops and processing steps, which translates to less compute power. Furthermore, by running your queries faster on the same hardware, you pay for a shorter duration of your cloud computing cost.
So, yes, it is possible to improve performance while keeping costs low, without jeopardizing SLA requirements. Delta makes superior architecture options available, such as the unification of batch and streaming workloads, handling both schema enforcement and schema evolution, and the ability to handle unstructured data alongside traditional structured and semi-structured data. Pipeline simplification, combined with increased reliability and quality, results in fewer outages and triages/fixes, freeing up people from support tasks to focus on improving use case effectiveness.
There are a few things to consider when setting up a pipeline, including:
- Ease of setup & use
- Ease of extensibility and integrations
- Ease of migrations
- Ease of moving from dev to prod.
- Ease of moving from low
- Support for both data and machine learning pipelines, as well as REST APIs for CI/CD automation and observability.
The metrics to consider can be summarized as follows:
- Total cost of running the entire pipeline
- Time duration from end to end to get the data ready and the access pattern to consume it.
- Productivity gains for the engineering and business teams
- Extensibility of the architecture
- Complexity with creation and maintenance
- Portability of the solution without expensive vendor lock-in.
Best practices for managing performance
Management of costs and performance is a continuous process. The two occasionally pull in opposite directions, but they can also work well together. After a workload has been optimised, its pattern may change and require fresh adjustments. Managed platforms like Databricks are getting better at analysing workloads and suggesting or implementing optimizations, which relieves the data engineer of some of these duties, but there is still a long way to go before reaching full auto-pilot. Among other things, the most crucial elements are partition pruning and I/O pruning:
Partition pruning is file based and uses a directory for each partition value. On disk, it looks like <partition_key>=<partition_value> followed by a set of associated Parquet files. If the amount of data returned to the driver from the executors is large, increase spark.driver.maxResultSize. A table may also have too many files, which can slow down write operations; use OPTIMIZE and auto optimize for optimized writes and auto-compaction. You should also investigate the Delta log.
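The directory layout described above is what makes partition pruning cheap: a filter on the partition column can discard whole directories by name, without opening a single data file. The sketch below models that with plain path strings. The paths and helper functions are hypothetical, and a real engine resolves partition values from table metadata rather than by parsing paths like this.

```python
def partition_values(path):
    """Parse key=value directory segments out of a data file path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values


def prune(paths, **filters):
    """Keep paths whose partition values match every equality filter."""
    return [
        path
        for path in paths
        if all(partition_values(path).get(k) == v for k, v in filters.items())
    ]


# Hypothetical table layout, partitioned by date.
paths = [
    "events/date=2021-11-17/part-0000.parquet",
    "events/date=2021-11-18/part-0000.parquet",
    "events/date=2021-11-18/part-0001.parquet",
]
selected = prune(paths, date="2021-11-18")
print(selected)
```

Only the two files under `date=2021-11-18` survive the filter. The same layout also shows the downside of over-partitioning: every distinct value adds a directory and at least one file, which is how a table ends up with too many small files.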
I/O pruning relies on data skipping and Z-Ordering, which help manage the granularity of file sizes and avoid skew in data file size.
Choose your cluster configuration judiciously. To avoid accumulating a lot of storage, add a TTL policy or disable cloud storage versioning. Set spark.sql.shuffle.partitions to the number of executor cores. Use Delta caching to cache data locally instead of repeatedly fetching it from cloud storage. This is especially useful in search applications where query reuse is high or filter conditions change only slightly.
Here are some common best practices to follow:
- Employ the right compute type
  - Optimise based on the need of the workload under consideration: storage versus compute versus memory.
  - Once a family type is chosen, start with the lowest configuration and understand bottlenecks before throwing larger node types at the problem.
  - Depending on SLAs, choose a reserved instance for the driver and a combination of reserved and spot instances for workers, as spot instances can greatly reduce cost.
  - Benchmark to size the cluster but allow for auto-scaling (both up and down).
- In most scenarios, these settings should be applied:
  - Turn on AQE (spark.sql.adaptive.enabled).
  - Turn on coalesce partitions (spark.sql.adaptive.coalescePartitions.enabled).
  - Turn on skew join (spark.sql.adaptive.skewJoin.enabled).
  - Turn on local shuffle reader (spark.sql.adaptive.localShuffleReader.enabled).
  - Set the broadcast join threshold (spark.sql.autoBroadcastJoinThreshold).
  - Avoid sort-merge join (spark.sql.join.preferSortMergeJoin = false).
- Build pipelines that capture all associated tasks’ dependencies so that downstream tasks are automatically triggered when relevant, dependent upstream tasks are completed.
- Examine and improve code to make it as lean and mean as possible. Wherever possible, parallelize. Examine the Spark UI and other monitoring dashboards for the five Ss and address them as soon as possible. Configure alerts and notifications to notify you when a job takes too long or fails.
- Build all pipelines to use streaming as well as Delta – the dial can easily be moved from less frequent to more frequent to continuous streaming ingestion mode.
- Plan to carry out scheduled maintenance tasks such as optimizing and vacuuming.
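The Spark settings recommended in the list above could be applied in one place, for example at the start of a job. The sketch below writes the property names out in full; the values, and in particular the 10 MB broadcast threshold, are assumptions for illustration and should be tuned per workload. A tiny stand-in object replaces the SparkSession so the snippet runs without a cluster.

```python
# Property names are standard Spark SQL settings; the values are assumptions
# chosen for illustration and should be tuned per workload.
RECOMMENDED_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
    "spark.sql.autoBroadcastJoinThreshold": str(10 * 1024 * 1024),  # 10 MB, assumed
    "spark.sql.join.preferSortMergeJoin": "false",
}


def apply_settings(spark, settings=RECOMMENDED_SETTINGS):
    """Apply each setting through spark.conf.set on an existing session."""
    for key, value in settings.items():
        spark.conf.set(key, value)


class _FakeConf:
    """Stand-in for SparkSession.conf so the sketch runs without Spark."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value


class _FakeSpark:
    """Stand-in for a SparkSession; replace with a real session in practice."""

    def __init__(self):
        self.conf = _FakeConf()


spark = _FakeSpark()
apply_settings(spark)
print(spark.conf.values["spark.sql.adaptive.skewJoin.enabled"])
```

Keeping the settings in a single dictionary makes it easy to review them alongside the job code and to override individual values for workloads that behave differently.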
Here are some pointers to help you make sound decisions about your data layout and operations in order to maintain data hygiene.
What should your partitioning strategy be?
A common design consideration is which column(s) to use for partitioning. If the table is small to moderate in size, it may be best to skip partitioning entirely. However, if you do partition, you should consider access patterns and which columns are commonly used in WHERE clauses. Date, for example, is a popular partition column. As a general rule, avoid using a high-cardinality column, such as an identifier. However, if the data in each partition is expected to be at least 1 GB, then it is a viable option.
How do you compact files?
Writing data in micro-batches over time creates small files that degrade performance. Compacting with repartition or OPTIMIZE changes the data layout and consolidates the data into fewer files. In both cases, you can do it for a single partition or for the entire table. In the case of repartition, the older files have to be removed by running the VACUUM command.
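Compaction is often described as bin-packing, and the idea can be sketched with a greedy first-fit-decreasing plan that groups small files into outputs no larger than a target size. The file sizes and the `plan_compaction` helper are hypothetical, and the heuristic is a generic one chosen for illustration; it is not a description of the algorithm OPTIMIZE uses internally.

```python
TARGET_SIZE_MB = 1024  # 1 GB target, matching the default mentioned earlier


def plan_compaction(file_sizes_mb, target_mb=TARGET_SIZE_MB):
    """Greedy first-fit-decreasing packing of small files into output bins."""
    bins = []  # each bin lists the input sizes that become one output file
    for size in sorted(file_sizes_mb, reverse=True):
        for current in bins:
            if sum(current) + size <= target_mb:
                current.append(size)
                break
        else:
            bins.append([size])  # nothing had room, so start a new output file
    return bins


# Hypothetical micro-batch output: many small files, sizes in MB.
small_files = [300, 200, 500, 100, 400, 250, 150, 124]
plan = plan_compaction(small_files)
for number, group in enumerate(plan):
    print(f"output file {number}: {group} -> {sum(group)} MB")
```

Eight small files collapse into two outputs of 1024 MB and 1000 MB. Planning again over those two outputs leaves them unchanged, which mirrors the idempotent behaviour noted for compaction earlier.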
How do you gracefully replace content or a schema?
There may be times when business logic requires the replacement of a Delta table. You might be tempted to delete the directory and then recreate or create a new one. This is not a good idea because it not only takes longer to delete files but also increases the likelihood of data corruption. A better approach is to use the overwrite mode to replace data with the overwriteSchema option set to true or fix a few values using Delta’s inherent capability for fine-grained updates and deletes.
What type of caching should you apply?
When using Delta, it is best not to use Spark caching, because any additional data filter, or access through a different identifier, will cause it to go back to disk. Delta caching should be used at the VM level whenever possible because the dataset is stored locally. Because the cache is invalidated as the data in cloud storage changes, you don’t have to worry about accessing stale data.
At Daimlinc, we strongly consider these factors when designing data pipelines for Delta Lake for our customers.