XGBoost Distributed Training and Parallel Predictions with Apache Spark


Background

In Boosting (an ML ensemble method), algorithms implement a sequential process (as opposed to Bagging, where training is parallelised) that generates weak learners and combines them into a strong learner (as in all ensemble methods). At each iteration of this process the new model tries to correct the mistakes of the previous one in an adaptive way, unlike Bagging, in which the weak learners are trained independently.


One of the Boosting algorithms, Gradient Boosting, uses gradient descent to minimise the loss function directly in these sequential models (as opposed to another boosting algorithm, AdaBoost, where fitting is achieved by re-weighting the training instances).

The weak learners created in Gradient Boosting during training are usually decision trees. The major inefficiency in Gradient Boosting is that these trees are built sequentially, i.e. one decision tree at a time.
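To make that sequential dependency concrete, here is a minimal, illustrative sketch. It is not how XGBoost builds trees; the names and the use of plain residuals (which correspond to squared loss) are assumptions made purely for illustration.

// Illustrative sketch of why gradient boosting is sequential: each round fits
// a new weak learner to the residuals of the ensemble built in all previous
// rounds, so round m cannot start before round m-1 has finished.
object BoostingSketch {
  type Learner = Array[Double] => Double

  def boost(train: Seq[(Array[Double], Double)],
            rounds: Int,
            learningRate: Double,
            fitWeakLearner: Seq[(Array[Double], Double)] => Learner): Learner = {
    var ensemble: Learner = _ => 0.0 // start from a trivial constant model
    for (_ <- 1 to rounds) {
      // The targets for this round depend on the current ensemble,
      // which forces the rounds to run one after another.
      val residuals = train.map { case (x, y) => (x, y - ensemble(x)) }
      val weakLearner = fitWeakLearner(residuals)
      val previous = ensemble
      ensemble = x => previous(x) + learningRate * weakLearner(x)
    }
    ensemble
  }
}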

To overcome that, Tianqi Chen and Carlos Guestrin introduced an extension of Gradient Boosting named XGBoost, which stands for Extreme Gradient Boosting. It is essentially Gradient Boosting on steroids and is used mainly for classification, but also for regression and ranking.

It introduces many enhancements over traditional Gradient Boosting, including rich hyper-parameters, GPU support, cross-validation capabilities and algorithmic regularisation, making the overall model more efficient, faster to train and less prone to overfitting.

XGBoost has become popular in recent years, winning many machine learning competitions on Kaggle, and is considered to offer both strong computational performance and accuracy.



XGBoost with Apache Spark

A common workflow in ML is to use a system like Spark to construct an ML pipeline in which you preprocess and clean data and then pass the results to the machine learning phase, usually implemented with Spark MLlib if you already use Spark.

In the context of this article, the important feature XGBoost introduces is parallelised tree building, which essentially enables distributed training and prediction across nodes. This means that an Apache Spark MLlib user or company could use it to power XGBoost training and serving in a production-grade way, enjoying both XGBoost's high-performance algorithm and Spark's powerful processing engine for feature engineering and constructing ML pipelines.


Meet XGBoost4J-Spark, a project that integrates XGBoost and Apache Spark by fitting XGBoost into Apache Spark's MLlib framework.

XGBoost4J-Spark makes it possible to construct an MLlib pipeline that preprocesses data to fit an XGBoost model, trains it, and serves it in a distributed fashion for predictions in production. With this library, each XGBoost worker is wrapped by a Spark task, and the training dataset in Spark's memory space is sent transparently to the XGBoost workers that live inside the Spark executors.



To write an XGBoost4J-Spark ML application you first need to include its dependency:

<dependency>
    <groupId>ml.dmlc</groupId>
    <artifactId>xgboost4j-spark</artifactId>
    <version>0.90</version>
</dependency>
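If the project is built with sbt rather than Maven, the equivalent dependency should look roughly like this (a sketch; adjust the version to the release you target):

libraryDependencies += "ml.dmlc" % "xgboost4j-spark" % "0.90"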

Data Preparation (Iris example)

As mentioned, XGBoost4J-Spark enables fitting the data to the interface XGBoost expects.
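For completeness, here is one hedged way the dataset could be read. The local file name iris.csv is an assumption, and the column names in the schema must match the ones used by the transformers below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("xgboost-iris").getOrCreate()

// Assumed schema: four numeric feature columns plus the string label column.
val schema = new StructType(Array(
  StructField("sepal length", DoubleType, nullable = true),
  StructField("sepal width", DoubleType, nullable = true),
  StructField("petal length", DoubleType, nullable = true),
  StructField("petal width", DoubleType, nullable = true),
  StructField("class", StringType, nullable = true)))

val irisDF = spark.read.schema(schema).csv("iris.csv")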

Once we have the Iris dataset read into a DataFrame we need to:

  1. Transform the String-typed label column into a Double-typed label.

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler

val stringIndexer = new StringIndexer().
  setInputCol("class").
  setOutputCol("classIndex").
  fit(irisDF)

val labelTransformed = stringIndexer.transform(irisDF).drop("class")

  2. Assemble the feature columns into a single vector column.

val vectorAssembler = new VectorAssembler().
  setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
  setOutputCol("features")

val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")

The above results in a DataFrame with only two columns: "features", a vector representing the iris features, and "classIndex", the Double-typed label. A DataFrame like this can be fed directly to XGBoost4J-Spark's training engine.

Distributed Training

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbClassifier = new XGBoostClassifier().
      setFeaturesCol("features").
      setLabelCol("classIndex").
      setObjective("multi:softmax").
      setMaxDepth(2).
      setNumClass(3).
      setNumRound(100).
      setNumWorkers(10)

For a full list of XGBoost parameters, see the XGBoost parameters documentation. Note that in XGBoost4J-Spark you can also use the camel-case setter format, as seen above.
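As a sketch, the same configuration can be expressed with the original snake_case parameter names by passing a params map to the constructor:

// Equivalent configuration using the original snake_case parameter names,
// passed through the params-map constructor of XGBoostClassifier.
val xgbParams = Map(
  "objective" -> "multi:softmax",
  "max_depth" -> 2,
  "num_class" -> 3,
  "num_round" -> 100,
  "num_workers" -> 10)

val xgbClassifierFromMap = new XGBoostClassifier(xgbParams).
  setFeaturesCol("features").
  setLabelCol("classIndex")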

Notes:

  1. The multi:softmax objective means we are doing multiclass classification using softmax. It requires setting the number of classes with the num_class parameter.

2. max_depth is the maximum depth of a tree created in each boosting iteration. Increasing this value makes the model more complex and more likely to overfit. Note that XGBoost consumes a lot of memory when training deep trees.

3. num_round is the number of boosting rounds.

4. The num_workers parameter controls how many parallel workers we want when training an XGBoostClassificationModel. This is later translated into pending Spark tasks, which in turn are handled by the cluster manager (YARN in most cases).

Early Stopping is supported using the num_early_stopping_rounds and maximize_evaluation_metrics parameters.
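A hedged sketch of what configuring early stopping might look like through the params map; in practice an evaluation dataset and an evaluation metric also need to be provided so there is something to monitor, and that setup is omitted here:

// Sketch only: early stopping parameters added to the params map.
// An eval set and eval_metric would also be required in a real job.
val earlyStoppingClassifier = new XGBoostClassifier(Map(
    "objective" -> "multi:softmax",
    "num_class" -> 3,
    "num_round" -> 100,
    "num_early_stopping_rounds" -> 10,
    "maximize_evaluation_metrics" -> false)).
  setFeaturesCol("features").
  setLabelCol("classIndex")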

Now we can create the transformer by fitting the XGBoost classifier to the input DataFrame. This is essentially the training process, and it yields the model used for prediction.

val xgbClassificationModel = xgbClassifier.fit(xgbInput)
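Since XGBoostClassifier is a regular MLlib estimator, the preprocessing stages and the classifier can also be chained into a single MLlib pipeline, as promised earlier. A minimal sketch, assuming the vectorAssembler and xgbClassifier defined above; the label indexer is recreated here as an un-fitted estimator so the pipeline can fit it:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer

// Recreate the indexer as an estimator (the earlier stringIndexer was already fitted).
val classIndexer = new StringIndexer().
  setInputCol("class").
  setOutputCol("classIndex")

// Chain preprocessing and the XGBoost classifier into one MLlib pipeline.
val pipeline = new Pipeline().setStages(Array(classIndexer, vectorAssembler, xgbClassifier))
val pipelineModel = pipeline.fit(irisDF)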

Parallel Prediction

XGBoost4j-Spark supports batch prediction and single instance prediction.

For batch prediction, the model takes a DataFrame with a column containing feature vectors, predicts for each feature vector, and outputs a new DataFrame with the results. In this process XGBoost4J-Spark starts one Spark task containing an XGBoost worker per partition of the input DataFrame, so the batch is predicted in parallel.

// inputDF is any DataFrame with a "features" vector column, e.g. xgbInput
val predictionsDf = xgbClassificationModel.transform(inputDF)
predictionsDf.show()

+----------------+----------+-------------+-------------+----------+
|       features |classIndex|rawPrediction| probability |prediction|
+----------------+----------+-------------+-------------+----------+
|[5.1,3.5,1.2,.. |       0.0|[3.4556984...|[0.9957963...|       0.0|
|[4.7,3.2,1.3,.. |       0.0|[3.4556984...|[0.9961891...|       0.0|
|[5.7,4.4,1.5,.. |       0.0|[3.4556984...|[0.9964334...|       0.0|
+----------------+----------+-------------+-------------+----------+

For single prediction, the model accepts a single Vector.

import org.apache.spark.ml.linalg.Vector

val features = xgbInput.head().getAs[Vector]("features")
val result = xgbClassificationModel.predict(features)

Single-instance prediction with XGBoost is not recommended, because the overhead it triggers internally is high compared with the cost of just one prediction.
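If only a few instances need scoring, one hedged workaround is to wrap them in a small DataFrame and reuse the batch API instead (spark here is the SparkSession assumed earlier):

import spark.implicits._

// Score a single instance through the batch path instead of predict().
val singleRowDF = Seq(Tuple1(features)).toDF("features")
xgbClassificationModel.transform(singleRowDF).show()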


The latest release (0.90) of XGBoost's XGBoost4J-Spark now requires Spark 2.4.x, mainly because it uses facilities of org.apache.spark.ml.param.shared that are not fully available in earlier versions of Spark.

This version also includes more consistent handling of missing values, better performance on multi-core CPUs, better control over caching partitioned training data to reduce training time, and more.

For more information about XGBoost, check out the docs.

References:

  1. XGBoost with CUDA: "Gradient Boosting, Decision Trees and XGBoost with CUDA", NVIDIA Developer Blog (devblogs.nvidia.com)
2. XGBoost in Spark with GPU support, using RAPIDS XGBoost4J-Spark