Data Pipeline: From PySpark to PyTorch

TL;DR

Uber's Petastorm library provides a data reader for PyTorch that reads files generated by PySpark. Check out the project on GitHub for more information.

Introduction

As a data scientist, I spend more time wrangling data than building models, and the data scales from hundreds of millions to billions of records. Spark has been of great use to me thanks to its capability to process big data. Usually, I run all the dirty jobs with Spark and generate ready-to-use files for the downstream model training process.

However, there is a gap between Spark and PyTorch: the data reader. Because Spark runs in parallel, it writes multiple partitions of files by default. It's painful that PyTorch doesn't provide a data loader that supports multiple files out of the box. While PyTorch allows a fair level of customisation, writing a high-efficiency data loader is not easy. Then I found Petastorm.

Petastorm is an open-source data access library developed by Uber that provides more than a data loader. It supports multiple machine learning frameworks, such as TensorFlow, PyTorch, and PySpark. To install Petastorm, run

pip install petastorm

Check out their repository for more details.

In this post, I would like to demonstrate how I process data with PySpark, train a model with PyTorch, and fill the gap in between with Petastorm.

Data Processing: PySpark

We use the famous Iris flower data set for this demonstration. We load the CSV file with PySpark and create two new columns: a features column that assembles the four feature columns into one vector, and a label column that maps the class string to an integer.

from pyspark.sql import DataFrame


def transform_iris_data(data: DataFrame):
    # Assemble the four feature columns into a single 'features' column
    # and map the 'class' string to an integer 'label' column.
    data = (
        data
        .withColumn('features',
                    assemble_features('sepal-length', 'sepal-width', 'petal-length', 'petal-width'))
        .withColumn('label', transform_label('class'))
        .select('features', 'label')
    )

    return data
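
The original post doesn't show assemble_features and transform_label. Here is a minimal sketch of what they might look like, assuming the features are stored as a plain array of doubles (which Petastorm's batch reader handles directly) and the three Iris classes are mapped to the integers 0-2:

from pyspark.sql import Column, functions as F


def assemble_features(*cols: str) -> Column:
    # Pack the numeric feature columns into a single array<double> column.
    return F.array(*[F.col(c).cast('double') for c in cols])


def transform_label(col: str) -> Column:
    # Map the class string to an integer label.
    return (
        F.when(F.col(col) == 'Iris-setosa', 0)
        .when(F.col(col) == 'Iris-versicolor', 1)
        .otherwise(2)
    )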

After that, we split the data frame into a train set and a test set.

def split_train_test(
    data: DataFrame,
    train_ratio: float = 0.7,
    test_ratio: float = 0.3
):
    return data.randomSplit([train_ratio, test_ratio], seed=42)

Finally, we save the two data frames in parquet format.

(
    train
    .write
    .mode('overwrite')
    .parquet(train_path)
)
(
    test
    .write
    .mode('overwrite')
    .parquet(test_path)
)
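
For completeness, here is roughly how the pieces above could be wired together. The SparkSession setup, the file paths, and the column names passed to the CSV reader are assumptions for illustration, not part of the original post:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iris-pipeline').getOrCreate()

# hypothetical local paths; adjust to your environment
iris_csv_path = '/tmp/iris.csv'
train_path = '/tmp/iris_train.parquet'
test_path = '/tmp/iris_test.parquet'

raw = (
    spark.read
    .option('inferSchema', True)
    .csv(iris_csv_path)
    .toDF('sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class')
)

train, test = split_train_test(transform_iris_data(raw))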

PyTorch: Define Model

We build a simple three-layer network for this easy training task.

from torch import nn


def get_linear_model(
    input_dim: int = 4,
    output_dim: int = 3,
    device='cpu'
):
    # Three fully connected layers: input_dim -> 6 -> 4 -> output_dim.
    net = nn.Sequential(
        nn.Linear(input_dim, 6),
        nn.ReLU(),
        nn.Linear(6, 4),
        nn.ReLU(),
        nn.Linear(4, output_dim)
    ).to(device)

    return net
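
A quick sanity check of the shapes (the random input below is purely for illustration):

import torch

net = get_linear_model()
dummy = torch.randn(16, 4)    # a batch of 16 samples with 4 features each
print(net(dummy).shape)       # torch.Size([16, 3]) -- one logit per class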

Petastorm: The Data Reader

Now it's time to build the data reader. Building one is simple: all we have to do is wrap make_batch_reader in Petastorm's DataLoader, passing in the data path, the batch size, and the number of epochs.

from petastorm import make_batch_reader
from petastorm.pytorch import DataLoader


def get_data_loader(
    data_path: str = None,
    num_epochs: int = 1,
    batch_size: int = 16
):
    if not data_path:
        return None

    # make_batch_reader reads the Parquet files written by Spark;
    # Petastorm's DataLoader turns the reader into batched tensors.
    return DataLoader(
        make_batch_reader(
            dataset_url=data_path,
            num_epochs=num_epochs
        ),
        batch_size=batch_size
    )

Keep in mind that the dataset_url must include a URL scheme. For example, if the data path is /some/localpath/a_dataset, you should pass in file:///some/localpath/a_dataset. Petastorm also supports hdfs:// and s3:// URLs, but I haven't tried them yet.
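
One convenient way to build such a URL for a local path is pathlib; this small snippet is an illustration, not from the original post:

from pathlib import Path

train_url = Path('/some/localpath/a_dataset').resolve().as_uri()
# -> 'file:///some/localpath/a_dataset'
loader = get_data_loader(train_url)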

Model Training

The final step is to train our model.

import torch


def train_model(
    train_path: str,
    device: str = 'cpu',
    num_epoch: int = 50
):
    # get model
    model = get_linear_model(device=device)

    # define loss function
    loss_fun = torch.nn.CrossEntropyLoss()

    # define optimiser
    opt = torch.optim.Adam(model.parameters(), lr=0.01)

    model.train()

    for epoch in range(num_epoch):
        batch_losses = 0
        # a fresh single-epoch reader is created for every training epoch
        for batch, data in enumerate(get_data_loader(train_path)):
            opt.zero_grad()

            # cast to the dtypes PyTorch expects: float32 features, int64 labels
            features = data['features'].float().to(device)
            label = data['label'].long().to(device)

            y_pred = model(features)
            loss = loss_fun(y_pred, label)

            loss.backward()
            opt.step()

            # track batch losses
            batch_losses += loss.item()

        avg_batch_loss = batch_losses / (batch + 1)
        print(f'Epoch {epoch} average loss: {avg_batch_loss}')

    return model

Using a Petastorm data loader is just like using a PyTorch data loader: wrap it in a for loop.
Remember that the data frame we generated with PySpark has only two columns? We retrieve their values with data['features'] and data['label']. If everything goes smoothly, you get a model with the following test performance:

                 precision    recall  f1-score   support

    Iris-setosa       1.00      1.00      1.00        11
Iris-versicolor       1.00      1.00      1.00        13
 Iris-virginica       1.00      1.00      1.00        12

       accuracy                           1.00        36
      macro avg       1.00      1.00      1.00        36
   weighted avg       1.00      1.00      1.00        36
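
The report above is in scikit-learn's classification_report format. The original post doesn't show the evaluation code, but a minimal sketch might look like this (the evaluate_model helper is an assumption):

import torch
from sklearn.metrics import classification_report


def evaluate_model(model, test_path: str, device: str = 'cpu'):
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for data in get_data_loader(test_path):
            features = data['features'].float().to(device)
            label = data['label'].long()
            pred = model(features).argmax(dim=1)
            y_true.extend(label.tolist())
            y_pred.extend(pred.cpu().tolist())
    print(classification_report(
        y_true, y_pred,
        target_names=['Iris-setosa', 'Iris-versicolor', 'Iris-virginica']
    ))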

Final Words

Petastorm is a reliable and helpful library. Be sure to check out their latest features!