Once an XGBoost model is trained, we would like to use PySpark for batch predictions.
The method we use here is a Pandas UDF.
For a normal udf function, PySpark evaluates it one record at a time, which is the slowest possible way to execute the prediction. A pandas_udf function, by contrast, takes one or more pandas Series and returns a Series, so the work is vectorised over a whole batch of rows.
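To make the difference concrete, here is a minimal sketch in plain pandas, not Spark. The function names and the call counter are hypothetical and only emulate how often each style of function gets invoked: once per record versus once per batch.

```python
import pandas as pd

# Hypothetical counter, only here to show how often each function runs.
calls = {"per_record": 0, "per_batch": 0}


def double_one(value: float) -> float:
    """Emulates a normal udf: invoked once for every record."""
    calls["per_record"] += 1
    return value * 2


def double_batch(values: pd.Series) -> pd.Series:
    """Emulates a pandas_udf: invoked once for a whole batch of records."""
    calls["per_batch"] += 1
    return values * 2


batch = pd.Series([1.0, 2.0, 3.0])

# Record-at-a-time: one Python call per element.
per_record_result = pd.Series([double_one(v) for v in batch])

# Vectorised: a single call handles the entire Series.
per_batch_result = double_batch(batch)

print(calls)
```

Both approaches produce the same values. The only difference is the number of Python-level calls, which is where the per-record overhead comes from.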
Let’s say we have a dataframe df with the following records:
|[149.0, 84.0, 38.5, 79.7]|
|[2.5, 34.0, 35.5, 97.6]|
|[0.0, 65.3, 58.7, 45.7]|
features is the feature column, of type ArrayType(FloatType()), and each row of features contains 4 feature values.
When we call the get_prediction_result function, it receives a pandas Series object. In this case, it would be:
0 [149.0, 84.0, 38.5, 79.7]
We need to convert it to a DMatrix object, which is the input format for an XGBoost model. We first access its values attribute, which yields:
array([list([149.0, 84.0, 38.5, 79.7]), list([2.5, 34.0, 35.5, 97.6]),
Then we call tolist to convert it into a list of lists, which yields:
[[149.0, 84.0, 38.5, 79.7], [2.5, 34.0, 35.5, 97.6], [0.0, 65.3, 58.7, 45.7]]
After that, we convert it into a 2D numpy array:
array([[149. , 84. , 38.5, 79.7],
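The three conversion steps above can be reproduced outside Spark with a small sketch. The variable names are illustrative only, and the Series is built by hand from the example records rather than received from PySpark.

```python
import numpy as np
import pandas as pd

# Hand-built stand-in for the Series that the pandas_udf would receive.
features = pd.Series([
    [149.0, 84.0, 38.5, 79.7],
    [2.5, 34.0, 35.5, 97.6],
    [0.0, 65.3, 58.7, 45.7],
])

# Step 1: a 1D object array holding one Python list per row.
as_object_array = features.values

# Step 2: a plain list of lists.
as_nested_list = as_object_array.tolist()

# Step 3: a 2D numeric array with one row per record.
as_matrix = np.array(as_nested_list)

print(as_object_array.dtype, as_object_array.shape)
print(as_matrix.dtype, as_matrix.shape)
```

The intermediate object array has shape (3,), while the final array has shape (3, 4), which is the row-by-feature layout that a DMatrix can be built from.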
Finally, we initialise a DMatrix object with the 2D numpy array we just created and run the prediction. As a pandas_udf must return a Series object, we convert the prediction scores into a Series on line 16.
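Putting the steps together, the following is a minimal sketch of what such a function could look like. It is not the original listing. The helpers score_batch and add_predictions, the predict_matrix parameter, and the column name prediction are hypothetical. The sketch also assumes a trained xgboost Booster that can be pickled and shipped to the executors, and the Spark 3 style of pandas_udf with type hints.

```python
import numpy as np
import pandas as pd


def score_batch(features: pd.Series, predict_matrix) -> pd.Series:
    """Score one batch of feature rows.

    `predict_matrix` is a hypothetical callable that maps a 2D array
    to a 1D array of scores, one per row.
    """
    # Series of per-row sequences -> list of lists -> 2D float array.
    matrix = np.array(features.values.tolist(), dtype=np.float32)
    scores = predict_matrix(matrix)
    # A pandas_udf must return a Series with one value per input row.
    return pd.Series(scores)


def add_predictions(df, booster, feature_col="features", output_col="prediction"):
    """Attach a prediction column to a Spark DataFrame (hypothetical helper)."""
    # Imported lazily so score_batch can be used and tested without Spark.
    import xgboost as xgb
    from pyspark.sql.functions import col, pandas_udf

    @pandas_udf("float")
    def get_prediction_result(features: pd.Series) -> pd.Series:
        # Wrap the 2D array in a DMatrix and run the trained model on it.
        return score_batch(features, lambda m: booster.predict(xgb.DMatrix(m)))

    return df.withColumn(output_col, get_prediction_result(col(feature_col)))
```

Keeping the scoring logic in a plain function is a design choice: it can be checked locally with an ordinary pandas Series before being wrapped in a pandas_udf.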
Calling the pandas_udf function works just like calling a normal udf function.
That’s how we integrate the XGBoost batch prediction into our PySpark pipeline.