Spark Join Strategies Explained: Shuffle Hash
Everything You Need to Know About Shuffle Hash Join
1. Introduction
Modern big data applications often require joining huge datasets efficiently. Choosing the right join strategy is critical for performance and resource usage. Apache Spark offers several join methods, including broadcast hash joins, sort-merge joins, and shuffle hash joins (SHJ). SHJ stands out as a middle-ground approach:
It shuffles both tables, as a sort-merge join does, to co-locate rows with the same key.
Instead of sorting, it builds an in-memory hash table from the smaller dataset in each partition and probes it with rows from the larger dataset.
This approach can improve execution time by removing the sorting overhead, but it demands careful memory management.
2. Understanding Shuffle Hash Join
Shuffle Hash Join is best understood as a hybrid that borrows elements from two traditional join methods:
Sort Merge Join (SMJ)
Mechanism: Both datasets are sorted by the join key and then merged.
Pros: Reliable for large datasets.
Cons: Sorting is CPU intensive.
Broadcast Hash Join (BHJ)
Mechanism: The smaller table is broadcast to all nodes, and each executor performs a local hash join.
Pros: Eliminates shuffling.
Cons: Limited by broadcast size; not suitable when the smaller table exceeds available memory on executors.
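For contrast, a broadcast hash join can be requested explicitly with the broadcast function from org.apache.spark.sql.functions; a minimal sketch, assuming hypothetical largeDf and smallDf DataFrames sharing a customerId column:

import org.apache.spark.sql.functions.broadcast

// smallDf must comfortably fit in each executor's memory once broadcast
val joined = largeDf.join(broadcast(smallDf), Seq("customerId"))
joined.explain() // The physical plan should show BroadcastHashJoin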
How SHJ Differentiates Itself:
Key Step: It shuffles both datasets based on the join key so that every partition contains matching keys.
In-Partition Operation: Instead of sorting the data in each partition, Spark builds a hash table from the smaller dataset's partition and then probes that table with each row from the larger dataset.
Memory Sensitivity: The approach assumes that each partition of the smaller side can be held in memory, which is crucial for performance and avoiding runtime errors.
Key Concepts to Remember:
No Sorting: Eliminates the costly sort phase.
Memory Requirement: High dependency on the ability to fit the hashed partition in memory, risking OOM errors if miscalculated.
3. When to Use SHJ
Historical Perspective
Pre-Spark 3.0:
Spark defaulted to Sort Merge Join for equality-based joins due to the risk of OOM when building in-memory hash tables.
Spark 3.x and Beyond:
With enhancements like Adaptive Query Execution (AQE), Spark can dynamically decide to use SHJ when it detects that:
The smaller dataset, after partitioning, is of manageable size.
Avoiding the expensive sorting operation is beneficial for performance.
Practical Scenarios
Moderately Small Datasets:
When one dataset is small enough that its partitions are lightweight (e.g., 5 MB per partition when 5 GB is divided across 1000 partitions), yet not small enough for a broadcast join.
High Sorting Overhead:
When joining a massive fact table (e.g., 1 TB) with a dimension table that is too big to broadcast but small enough per partition, the cost of sorting the entire dataset (as in SMJ) may dominate, making SHJ the more efficient choice.
Decision Factors
Estimated Partition Size:
Spark’s optimizer checks whether the estimated per-partition size of the smaller table is below a threshold (set via spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold).
Configuration and Hints:
Users can guide Spark’s optimizer using hints like /*+ SHUFFLE_HASH(tab) */ or steer it away from sort-merge joins by toggling spark.sql.join.preferSortMergeJoin:
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
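The same hint can also be issued from SQL; a minimal sketch, where the fact and dim view names and the joinKey and label columns are hypothetical:

// The hint names the side from which to build the per-partition hash table
val hinted = spark.sql(
  """SELECT /*+ SHUFFLE_HASH(dim) */ f.*, dim.label
    |FROM fact f JOIN dim ON f.joinKey = dim.joinKey""".stripMargin)
hinted.explain() // Look for ShuffledHashJoin in the physical plan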
4. How SHJ Works
The execution of a Shuffle Hash Join can be understood through two primary phases, with some literature breaking it into a three-phase model for clarity.
A. Shuffle Phase
Objective:
Bring together all rows associated with a given join key within the same partition.
Process:
Repartitioning:
Both datasets are redistributed (shuffled) using the join key as the partitioning key. Note that both sides are shuffled, so network cost is incurred for both datasets.
Data Co-location:
Post-shuffle, each partition holds all the rows for a specific set of join keys.
Network I/O:
While shuffling ensures correct join semantics, it incurs the cost of network communication for both datasets.
Example Scenario:
Imagine two datasets, Person and Address, initially spread across different partitions. In the shuffle phase, rows with the same key (e.g., A001) are sent to the same partition. This guarantees that later join operations will have all matching keys available on the same executor.
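To see the co-location idea in isolation, here is a minimal sketch that hash-partitions both sides on the join key; personDf, addressDf, the addressId column, and the partition count are all illustrative, and this is a manual approximation rather than what Spark's planner literally executes:

import org.apache.spark.sql.functions.col

// Hash-partition both sides on the join key so matching keys land in the same partition
val personShuffled  = personDf.repartition(200, col("addressId"))
val addressShuffled = addressDf.repartition(200, col("addressId"))
// Every addressId value (e.g., A001) now maps to exactly one partition index on each side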
B. Hash Join Phase
After the shuffle phase, the join is executed within each partition through these steps:
Hash Table Creation:
Selection:
Spark selects the smaller dataset based on statistics or join hints.
Building the Hash Table:
For every partition, Spark creates an in-memory hash table that maps join keys to the associated rows.
Probing the Hash Table:
Streaming Data:
The larger dataset’s rows are processed sequentially within the partition.
Lookup and Join:
For each row in the larger dataset, the hash table is queried using the join key. If a match exists, Spark produces the joined row as output.
Because no sort is performed, a large partition produces a large hash table. Spark assumes the build side will fit in memory. If it does not, the task can spill partitions of the build side to disk (Spark has some support for spilling hash tables, but it is more complex than spilling a sort). In the worst case, an SHJ can run out of memory if the hash table grows too big, causing the executor to OOM. This is why Spark is conservative about using SHJ unless it is confident the partitions are small enough.
Conceptual Diagram:
Imagine a partition where:
The smaller dataset’s partition (say, 5 MB worth of data) is fully loaded into a hash table.
The larger dataset streams through, and for each key, Spark quickly checks the in-memory hash table for corresponding rows.
This operation is performed concurrently across all partitions on different worker nodes.
Alternative Three-Phase View
For some, a detailed three-phase breakdown clarifies the process:
Shuffle:
Repartition both datasets so that all rows sharing the same join key are co-located.
Hash Table Creation:
For each partition, build the in-memory hash table using the smaller dataset.
Hash Join:
Join the larger dataset’s partition by probing the hash table.
This view underlines the importance of parallel execution: each worker node processes its partitions independently, which is key to Spark’s scalability.
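To make the per-partition build/probe step concrete, here is a minimal Scala sketch of the hash-join algorithm operating on two already co-located sequences of (key, payload) pairs; it illustrates the technique and is not Spark's internal implementation:

// Per-partition hash join: build on the small side, probe with the large side
def hashJoinPartition[K, A, B](
    buildSide: Seq[(K, A)], // smaller side of this partition
    probeSide: Seq[(K, B)]  // larger side of this partition
): Seq[(K, A, B)] = {
  // 1. Build an in-memory hash table from the smaller side
  val hashTable: Map[K, Seq[A]] =
    buildSide.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
  // 2. Stream the larger side, probing the table for each key
  probeSide.flatMap { case (k, b) =>
    hashTable.getOrElse(k, Seq.empty).map(a => (k, a, b))
  }
}

// Example: "A001" matches; "B002" and "C003" do not
val built  = Seq(("A001", "addr-1"), ("B002", "addr-2"))
val probed = Seq(("A001", "alice"), ("A001", "anna"), ("C003", "carol"))
hashJoinPartition(built, probed) // Seq((A001,addr-1,alice), (A001,addr-1,anna))

Note that the build side is fully materialized while the probe side is merely streamed, which is exactly why Spark wants the build side's partitions to be the small ones.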
5. Supported Join Types
Shuffle Hash Join is designed to work primarily with equi-joins. In Apache Spark, it supports:
Inner Joins:
Only matching rows are returned.
Left, Right, Semi, and Anti Joins:
These join types function well as long as the join condition is based on equality.
Additional Notes:
Full Outer Join:
SHJ did not support full outer joins in Spark 3.0; support was added in Spark 3.1+.
Non-equi Joins and Cross Joins:
SHJ does not naturally handle cross joins or non-equi conditions. In such cases, Spark falls back on other, more suitable join strategies.
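For illustration, the hint carries over to these other equi-join types; a minimal left semi join sketch, assuming hypothetical factTable and dimensionTable DataFrames:

// Keep only fact rows whose joinKey also appears in the dimension table
val semi = factTable.join(dimensionTable.hint("SHUFFLE_HASH"), Seq("joinKey"), "left_semi")
semi.explain() // When the hint is honored, the plan shows ShuffledHashJoin with LeftSemi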
6. Performance Characteristics & Trade-Offs
Understanding the performance implications of SHJ is critical for designing robust, high-performance Spark jobs.
Advantages
No Sorting Required:
By eliminating the sort step used in SMJ, SHJ significantly reduces CPU overhead.
Efficient CPU Usage:
Hash functions and probing operations are generally less costly than sorting large datasets.
Parallel Execution:
The join is processed in parallel across partitions, making it scalable across large clusters.
Considerations and Pitfalls
Memory Sensitivity:
Build Side Dependency:
Every partition on the smaller side must fit in memory. If a partition exceeds available memory, it may cause disk spills or even OOM errors.
Configuration Challenges:
Incorrect estimations or misconfigured thresholds can lead to failures. Monitoring and adjusting Spark’s parameters is essential.
Data Skew:
Uneven Distribution:
A heavily skewed join key might result in one partition holding a disproportionate amount of data, dramatically increasing memory requirements for that partition.
Mitigation Strategies:
Increase the number of shuffle partitions (via spark.sql.shuffle.partitions) or apply custom salting techniques (see the salting sketch at the end of this list).
Network I/O:
While SHJ saves on CPU cycles, it does not reduce the network cost of shuffling. If your workload is network-bound, the benefits of SHJ may be limited.
Fallback and Spilling:
If the hash table grows too large, Spark may attempt to spill data to disk. However, disk spilling is less efficient and can severely impact performance.
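For the salting technique mentioned under Mitigation Strategies, here is a minimal sketch, assuming hypothetical largeDf and smallDf DataFrames that join on a joinKey column; the salt bucket count is illustrative:

import org.apache.spark.sql.functions.{array, col, concat_ws, explode, lit, rand}

val saltBuckets = 8 // illustrative; tune to the observed skew

// Skewed (large) side: append a random salt bucket to the join key
val saltedLarge = largeDf.withColumn(
  "saltedKey",
  concat_ws("_", col("joinKey"), (rand() * saltBuckets).cast("int")))

// Other side: replicate each row once per salt bucket so every salted key can match
val saltedSmall = smallDf
  .withColumn("salt", explode(array((0 until saltBuckets).map(lit(_)): _*)))
  .withColumn("saltedKey", concat_ws("_", col("joinKey"), col("salt")))

// The join now spreads a hot key across saltBuckets partitions
val joined = saltedLarge.join(saltedSmall, Seq("saltedKey"))

This multiplies the small side by the salt factor, so it is only worthwhile when the skewed side dominates the cost.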
7. SHJ Compared to Other Join Strategies
A clear comparison can help decide when to use SHJ over other join methods:
| Aspect | Broadcast Hash Join (BHJ) | Sort Merge Join (SMJ) | Shuffle Hash Join (SHJ) |
| --- | --- | --- | --- |
| When to Use | Very small tables (typically <10 MB by default) | Large tables where sorting is tolerable | Moderately small build side that cannot be broadcast; avoids sorting overhead |
| Sorting Requirement | No sorting; smaller dataset is broadcast | Sorting required across partitions | No sorting within partitions; uses in-memory hash table |
| Memory Impact | Minimal memory impact on executors | Uses more CPU for sorting | Requires sufficient memory per partition for hash tables |
| Network Cost | Minimal network I/O (broadcast eliminates shuffle) | High network I/O due to data shuffling | Same network cost as SMJ |
Key Takeaways:
BHJ is best when the smaller table is extremely small.
SMJ is a general-purpose join that is robust for large datasets.
SHJ strikes a balance by avoiding the heavy sorting cost when the per-partition memory size is manageable.
Shuffle hash join over sort-merge join
In most cases Spark chooses sort-merge join (SMJ) when it cannot broadcast a table. Sort-merge joins are typically the most expensive strategy because of the sort step. Shuffle hash join (SHJ) has been found to be faster in some circumstances (but not all) than sort-merge, since it does not require that extra sorting step. There is a setting that lets you advise Spark that you would prefer SHJ over SMJ; with it set, Spark will try to use SHJ instead of SMJ wherever possible. Note that this does not mean Spark will always choose SHJ over SMJ; you are simply stating your preference.
set spark.sql.join.preferSortMergeJoin = false
The Databricks Photon engine also replaces sort-merge joins with shuffle hash joins to boost query performance.
Setting the preferSortMergeJoin config option to false is not necessary for every job. For the first execution of a given job, you can leave it at its default value (true). If the job performs many joins that involve heavy data shuffling and make it difficult to meet the desired SLA, you can then change preferSortMergeJoin to false.
8. Configuration and Tuning Best Practices
Optimizing SHJ involves careful configuration and continuous monitoring. Below are some best practices.
A. Adaptive Query Execution (AQE)
What is AQE?
Adaptive Query Execution dynamically adapts the physical plan based on runtime statistics. With Spark 3.x, AQE can convert a sort-merge join to a shuffle hash join if it detects that partition sizes are favorable.
Configuration Example:
// Set the AQE threshold: if the post-shuffle partition size is below 64 MB, Spark uses SHJ.
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "64MB")
This dynamic adjustment helps balance between CPU use and memory load without manual intervention.
B. Join Hints and Configurations
Explicit Hints:
When you know the data characteristics, you can direct Spark to use SHJ via hints:
// Using a hint to explicitly request a Shuffle Hash Join
val dfJoined = factTable.join(dimensionTable.hint("SHUFFLE_HASH"), "joinKey")
dfJoined.explain() // The physical plan should show ShuffledHashJoin
Disabling SMJ Preference:
For cases where SHJ is preferred over SMJ, you can adjust the setting as follows:
// Tell Spark to favor hash-based join strategies over sort-merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
C. Monitoring and Debugging
Using the Spark UI:
Partition Metrics:
Monitor the size and distribution of shuffle partitions to ensure they meet expected thresholds.
Task Execution Details:
Observe tasks’ memory usage and CPU times. Unexpected OOM errors or high spill metrics may indicate misconfigured thresholds or skewed data.
Log Analysis:
AQE Logs:
When AQE is enabled, logs will show whether the join strategy was dynamically switched.
Executor Logs:
Pay attention to memory allocation logs and warnings about data spills.
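Beyond the UI and logs, you can also check the selected strategy programmatically; a small sketch, assuming the dfJoined DataFrame from the examples in this article:

// Render the final physical plan as a string and look for the SHJ operator.
// Note: with AQE enabled, the plan may only finalize after the query has run.
val plan = dfJoined.queryExecution.executedPlan.toString
if (plan.contains("ShuffledHashJoin")) println("SHJ selected")
else println("A different join strategy was chosen; inspect explain() output")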
9. Practical Example
Let’s consider a real-world scenario to solidify our understanding. Suppose you are joining a large fact table with a moderately sized dimension table:
Fact Table: ~1 TB of transactional data.
Dimension Table: ~5 GB of reference data.
Rationale:
Broadcasting a 5 GB table is infeasible in this scenario, but if you partition the 5 GB table into 1000 slices, each partition is only about 5 MB. This makes it an ideal candidate for a shuffle hash join.
Implementation Example in Spark (Scala):
// Assuming factTable and dimensionTable are pre-defined DataFrames
val dfJoined = factTable.join(
  dimensionTable.hint("SHUFFLE_HASH"),
  Seq("joinKey") // Using column(s) that define the join condition
)

// Explain the plan to verify the join strategy
dfJoined.explain(true)

// Expected outcome:
// The physical plan should display a "ShuffledHashJoin" operator,
// indicating that Spark is using SHJ for the join.
What to Look For:
Physical Plan Inspection:
Look for the ShuffledHashJoin operator in the explain plan output.
Resource Usage:
Monitor executor memory usage and check that each partition from the smaller dimension table fits within the allotted memory, avoiding spills or OOM errors.
A simplified explain output looks like this:
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
: +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
+- LocalTableScan [id2#8]
10. Databricks Platform-Specific Insights
Databricks generally relies on BHJ and SMJ under the hood, and uses SHJ in a more limited, adaptive way. Under AQE, Databricks might start a join as a sort-merge join but then convert it to a shuffled hash join at runtime if it finds that each partition’s size is below a threshold (and thus can fit in memory).
This is an optimization: Spark saves the cost of sorting when it realizes it was not needed. By default, this conversion is off (threshold = 0) on vanilla Spark 3.2, but Databricks may enable it or allow setting it. If using hints, you can explicitly ask for an SHJ, e.g., .hint("SHUFFLE_HASH") in the DataFrame API or via SQL hints. This can be useful if you know one side is moderately small but Spark’s statistics are missing. Always ensure that the hint-targeted side will be small per partition; otherwise, you may get memory errors.
Databricks’ strong skew mitigation helps SHJ as well: if one partition is skewed and would OOM an SHJ, AQE’s skew join handling can split that partition and even fall back to a sort-merge or a replicated join for that partition if necessary. Also note that Photon (Databricks’ vectorized engine) has an improved hash join implementation that can spill gracefully and use multiple threads per join, which makes SHJ more viable for large data in Photon. In standard Spark, SHJ is single-threaded per task for the join itself (just as the SMJ merge is single-threaded per task).
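If skew is a concern, the relevant AQE knobs can be set explicitly; a minimal sketch using standard Spark 3.x configuration keys (the factor value shown is illustrative):

// Enable AQE and its skew-join handling; a partition is considered skewed when it is
// more than skewedPartitionFactor times the median partition size (and above a byte threshold)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")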
11. Conclusion
Shuffle Hash Join (SHJ) provides a balanced approach by eliminating the high cost of sorting that is present in Sort Merge Joins, while sidestepping the broadcast size limitations of Broadcast Hash Joins. By shuffling data to co-locate matching join keys and then using an in-memory hash table to perform the join, SHJ offers:
Improved CPU efficiency due to reduced sorting overhead.
Scalability when the smaller dataset can be effectively partitioned.
A flexible mechanism that can adapt to runtime data sizes through AQE.
However, SHJ requires meticulous tuning and monitoring:
Memory Utilization:
Ensure that each partition’s hash table fits in memory.
Data Skew:
Address uneven data distributions to prevent performance bottlenecks.
Network Costs:
Understand that while CPU usage may decrease, shuffling still incurs network overhead.
By leveraging configuration settings, join hints, and adaptive query execution, data engineers can optimize their Spark workloads using SHJ. This detailed understanding equips you with the knowledge to carefully evaluate when SHJ is the right tool for your data joining needs, ensuring robust and efficient Spark application performance.
Further Reading
For more in-depth information and the latest updates on Spark join optimizations, the following resources are highly recommended:
Apache Spark Official Documentation – SQL Performance Tuning: Covers join strategy hints, adaptive execution, etc. (See “Join Strategy Hints” and “Adaptive Query Execution” in the Spark docs)
Tuning Spark SQL queries for AWS Glue and Amazon EMR Spark jobs
Apache Spark Official Documentation – Adaptive Query Execution (AQE): Detailed explanation of AQE features like converting SMJ to BHJ/SHJ and skew join handling
Databricks Documentation – Join Hints & Optimizations: Databricks-specific docs on join strategies, including the SKEW and RANGE hints, and how AQE is used on Databricks.
“How Databricks Optimizes Spark SQL Joins” – Medium (dezimaldata): A blog post (Aug 2023) summarizing Databricks’ techniques like CBO, AQE, range join, and skew join optimizations.
Spark Summit Talks on Joins and AQE: Videos like “Optimizing Shuffle Heavy Workloads” or “AQE in Spark 3.0” (by Databricks engineers) for a deeper understanding of the internals of join execution and tuning.
https://spark.apache.org/docs/latest/sql-performance-tuning.html#join-strategy-hints
https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
Top 5 Mistakes That Make Your Databricks Queries Slow (and How to Fix Them)
By consulting these materials, you can deepen your understanding of Spark join mechanisms and keep up to date with the evolving best practices on the Databricks platform.