How to productize your machine learning model using Scikit-learn? [2/2]
As we saw in How to productize your machine learning model using Scikit-learn [1/2], it is crucial that production-ready code is versionable, testable, manageable, and integrable. After understanding these concepts and why it is important not to use Jupyter notebooks to productize your model pipeline, let’s look at a practical example of transforming prototype code from data scientists into production-ready code.
We’ll start by exploring the Pipeline class, discussing its significance and how to effectively use it. Following that, we will briefly cover how to adapt your development code into a Scikit-learn class, where you can encapsulate all your feature engineering steps as methods.
First, let’s understand what a Pipeline is in this context.
1. What is a Scikit-Learn Pipeline?
According to the official documentation, the sklearn.pipeline.Pipeline class can be summarized as follows:
A Pipeline allows you to sequentially apply a list of transformers to preprocess the data and, if desired, conclude the sequence with a final predictor for predictive modeling.
For a machine learning model pipeline, it is crucial to perform the same preprocessing steps on the training, validation, and test data to avoid data leakage. Data leakage is a common issue during model building, where information from outside the training dataset is inadvertently used to develop the model. In other words, information about the future is used to train the model, making it unsuitable for production. The main benefits of using a Scikit-learn Pipeline class are:
- Consistency: A Pipeline ensures that your data transformations are consistently applied across all stages: training, validation, and testing. This consistency allows the model to handle unseen data appropriately and reliably.
- No data leakage: When preprocessing steps like scaling and normalizing are fit on the training dataset only, using a Pipeline ensures that the test data remains unseen and unaltered by these fitting steps. This prevents “peeking” into the test data, which helps the model’s performance metrics accurately reflect its ability to generalize to new, unseen data.
- Fair evaluation: By maintaining consistent data transformations, a Pipeline ensures that during model evaluation, the metrics truly represent the model’s performance on new, unseen data. This allows for a fair and accurate assessment of the model’s true capabilities.
This is why keeping your set of data transformations reproducible using the sklearn.pipeline.Pipeline class is so important.
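For intuition, here is a minimal, self-contained sketch with synthetic data (the StandardScaler and LogisticRegression steps are just placeholders, not part of the example project below): the scaler's statistics are learned from the training split only, and the test split is merely transformed, never fit on.

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Synthetic data: 100 rows, 3 numeric features, binary target
X = np.random.RandomState(0).normal(size=(100, 3))
y = (X[:, 0] > 0).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

pipe = Pipeline([
    ('scaler', StandardScaler()),    # statistics learned from X_train only
    ('clf', LogisticRegression()),
])
pipe.fit(X_train, y_train)           # fits the scaler, then the model
print(pipe.score(X_test, y_test))    # X_test is only transformed, never fit on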
2. What does production-ready code look like?
To demonstrate production-ready code, I will use code snippets from a dataset with the following features: ‘City’, ‘Website’, ‘Revenue’, ‘Status’, and the target ‘Value’.
The hypothetical code provided by the data scientist is in the following format:
Feature engineering steps
import pandas as pd
import numpy as np
import json
from sklearn.preprocessing import OneHotEncoder
N = 20
######### Handle Cities ###################
top_N_cities = list(X_train_raw.City.value_counts().head(N).index)
X_train_raw['city_revised'] = X_train_raw['City']
mask = ~X_train_raw['City'].isin(top_N_cities)
X_train_raw.loc[mask, 'city_revised'] = 'Other'
X_test_raw['city_revised'] = X_test_raw['City']
mask = ~X_test_raw['City'].isin(top_N_cities)
X_test_raw.loc[mask, 'city_revised'] = 'Other'
######### Handle Domains ###################
X_train_raw['domain'] = X_train_raw.Website.apply(lambda x: x.split('.')[-1].split('/')[0]).str.lower()
X_test_raw['domain'] = X_test_raw.Website.apply(lambda x: x.split('.')[-1].split('/')[0]).str.lower()
top_N_domains = list(X_train_raw.domain.value_counts().head(N).index)
X_train_raw['domain_revised'] = X_train_raw['domain']
mask = ~X_train_raw['domain'].isin(top_N_domains)
X_train_raw.loc[mask, 'domain_revised'] = 'Other'
X_test_raw['domain_revised'] = X_test_raw['domain']
mask = ~X_test_raw['domain'].isin(top_N_domains)
X_test_raw.loc[mask, 'domain_revised'] = 'Other'
######### Handle Revenue ###################
def revise_revenue(x):
if x == 'Undefined':
return np.nan
elif x == 'Below $1':
return 1.
elif x == '$100,000,001 or Above':
return 100_000_001.
else:
min_dollars, max_dollars = [float(d.replace('$', '').replace(',', '')) for d in x.split(' to ')]
med_dollars = (min_dollars + max_dollars) / 2.
return med_dollars
X_train_raw['revenue_revised'] = X_train_raw.Revenue.apply(revise_revenue)
X_test_raw['revenue_revised'] = X_test_raw.Revenue.apply(revise_revenue)
######### Handle HTTPS ###################
X_train_raw['has_https'] = X_train_raw.Website.str.lower().str.startswith('https')
X_test_raw['has_https'] = X_test_raw.Website.str.lower().str.startswith('https')
######### Create Cat Features ###################
enc = OneHotEncoder(handle_unknown='ignore')
cat_columns = ['city_revised', 'domain_revised', 'Status']
enc.fit(X_train_raw[cat_columns])
new_cat_columns = enc.get_feature_names_out(cat_columns)
train_cats_array = enc.transform(X_train_raw[cat_columns]).toarray()
test_cats_array = enc.transform(X_test_raw[cat_columns]).toarray()
cat_feats_train = pd.DataFrame(train_cats_array, columns=new_cat_columns)
cat_feats_test = pd.DataFrame(test_cats_array, columns=new_cat_columns)
############ Final Features ###############
IMPUTE = -10000000000000.
train_dfs = [
cat_feats_train.reset_index(drop=True),
X_train_raw[['revenue_revised', 'has_https']].reset_index(drop=True)
]
X_train = pd.concat(train_dfs, axis=1).fillna(IMPUTE)
test_dfs = [
cat_feats_test.reset_index(drop=True),
X_test_raw[['revenue_revised', 'has_https']].reset_index(drop=True)
]
X_test = pd.concat(test_dfs, axis=1).fillna(IMPUTE)
y_train = y_train.reset_index(drop=True)
y_test = y_test.reset_index(drop=True)
To refactor the provided code, we’ll employ object-oriented programming principles, utilizing a CustomPreprocessor and a Pipeline.
2.1. Setting up Scikit-learn
pip3 install scikit-learn
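For a reproducible production build, it’s good practice to pin the exact version you validated against (the version below is purely illustrative):

pip3 install scikit-learn==1.4.2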
2.2. Create a CustomPreprocessor
Our CustomPreprocessor() class will inherit from the BaseEstimator and TransformerMixin classes.
Before building our custom preprocessor, we need to convert the feature engineering steps from procedural programming into methods of the CustomPreprocessor class.
def preprocessing_cities(self, X, N=20):
"""
Preprocess the 'City' feature by retaining the top N
cities and grouping all other cities into 'Other'.
Inputs:
- X : pandas.DataFrame
The input DataFrame containing the 'City' feature.
- N : int, optional
The number of top cities to retain. Default is 20.
Returns:
pandas.DataFrame: A DataFrame with an additional
column 'city_revised', where non-top N cities
are labeled as 'Other'.
"""
self.X_ = X.copy()
top_N_cities = list(self.X_['City'].value_counts().head(N).index)
self.X_['city_revised'] = self.X_['City']
mask = ~self.X_['City'].isin(top_N_cities)
self.X_.loc[mask, 'city_revised'] = 'Other'
return self.X_
def preprocessing_domains(self, X, N=20):
"""
Preprocesses the 'Domain' feature by extracting domain
names from the 'Website' column, selecting the top N
domains, and replacing others with 'Other'.
Parameters:
- X: DataFrame, input data containing the 'Website' column.
- N: int, number of top domains to keep.
Returns:
- DataFrame: Preprocessed DataFrame with the
'domain_revised' column.
"""
self.X_ = X.copy()
# Extracting domain names from the 'Website' column
self.X_['domain'] = self.X_['Website'].apply(lambda x: \
x.split('.')[-1].split('/')[0]).str.lower()
# Selecting top N domains
    top_N_domains = list(self.X_['domain'].value_counts().head(N).index)
# Creating 'domain_revised' column with top N domains and 'Other'
self.X_['domain_revised'] = self.X_['domain']
mask = ~self.X_['domain'].isin(top_N_domains)
self.X_.loc[mask, 'domain_revised'] = 'Other'
return self.X_
def preprocessing_revenue(self, X):
"""
Preprocesses the 'Revenue' feature by revising revenue
values.
Parameters:
- X: DataFrame, input data containing the 'Revenue'
column.
Returns:
- DataFrame: Preprocessed DataFrame with the
'revenue_revised' column.
"""
self.X_ = X.copy()
# Replace 'Undefined' with NaN
self.X_['revenue_revised'] = self.X_['Revenue']\
.replace('Undefined', np.nan)
# Replace 'Below $1' with 1.0
self.X_['revenue_revised'] = self.X_['revenue_revised']\
.replace('Below $1', 1.0)
# Replace '$100,000,001 or Above' with 100000001.0
self.X_['revenue_revised'] = self.X_['revenue_revised']\
.replace('$100,000,001 or Above', 100000001.0)
# Extract median revenue for ranges
self.X_['revenue_revised'] = self.X_['revenue_revised']\
.apply(lambda x: (float(x.split(' to ')[0]\
.replace('$', '').replace(',', '')) +\
float(x.split(' to ')[1].replace('$', '')\
.replace(',', ''))) / 2\
if isinstance(x, str) \
and ' to ' in x else x)
return self.X_
def preprocessing_https(self, X):
    """
    Creates the 'has_https' feature by checking
    whether the website URL starts with 'https'.
    Parameters:
    - X: DataFrame, input data containing the
    'Website' column.
    Returns:
    - DataFrame: Preprocessed DataFrame with
    the 'has_https' column.
    """
    self.X_ = X.copy()
    # Check if the website URL starts with 'https'
    # (the 'domain' column holds only the TLD, so it can never start with 'https')
    self.X_['has_https'] = self.X_['Website'].str.lower().str.startswith('https')
    return self.X_
2.2.1. BaseEstimator class
According to the Scikit-learn documentation, BaseEstimator serves as the base class for all estimators in Scikit-learn. In our context, this means we’re leveraging a foundational class that provides built-in functionality such as get_params() and set_params(), which makes our preprocessor behave like any other Scikit-learn estimator and work with utilities such as clone() and hyperparameter search. You can find more details in the official documentation here.
2.2.2. TransformerMixin class
We will also need to utilize the TransformerMixin class to construct our CustomPreprocessor; it serves as the mixin class for all transformers in Scikit-learn and provides a fit_transform() method derived from our own fit() and transform(). You can refer to the documentation here for more details.
Each feature engineering step will be encapsulated as a method within the CustomPreprocessor(), allowing us to utilize them in both the fit() and transform() methods, as illustrated in the sketch below.
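To make the inheritance concrete, here is a minimal, hypothetical transformer showing what the two base classes provide: BaseEstimator supplies get_params()/set_params() (so the object works with clone() and grid search), while TransformerMixin derives fit_transform() from our own fit() and transform().

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class AddConstant(BaseEstimator, TransformerMixin):
    """Toy transformer: adds a constant to every value."""
    def __init__(self, constant=1.0):
        self.constant = constant

    def fit(self, X, y=None):
        return self  # nothing to learn in this toy example

    def transform(self, X):
        return np.asarray(X) + self.constant

t = AddConstant(constant=2.0)
print(t.get_params())                      # {'constant': 2.0}, from BaseEstimator
print(t.fit_transform(np.array([[1.0]])))  # [[3.]], from TransformerMixin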
2.3. Building a CustomPreprocessor
By inheriting from the BaseEstimator and TransformerMixin classes, we can create our new preprocessor to encapsulate preprocessing logic in a reusable and maintainable way. The first method is the constructor:
2.3.1. __init__ method
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

class CustomPreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self, cat_columns, N=20):
        self.cat_columns = cat_columns
        self.N = N
        self.IMPUTE = -10000000000000.
        self.cat_preprocessor = Pipeline([
            ('onehot', OneHotEncoder(handle_unknown='ignore'))
        ])
        self.feature_names = []
In the __init__ method, we initialize the object’s attributes and perform any setup or initialization tasks needed.
One of the feature engineering steps in this pipeline involves performing one-hot encoding. To handle categorical variables appropriately as part of the data preprocessing workflow, I opted to use the built-in OneHotEncoder() preprocessor. This ensures consistency and reusability throughout the preprocessing process.
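The handle_unknown='ignore' setting is what keeps prediction from failing on categories that never appeared in training: a category unseen at fit time is simply encoded as all zeros. A small illustration with made-up values:

import pandas as pd
from sklearn.preprocessing import OneHotEncoder

enc = OneHotEncoder(handle_unknown='ignore')
enc.fit(pd.DataFrame({'Status': ['Active', 'Closed']}))
# 'Pending' was never seen during fit, so it becomes an all-zero row
print(enc.transform(pd.DataFrame({'Status': ['Pending']})).toarray())
# [[0. 0.]]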
2.3.2. fit() method
The primary goal of the fit() method is to learn and store the necessary parameters or statistics from the training data that are required for the preprocessing steps. For instance, when encoding categorical features using OneHotEncoder, the fit() method determines the unique categories present in the training data. This step ensures that the preprocessing is prepared to handle unseen data during prediction, aligning with the custom preprocessor’s design.
def fit(self, X, y=None):
self.X_ = X
self.y_ = y
# Preprocess the data
self.X_ = self.preprocessing_cities(self.X_, self.N)
self.X_ = self.preprocessing_domains(self.X_, self.N)
self.X_ = self.preprocessing_revenue(self.X_)
self.X_ = self.preprocessing_https(self.X_)
# Fit OneHotEncoder to categorical columns
self.cat_preprocessor.fit(self.X_[self.cat_columns])
self.X = self.X_.reset_index(drop=True)
self.y = self.y_.reset_index(drop=True) if y is not None else None
self.feature_names = self.X.columns.tolist()
return self
2.3.3. transform() method
The transform() method applies the parameters and statistics learned in fit() to perform the actual transformations on the data. These transformations can include encoding categorical features, scaling numerical values, or other preprocessing steps. When predicting with your machine learning pipeline, only the transform() method is used. Therefore, it’s crucial that transform() applies the same transformations learned during fit() to ensure consistency and integrity across the preprocessing pipeline. This approach guarantees that the preprocessing steps applied during training are replicated accurately on new data during prediction.
def transform(self, X):
self.X_ = X.copy()
# Preprocess the data
self.X_ = self.preprocessing_cities(self.X_, self.N)
self.X_ = self.preprocessing_domains(self.X_, self.N)
self.X_ = self.preprocessing_revenue(self.X_)
self.X_ = self.preprocessing_https(self.X_)
    self.cat_feats = pd.DataFrame(
        self.cat_preprocessor.transform(self.X_[self.cat_columns]).toarray(),
        columns=self.cat_preprocessor.named_steps['onehot']
                    .get_feature_names_out(self.cat_columns))
concat_dfs = [
self.cat_feats.reset_index(drop=True),
self.X_[['revenue_revised', 'has_https']].reset_index(drop=True)
]
self.X = pd.concat(concat_dfs, axis=1).fillna(self.IMPUTE)
self.feature_names = self.X.columns.tolist()
return self.X
Note that, following the original code, I’m using self.cat_preprocessor.transform() with self.cat_columns to encode features based on what was learned in the CustomPreprocessor().fit() method. The transform() method then concatenates the newly encoded features in cat_feats with ‘revenue_revised’ and ‘has_https’.
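As a quick sanity check (assuming the same raw X_train_raw and X_test_raw frames used earlier), the preprocessor can also be fitted and applied on its own, outside of any pipeline:

preprocessor = CustomPreprocessor(cat_columns=['city_revised', 'domain_revised', 'Status'], N=20)
preprocessor.fit(X_train_raw)
X_train = preprocessor.transform(X_train_raw)   # encoded + numeric features
X_test = preprocessor.transform(X_test_raw)     # same learned transformations
print(preprocessor.feature_names[:5])           # inspect the generated columns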
2.4. Create the Pipeline
In this pipeline, a GradientBoostingClassifier() was chosen as the machine learning model to predict a binary class. The previously created CustomPreprocessor() can be integrated using sklearn.pipeline.Pipeline().
The Pipeline class represents a sequence of data transformers with an optional final predictor.
from sklearn.ensemble import GradientBoostingClassifier

pipeline = Pipeline([
('preprocessor', CustomPreprocessor(cat_columns=['city_revised', 'domain_revised', 'Status'], N=20)),
('gbc', GradientBoostingClassifier(random_state=10)),
])
2.5. Run the Pipeline
# Fit the pipeline (preprocessing + model) on the raw training data
pipeline_output = pipeline.fit(X_train_raw, y_train)
pipeline_output
After running the pipeline, we can verify its success by examining the output, which connects our CustomPreprocessor with our model (GradientBoostingClassifier).
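From here, the fitted pipeline behaves like a single estimator: calling predict() runs CustomPreprocessor.transform() on the raw input and feeds the result to the classifier. Below is a minimal sketch of prediction and persistence; the file name is arbitrary, and joblib is used here as one common serialization option:

import joblib

# Predict directly on raw, untransformed test data
y_pred = pipeline.predict(X_test_raw)

# Persist the fitted pipeline (preprocessing + model) as a single artifact
joblib.dump(pipeline, 'model_pipeline.joblib')

# Later, e.g. in the serving service, load it and predict again
loaded_pipeline = joblib.load('model_pipeline.joblib')
assert (loaded_pipeline.predict(X_test_raw) == y_pred).all()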
Conclusion
In this post, we’ve explored in detail how to transform machine learning code from a prototype into production-ready code using Scikit-learn’s pipeline capabilities.
We began by discussing the CustomPreprocessor class, emphasizing how to adapt prototype feature engineering steps into methods for object-oriented programming. This approach enhances testability, manageability, and integration, ensuring a reproducible and scalable solution for fair model evaluation.
Next, we delved into the Pipeline class, a powerful tool for sequentially applying data transformations across training, validation, and test datasets. By maintaining consistency, pipelines help prevent data leakage, a common issue where insights from test data unintentionally influence model training, compromising its ability to generalize to new data.
Ultimately, leveraging Scikit-learn pipelines and custom preprocessing classes facilitates the transition from experimental code to robust production-ready machine learning systems. This approach not only enhances code quality and maintainability but also ensures that models generalize effectively to new, unseen data.
Share your comments, thoughts, or key considerations regarding what is important when creating production-ready machine learning code.