Sustainable data preprocessing with pipelines
Early in my modeling of the Titanic dataset (a kind of “Hello World” for machine learning), I was struck by the variety - or inconsistency - of data preprocessing methods.
For instance, the Titanic Tutorial on Kaggle uses pd.get_dummies()
to convert categorical columns to numeric values, and there are many examples of more complex feature engineering methods on this dataset, e.g. here and here.
The problem is that various functions or methods act on DataFrame columns to create temporary variables, and columns are added, dropped or retained, all at different locations in the code.
This approach encourages the growth of spaghetti code rather than the Pythonic ideal of readable, self-documenting code.
I was introduced to ColumnTransformer in the Springboard ML engineering bootcamp notebook, started to read more about it on Geeks for Geeks, and found a useful application to the Titanic dataset on Joris Van den Bossche’s blog.
As pointed out on that page, there’s more to ColumnTransformer()
than just a consistent interface to data preprocessing; it can also be used in a scikit-learn pipeline in order to optimize the parameters of the preprocessing steps, such as whether to use the mean or the median to impute missing values!
This notebook starts with a minimal example of data preprocessing with ColumnTransformer()
that one-hot-encodes a categorical variable, then develops a pipeline that enables a grid search to optimize preprocessing steps as well as hyperparameters of a Random Forest classifier.
Basic imports, read the data and target values
import numpy as np
import pandas as pd
# Read training and test datasets
train_data = pd.read_csv("../input/titanic/train.csv")
#test_data = pd.read_csv("../input/titanic/test.csv")
# Get the target variable in the training data
target_col = "Survived"
y_train = train_data[target_col]
train_data.dtypes
PassengerId int64
Survived int64
Pclass int64
Name object
Sex object
Age float64
SibSp int64
Parch int64
Ticket object
Fare float64
Cabin object
Embarked object
dtype: object
Preprocessing categorical data with pandas
This simple example is taken from the Titanic Tutorial on Kaggle.
Here, pd.get_dummies()
one-hot encodes a non-numeric variable (Sex).
Because we’re not going to use the test data until the very end (after fitting the pipeline to the training data), it’s commented out here.
features = ["Pclass", "Sex", "SibSp", "Parch"]
X_train = pd.get_dummies(train_data[features])
#X_test = pd.get_dummies(test_data[features])
X_train.head()
|   | Pclass | SibSp | Parch | Sex_female | Sex_male |
|---|---|---|---|---|---|
| 0 | 3 | 1 | 0 | False | True |
| 1 | 1 | 1 | 0 | True | False |
| 2 | 3 | 0 | 0 | True | False |
| 3 | 1 | 1 | 0 | True | False |
| 4 | 3 | 0 | 0 | False | True |
Preprocessing categorical data with ColumnTransformer
Now let’s do the same thing with scikit-learn. This adds a bit of up-front overhead, but as we add more processing steps it will keep the codebase cleaner.
We instantiate a ColumnTransformer(), then fit it to the training data and perform the transformations in one step with the .fit_transform() method.
If we were to process the test data now, we would use the .transform()
method, ensuring no data leakage.
For nicer display of the results, the call to .set_output() is used to output a pandas DataFrame (see this StackOverflow post). Because pandas output does not support sparse data, this also requires passing sparse_output=False to OneHotEncoder().
# Import modules to preprocess data
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
preprocess = ColumnTransformer(transformers=[
('onehot', OneHotEncoder(drop='first', sparse_output=False), ['Sex']),
], remainder='passthrough')
preprocess.set_output(transform='pandas')
# Fit and transform the data
X_train = preprocess.fit_transform(train_data[features])
#X_test = preprocess.transform(test_data)
X_train.head()
|   | onehot__Sex_male | remainder__Pclass | remainder__SibSp | remainder__Parch |
|---|---|---|---|---|
| 0 | 1.0 | 3 | 1 | 0 |
| 1 | 0.0 | 1 | 1 | 0 |
| 2 | 0.0 | 3 | 0 | 0 |
| 3 | 0.0 | 1 | 1 | 0 |
| 4 | 1.0 | 3 | 0 | 0 |
Preprocessing different data types
Here we standardize numerical variables (Age and Fare) before adding them to the features. We also add a couple more categorical variables (Pclass and Embarked).
Because we need to deal with NA values, multiple preprocessing steps are required. In the first step, verbose_feature_names_out=False prevents prefixing column names with the names of the transformers, so that the original column names are passed to the next step. Also, remainder='passthrough' is required to keep all the features around for the next step.
Here we use make_column_transformer()
as a convenience function that doesn’t require explicit names for column prefixes.
The order of arguments within each transformer is modified from Joris Van den Bossche’s blog for compatibility with the current version of scikit-learn (1.5.2 at the time this notebook was written).
The KBinsDiscretizer()
transformer is used to bin the Fare values and is a replacement for the pandas qcut()
function.
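For readers more used to pandas, here is a quick side-by-side sketch (assuming the train_data DataFrame loaded above; the choice of 5 bins is arbitrary, and the two implementations compute bin edges slightly differently, so agreement need not be exactly 100%):
# Sketch: quantile binning of Fare with pandas vs. scikit-learn
import pandas as pd
from sklearn.preprocessing import KBinsDiscretizer
fare = train_data[['Fare']]                                    # Fare has no missing values in train.csv
bins_pd = pd.qcut(fare['Fare'], q=5, labels=False)             # pandas: integer bin codes
kbd = KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='quantile')
bins_sk = kbd.fit_transform(fare).ravel()                      # scikit-learn equivalent
print((bins_pd.values == bins_sk).mean())                      # fraction of rows placed in the same bin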
# Import modules to preprocess data
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, KBinsDiscretizer
from sklearn.compose import make_column_transformer
preprocess1 = make_column_transformer(
(SimpleImputer(), ['Age', 'Fare']),
(SimpleImputer(strategy='most_frequent'), ['Embarked']),
remainder='passthrough',
verbose_feature_names_out=False
)
preprocess2 = make_column_transformer(
(StandardScaler(), ['Age']),
(KBinsDiscretizer(encode='ordinal'), ['Fare']),
(OneHotEncoder(sparse_output=False), ['Pclass', 'Sex', 'Embarked']),
)
preprocess1.set_output(transform='pandas')
preprocess2.set_output(transform='pandas')
# Fit and transform the data
features = ['Pclass', 'Sex', 'Age', 'Fare', 'Embarked']
X_train = preprocess1.fit_transform(train_data[features])
X_train = preprocess2.fit_transform(X_train)
#X_test = preprocess1.transform(test_data[features])
#X_test = preprocess2.transform(X_test)
# Show sums of categories
X_train.iloc[:, 2:].sum()
onehotencoder__Pclass_1 216.0
onehotencoder__Pclass_2 184.0
onehotencoder__Pclass_3 491.0
onehotencoder__Sex_female 314.0
onehotencoder__Sex_male 577.0
onehotencoder__Embarked_C 168.0
onehotencoder__Embarked_Q 77.0
onehotencoder__Embarked_S 646.0
dtype: float64
Using pipelines
A problem is already growing in the last block: we had to fit and transform multiple times, which raises the chance of accidentally copy-pasting .fit_transform() onto the test data. To prevent data leakage, only .transform() should be used on the test data. Enter pipelines. They encapsulate a sequence of any number of transformations as well as an estimator for classification or regression. Besides adding a level of safety against using test samples for training, pipelines offer the ability to perform a grid search over the parameters of the transformations.
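As a minimal sketch of the idea (using an arbitrary imputer/scaler/logistic-regression combination on a few numeric columns, not the model developed below), the whole workflow becomes a single object that can be cross-validated safely:
# Minimal pipeline sketch: impute, scale and classify in one object
from sklearn.pipeline import make_pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
pipe = make_pipeline(SimpleImputer(), StandardScaler(), LogisticRegression())
# Each fold fits the imputer and scaler on the training split only, so there is no leakage
X_num = train_data[['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']]
print(cross_val_score(pipe, X_num, y_train, cv=5).mean())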
Let’s see the previous preprocessing steps set up in a pipeline. For simplicity, this example uses make_pipeline(); just as with make_column_transformer() used above, this avoids the need to give names to each transformation step. Also, we’re stripping out all but the essential non-default parameter values; strategy='most_frequent' gets SimpleImputer() to work on non-numeric data.
Notice how the list of features is implicitly encoded in the pipeline itself. There’s no compelling reason now to subset the features from the data before running the pipeline. And with more confidence in using the transformers, pandas output is no longer needed for inspection, so we take out the transform='pandas' and sparse_output=False settings.
from sklearn.pipeline import make_pipeline
preprocess = make_column_transformer(
(make_pipeline(
SimpleImputer(),
StandardScaler()), ['Age']),
(make_pipeline(
SimpleImputer(),
KBinsDiscretizer()), ['Fare']),
(make_pipeline(
SimpleImputer(strategy='most_frequent'),
OneHotEncoder()), ['Embarked']),
(make_pipeline(
OneHotEncoder()), ['Pclass', 'Sex']),
)
X_train = preprocess.fit_transform(train_data)
#X_test = preprocess.transform(test_data)
print(X_train.shape)
preprocess
(891, 14)
ColumnTransformer(transformers=[('pipeline-1',
                                 Pipeline(steps=[('simpleimputer', SimpleImputer()),
                                                 ('standardscaler', StandardScaler())]),
                                 ['Age']),
                                ('pipeline-2',
                                 Pipeline(steps=[('simpleimputer', SimpleImputer()),
                                                 ('kbinsdiscretizer', KBinsDiscretizer())]),
                                 ['Fare']),
                                ('pipeline-3',
                                 Pipeline(steps=[('simpleimputer', SimpleImputer(strategy='most_frequent')),
                                                 ('onehotencoder', OneHotEncoder())]),
                                 ['Embarked']),
                                ('pipeline-4',
                                 Pipeline(steps=[('onehotencoder', OneHotEncoder())]),
                                 ['Pclass', 'Sex'])])
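If you do want to peek at the output, the fitted column transformer can still report the names of the columns it generated (a quick sanity check, not needed by the pipeline itself):
# Column names produced by the fitted transformer (available after fit_transform above)
print(preprocess.get_feature_names_out())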
Using custom functions
A common feature engineering task is to create new features as combinations of existing ones.
For example, family size computed from the SibSp and Parch columns in the Titanic dataset.
We add this step to the pipeline by writing our own function and wrapping it in FunctionTransformer().
The new transformation works on entire DataFrames rather than single columns, so it is placed before make_column_transformer() in the pipeline.
The column transformer then refers to the newly created column, in this case simply to pass FamilySize through to the output.
from sklearn.preprocessing import FunctionTransformer
def AddFamilySize(X):
    # Note: the new column is added in place, so the incoming DataFrame is modified
    X['FamilySize'] = X['SibSp'] + X['Parch']
    return X
preprocess = make_pipeline(
FunctionTransformer(AddFamilySize),
make_column_transformer(
(make_pipeline(
SimpleImputer(),
StandardScaler()), ['Age']),
(make_pipeline(
SimpleImputer(),
KBinsDiscretizer()), ['Fare']),
(make_pipeline(
SimpleImputer(strategy='most_frequent'),
OneHotEncoder()), ['Embarked']),
(make_pipeline(
OneHotEncoder()), ['Pclass', 'Sex']),
('passthrough', ['FamilySize'])
)
)
preprocess
Pipeline(steps=[('functiontransformer',
                 FunctionTransformer(func=<function AddFamilySize at 0x7fd82d461ee0>)),
                ('columntransformer',
                 ColumnTransformer(transformers=[('pipeline-1',
                                                  Pipeline(steps=[('simpleimputer', SimpleImputer()),
                                                                  ('standardscaler', StandardScaler())]),
                                                  ['Age']),
                                                 ('pipeline-2',
                                                  Pipeline(steps=[('simpleimputer', SimpleImputer()),
                                                                  ('kbinsdiscretizer', KBinsDiscretizer())]),
                                                  ['Fare']),
                                                 ('pipeline-3',
                                                  Pipeline(steps=[('simpleimputer', SimpleImputer(strategy='most_frequent')),
                                                                  ('onehotencoder', OneHotEncoder())]),
                                                  ['Embarked']),
                                                 ('pipeline-4',
                                                  Pipeline(steps=[('onehotencoder', OneHotEncoder())]),
                                                  ['Pclass', 'Sex']),
                                                 ('passthrough', 'passthrough',
                                                  ['FamilySize'])]))])
Adding an estimator and grid search
Let’s wrap it all up with a pipeline that combines preprocessing and estimation.
We do a little more feature engineering to create new features (Title and Deck) from Name and Cabin, respectively.
These steps are added to the front of the pipeline and the new features are one-hot encoded in the column transformer.
Because of NA values in Cabin, another SimpleImputer()
is added.
While we’re at it, let’s add an option to take the logarithm of the new FamilySize column.
Next, we append an estimator to the pipeline, in this case Random Forest, and then set up a grid search over some of the preprocessing parameters.
This pipeline is initialized with empty processing steps (None) in place of transformers for Age and Fare.
The grid search is used to decide whether using StandardScaler()
or KBinsDiscretizer()
(or nothing) on these variables improves the model.
We also investigate the effect of the drop and max_categories arguments of OneHotEncoder() and the strategy (mean or median) for SimpleImputer().
The argument name that ends with kw_args is how we pass parameters from the grid search to our custom function, AddFamilySize().
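As a toy illustration of the kw_args mechanism (the function, data and values here are made up and not part of the model):
# kw_args is passed on as keyword arguments to the wrapped function
import pandas as pd
from sklearn.preprocessing import FunctionTransformer
def scale_by(X, factor=1.0):
    return X * factor
ft = FunctionTransformer(scale_by, kw_args={'factor': 2.0})
print(ft.fit_transform(pd.DataFrame({'x': [1, 2, 3]})))
# In a pipeline, a grid like {'functiontransformer__kw_args': [{'factor': 1.0}, {'factor': 2.0}]}
# would search over those values.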
The code block below does not depend on the previous blocks, so you can copy it to start a new notebook!
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, KBinsDiscretizer, FunctionTransformer
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
def AddFamilySize(X, log=False):
    # Family size = siblings/spouses + parents/children aboard; optionally log-transformed
    FamilySize = X['SibSp'] + X['Parch']
    if log:
        FamilySize = np.log1p(FamilySize)
    X['FamilySize'] = FamilySize
    return X
def AddTitle(X):
    # Extract a coarse title from the Name column.
    # Later assignments override earlier ones, which is why 'Mrs'
    # (a superstring of 'Mr') is assigned after 'Mr'.
    Title = np.array(['Other'] * X.shape[0])
    Title[X['Name'].str.contains('Mr')] = 'Mr'
    Title[X['Name'].str.contains('Mrs')] = 'Mrs'
    Title[X['Name'].str.contains('Mme')] = 'Mrs'
    Title[X['Name'].str.contains('Miss')] = 'Miss'
    Title[X['Name'].str.contains('Mlle')] = 'Miss'
    Title[X['Name'].str.contains('Ms')] = 'Miss'
    X['Title'] = Title
    return X
def AddDeck(X):
    # The deck is the first letter of the Cabin identifier (NaN where Cabin is missing)
    Deck = X.Cabin.str[0]
    X['Deck'] = Deck
    return X
preprocess = make_pipeline(
FunctionTransformer(AddFamilySize),
FunctionTransformer(AddTitle),
FunctionTransformer(AddDeck),
make_column_transformer(
(make_pipeline(
SimpleImputer(),
None), ['Age']),
(make_pipeline(
SimpleImputer(),
None), ['Fare']),
(make_pipeline(
SimpleImputer(strategy='most_frequent'),
OneHotEncoder()), ['Embarked']),
(make_pipeline(
OneHotEncoder()), ['Pclass', 'Sex', 'Title']),
(make_pipeline(
SimpleImputer(strategy='most_frequent'),
OneHotEncoder(handle_unknown='ignore')), ['Deck']),
('passthrough', ['FamilySize'])
)
)
model = make_pipeline(
preprocess,
RandomForestClassifier(n_estimators=50, random_state=1)
)
# Uncomment this to see all the parameters that can be adjusted
#print(model.get_params())
param_grid = {
'pipeline__functiontransformer-1__kw_args': [{'log': False}, {'log': True}],
'pipeline__columntransformer__pipeline-1__simpleimputer__strategy': ['mean', 'median'],
'pipeline__columntransformer__pipeline-1__nonetype': [None, StandardScaler()],
'pipeline__columntransformer__pipeline-2__nonetype': [None, KBinsDiscretizer()],
'pipeline__columntransformer__pipeline-3__onehotencoder__drop': [None, 'first'],
'pipeline__columntransformer__pipeline-4__onehotencoder__max_categories': [None, 3, 4],
}
train_data = pd.read_csv("../input/titanic/train.csv")
target_col = "Survived"
X_train = train_data.drop(columns = target_col)
y_train = train_data[target_col]
grid_pre = GridSearchCV(model, param_grid, n_jobs=6)
grid_pre.fit(X_train, y_train)
print("best score from grid search: %f" % grid_pre.best_score_)
grid_pre.best_params_
best score from grid search: 0.815950
{'pipeline__columntransformer__pipeline-1__nonetype': None,
'pipeline__columntransformer__pipeline-1__simpleimputer__strategy': 'mean',
'pipeline__columntransformer__pipeline-2__nonetype': KBinsDiscretizer(),
'pipeline__columntransformer__pipeline-3__onehotencoder__drop': None,
'pipeline__columntransformer__pipeline-4__onehotencoder__max_categories': 3,
'pipeline__functiontransformer-1__kw_args': {'log': True}}
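To see how much each preprocessing choice mattered, the full grid results can also be inspected (a quick sketch; the column names follow scikit-learn's cv_results_ conventions):
# Rank the searched preprocessing combinations by mean cross-validation score
results = pd.DataFrame(grid_pre.cv_results_)
param_cols = [c for c in results.columns if c.startswith('param_')]
print(results[param_cols + ['mean_test_score']]
      .sort_values('mean_test_score', ascending=False)
      .head())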
Hyperparameter tuning
Now that we’ve systematically investigated the preprocessing parameters, let’s do some hyperparameter tuning on the classifier. We incorporate the optimized preprocessing parameters obtained above by wrapping each value in a single-item list (first line below) and joining them with the estimator parameters to be searched (using the '|' dictionary merge operator).
preprocessor_params = {param: [value] for param, value in grid_pre.best_params_.items()}
estimator_params = {
'randomforestclassifier__n_estimators': [100, 200],
'randomforestclassifier__max_depth': [5, 10],
'randomforestclassifier__min_samples_split': [2, 5],
}
param_grid = preprocessor_params | estimator_params
grid_clf = GridSearchCV(model, param_grid, cv=10, n_jobs=6)
grid_clf.fit(X_train, y_train)
print("best score from grid search: %f" % grid_clf.best_score_)
grid_clf.best_params_
best score from grid search: 0.838390
{'pipeline__columntransformer__pipeline-1__nonetype': None,
'pipeline__columntransformer__pipeline-1__simpleimputer__strategy': 'mean',
'pipeline__columntransformer__pipeline-2__nonetype': KBinsDiscretizer(),
'pipeline__columntransformer__pipeline-3__onehotencoder__drop': None,
'pipeline__columntransformer__pipeline-4__onehotencoder__max_categories': 3,
'pipeline__functiontransformer-1__kw_args': {'log': True},
'randomforestclassifier__max_depth': 5,
'randomforestclassifier__min_samples_split': 5,
'randomforestclassifier__n_estimators': 200}
Wrap-up
Building pipelines improves the readability and maintenance of complex data preprocessing workflows. They can include column transformers for operations such as normalization and one-hot encoding, as well as custom functions that work on entire DataFrames.
I think there are two beautiful things about pipelines:
- All their parameters - including preprocessing and estimator hyperparameters - can be optimized with grid search or other tuning methods. Even the parameters of custom functions can be included in a grid search, as well as the choice of whether to include a particular preprocessing step at all.
- We didn’t touch the test dataset during the construction and fitting of the pipeline. Because the pipeline separates logic from data, there’s no need for dangerous copy-pasting of code that could lead to accidental data leakage.
If you want to make predictions on the test set for a submission to Kaggle, just uncomment and run the following lines after running the previous two code blocks.
# test_data = pd.read_csv("../input/titanic/test.csv")
# predictions = grid_clf.predict(test_data)
# output = pd.DataFrame({'PassengerId': test_data.PassengerId, 'Survived': predictions})
# output.to_csv('submission.csv', index=False)
# print("Your submission was successfully saved!")
Try it out and see how pipelines make your data preprocessing more powerful and easier to maintain!