At the recent Spark AI Summit 2020, held online for the first time, the highlights of the event were innovations to improve Apache Spark 3.0 performance, including Spark SQL optimizations and NVIDIA GPU acceleration.
In the opening keynote, Databricks CTO Matei Zaharia noted that 90% of Spark API calls run through the Spark SQL engine and that, as a result, the Apache Spark community has invested 46% of Spark patches in improving Spark SQL. Spark 3.0 is roughly 2x faster than Spark 2.4 on the TPC-DS benchmark, enabled by adaptive query execution, dynamic partition pruning, and other optimizations.
Spark 2.2 added cost-based optimization to the existing rule-based SQL optimizer. Spark 3.0 adds runtime adaptive query execution (AQE). With AQE, runtime statistics retrieved from completed stages of the query plan are used to re-optimize the execution plan of the remaining query stages. Databricks benchmarks yielded speed-ups ranging from 1.1x to 8x when using AQE.
Spark 3.0 AQE optimization features include:
- Dynamically coalesce shuffle partitions: In Spark, the number of partitions corresponds to the number of tasks; for optimal parallelism you want partitions to be balanced, neither too big nor too small. Partitions are data-dependent and vary from stage to stage. AQE can combine adjacent small partitions into bigger partitions at runtime by looking at the shuffle file statistics, reducing the number of tasks for query aggregations.
- Dynamically switch join strategies: AQE can optimize the join strategy at runtime based on the join relation size. For example, AQE can change a sort-merge join to a broadcast hash join, which performs better when one side of the join is small enough to fit in memory.
- Dynamically optimize skew joins: A common issue when working with large datasets is data skew in sort-merge joins, which can significantly slow down query performance. Dynamically optimized skew joins in Spark 3.0 can detect skew from partition sizes using runtime statistics and split skewed partitions into smaller sub-partitions.
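These AQE behaviors are controlled by configuration flags documented for Spark 3.0, which is disabled by default. A minimal PySpark sketch of enabling them (the session setup and app name are illustrative):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-example")  # example app name
    # Turn on adaptive query execution (disabled by default in Spark 3.0).
    .config("spark.sql.adaptive.enabled", "true")
    # Coalesce many small shuffle partitions into fewer, larger ones at runtime.
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Detect skewed partitions in sort-merge joins and split them into sub-partitions.
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)
```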
In Spark 2.x, static predicate pushdown and partition pruning are performance optimizations that limit the number of files and partitions Spark reads when querying. Once data is partitioned, queries that match certain partition filter criteria improve performance because Spark reads only a subset of the directories and files. Spark 3.0 dynamic partition pruning allows the Spark engine to dynamically infer, at runtime, the specific partitions within a table that need to be read and processed for a specific query, by identifying the partition column values that result from filtering another table in a join.
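A hedged example of the kind of query that benefits, reusing the SparkSession from the sketch above: the table and column names are hypothetical, with a large fact table partitioned by the join key and a filter on a small dimension table. Dynamic partition pruning (spark.sql.optimizer.dynamicPartitionPruning.enabled, on by default in Spark 3.0) uses the dimension-side filter at runtime to prune the fact table's partitions:

```python
# Hypothetical tables: 'sales' is a large fact table partitioned by 'date_id';
# 'dates' is a small dimension table. The filter on dates.year is applied at
# runtime to prune which sales partitions are actually scanned.
result = spark.sql("""
    SELECT s.item_id, SUM(s.amount) AS total
    FROM sales s
    JOIN dates d ON s.date_id = d.date_id
    WHERE d.year = 2020
    GROUP BY s.item_id
""")
```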
In the Deep Dive into GPU Support in Apache Spark 3.x session, Robert Evans and Jason Lowe gave an overview of accelerator-aware scheduling and the RAPIDS Accelerator for Apache Spark, which enables GPU-accelerated SQL/DataFrame operations and Spark shuffles with no code changes.
GPUs have been widely used for accelerating deep learning, but not for data processing. As part of Project Hydrogen, a major Spark initiative to better unify deep learning and data processing on Spark, GPUs are now a schedulable resource in Apache Spark 3.0. This allows Spark to schedule executors with a specified number of GPUs, and users can specify how many GPUs each task requires. Spark conveys these resource requests to the underlying cluster manager (Kubernetes, YARN, or Standalone). Users can also configure a discovery script that detects which GPUs were assigned by the cluster manager. This greatly simplifies running ML applications that need GPUs, as previously users were required to work around the lack of GPU scheduling in Spark applications.
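A minimal sketch of these resource requests in PySpark (the amounts and the discovery script path are example values; Spark ships a sample getGpusResources.sh script under examples/src/main/scripts/):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("gpu-scheduling-example")  # example app name
    # Each executor asks the cluster manager for one GPU.
    .config("spark.executor.resource.gpu.amount", "1")
    # Each task requires a whole GPU, so one GPU-bound task runs per GPU at a time.
    .config("spark.task.resource.gpu.amount", "1")
    # Script executors run to discover the GPU addresses they were assigned
    # (the path is a placeholder for wherever the script is installed).
    .config("spark.executor.resource.gpu.discoveryScript",
            "/opt/spark/examples/src/main/scripts/getGpusResources.sh")
    .getOrCreate()
)
```

Within a task, the assigned GPU addresses can then be read from TaskContext.get().resources()["gpu"].addresses.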
Spark 3.0 supports SQL optimizer plugins to process data using columnar batches rather than rows. Columnar data is GPU-friendly, and this feature is what the RAPIDS Accelerator plugs into to accelerate SQL and DataFrame operators. With the RAPIDS Accelerator, the Catalyst query optimizer has been modified to identify operators within a query plan that can be accelerated with the RAPIDS API, mostly a one-to-one mapping, and to schedule those operators on GPUs within the Spark cluster when executing the query plan.
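A hedged sketch of enabling the plugin from PySpark, assuming the rapids-4-spark and cudf jars are already distributed to the cluster (jar names, versions, and distribution steps are covered in the nvidia/spark-rapids documentation):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("rapids-sql-example")  # example app name
    # Register the RAPIDS Accelerator so Catalyst can place supported operators on the GPU.
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    # The accelerated operators need a GPU on each executor.
    .config("spark.executor.resource.gpu.amount", "1")
    .getOrCreate()
)

# Existing SQL/DataFrame code runs unchanged; supported operators execute on the GPU.
```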
Spark operations that sort, group, or join data by value must move data between partitions when creating a new DataFrame from an existing one between stages, in a process called a shuffle, which involves disk I/O, data serialization, and network I/O. The new RAPIDS Accelerator shuffle implementation leverages UCX to optimize GPU data transfers, keeping as much data on the GPU as possible, finding the fastest path to move data between nodes, and using the best of the available hardware resources, including bypassing the CPU for GPU-to-GPU memory transfers within and between nodes.
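A hedged configuration sketch: the accelerated shuffle is enabled by swapping in the plugin's shuffle manager through Spark's standard spark.shuffle.manager setting. The exact class name is tied to a Spark version shim (the spark300 shim below is an assumption), so check the nvidia/spark-rapids documentation for the value matching your release:

```python
from pyspark.sql import SparkSession

# The shuffle manager class is version-specific; 'spark300' below is an
# illustrative shim name, not a value to copy verbatim.
spark = (
    SparkSession.builder
    .appName("rapids-ucx-shuffle-example")  # example app name
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    # Use the RAPIDS/UCX shuffle so shuffle data can stay on the GPU and move
    # over the fastest available interconnect between nodes.
    .config("spark.shuffle.manager",
            "com.nvidia.spark.rapids.spark300.RapidsShuffleManager")
    .getOrCreate()
)
```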
Travis Addair and Thomas Graves discussed combining ETL and deep learning in a single pipeline in the End-to-End Deep Learning with Horovod on Apache Spark session. Horovod, a distributed deep learning training framework for TensorFlow, Keras, PyTorch, and Apache MXNet, added support for Spark 3.0 and GPU scheduling. Horovod also added a new KerasEstimator class that uses Spark Estimators with Spark ML Pipelines for better integration with Spark and ease of use. This enables TensorFlow and PyTorch models to be trained directly on Spark DataFrames, leveraging Horovod’s ability to scale to hundreds of GPUs in parallel, without any specialized code for distributed training. With the new accelerator-aware scheduling and columnar processing APIs in Apache Spark 3.0, a production ETL job can hand off data to Horovod running distributed deep learning training on GPUs within the same pipeline.
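A minimal sketch of the Estimator workflow, assuming existing Spark DataFrames train_df and test_df with hypothetical "features" and "label" columns (the model, store path, and parameter values are illustrative):

```python
import tensorflow as tf
from horovod.spark.common.store import Store
from horovod.spark.keras import KerasEstimator

# Toy Keras model; the input shape matches the hypothetical 'features' column.
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(16, activation="relu", input_shape=(8,)),
    tf.keras.layers.Dense(1),
])

# Store for intermediate training data and checkpoints (path is a placeholder).
store = Store.create("/tmp/horovod-store")

estimator = KerasEstimator(
    num_proc=4,                      # number of parallel training processes (example)
    store=store,
    model=model,
    optimizer=tf.keras.optimizers.Adam(),
    loss="mse",
    feature_cols=["features"],
    label_cols=["label"],
    batch_size=32,
    epochs=5,
)

# fit() runs Horovod-distributed training on the DataFrame and returns a Spark ML Transformer.
keras_model = estimator.fit(train_df)
predictions = keras_model.transform(test_df)
```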
Rong Ou and Bobby Wang discussed XGBoost innovations in the Scalable Acceleration of XGBoost Training on Apache Spark GPU Clusters session. XGBoost is a scalable, distributed gradient-boosted decision tree (GBDT) ML library. XGBoost is now integrated with the RAPIDS Accelerator for Spark 3.0 to achieve end-to-end GPU acceleration of Spark SQL/DataFrame operations and XGBoost training, with features stored optimally in memory based on the sparsity of the dataset and improved GPU memory utilization.
The full schedule with videos and many session slides can be found on the Spark AI Summit 2020 page. Details on new Spark 3.0 features can be found in the Spark 3.0 release notes. To access the RAPIDS Accelerator for Apache Spark, visit the nvidia/spark-rapids GitHub repo. Databricks recently announced the availability of Spark 3.0.0 as part of Databricks Runtime 7.0, and of Spark 3.0.0 with preconfigured GPU acceleration as part of Databricks Runtime 7.0 for Machine Learning, which is also available on Microsoft Azure and AWS. Google Cloud recently announced the availability of a Spark 3.0 preview on Dataproc image version 2.0.