An end-to-end demo to wrap a pre-processor and explainer into an algorithm-agnostic ML pipeline with mlflow.pyfunc
Intro
One common challenge in MLOps is the hassle of migrating between various algorithms or frameworks. To tackle the challenge, this is my second article on the topic of generic model building using mlflow.pyfunc.
In my previous article, I offered a beginner-friendly step-by-step demo on creating a minimalist algorithm-agnostic model wrapper.
Algorithm-Agnostic Model Building with MLflow
To further our journey, by the end of this article, we will build a much more sophisticated ML pipeline with the below functionalities:
- This pipeline supports both classification (binary) and regression tasks. It works with scikit-learn models and other algorithms that follow the scikit-learn interface (i.e., fit, predict/predict_proba).
- Incorporating a fully functional Pre-Processor that can be fitted on train data and then used to transform new data for model consumption. This pre-processor can handle both numeric and categorical features and handle missing values with various imputation strategies.
- Adding an explainer to shed light on the model’s reasoning, which is invaluable for model selection, monitoring and implementation. This task can be tricky due to the varying implementations of SHAP values across different ML algorithms. But, all good, we will address the challenge in this article. 😎
Consistent with the previous article,
- You will see how easy it is to switch between different customized pre-processors, similar to switching between various ML algorithms.
- This ML pipeline then encapsulates any customized pipeline elements under the hood, yet still offers a unified model representation in pyfunc flavour to simplify model deployment, redeployment, and downstream scoring.
🔗 All code and config are available on GitHub. 🧰
The Pre-Processor (V1)
Many machine learning algorithms — such as linear models (e.g., linear regression, SVM), distance-based models (e.g., KNN, PCA), and gradient-based models (e.g., gradient boosting methods or gradient descent optimization) — tend to perform better with scaled input features, because scaling prevents features with larger ranges from dominating the learning process. Additionally, real-world data often contains missing values. Therefore, in this first iteration, we will build a pre-processor that can be trained to scale new data and impute missing values, preparing it for model consumption.
Once this pre-processor is built, I will then demo how to easily plug it into pyfunc ML pipeline. Sounds good? Let’s go. 🤠
class PreProcessor(BaseEstimator, TransformerMixin):
"""
Custom preprocessor for numeric features.
- Handles scaling of numeric data
- Performs imputation of missing values
Attributes:
transformer (Pipeline): Pipeline for numeric preprocessing
features (List[str]): Names of input features
"""
def __init__(self):
"""
Initialize preprocessor.
- Creates placeholder for transformer pipeline
"""
self.transformer = None
def fit(self, X, y=None):
"""
Fits the transformer on the provided dataset.
- Configures scaling for numeric features
- Sets up imputation for missing values
- Stores feature names for later use
Parameters:
X (pd.DataFrame): The input features to fit the transformer.
y (pd.Series, optional): Target variable, not used in this method.
Returns:
PreProcessor: The fitted transformer instance.
"""
self.features = X.columns.tolist()
if self.features:
self.transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
self.transformer.fit(X[self.features])
return self
def transform(self, X):
"""
Transform input data using fitted pipeline.
- Applies scaling to numeric features
- Handles missing values through imputation
Parameters:
X (pd.DataFrame): Input features to transform
Returns:
pd.DataFrame: Transformed data with scaled and imputed features
"""
X_transformed = pd.DataFrame()
if self.features:
transformed_data = self.transformer.transform(X[self.features])
X_transformed[self.features] = transformed_data
X_transformed.index = X.index
return X_transformed
def fit_transform(self, X, y=None):
"""
Fits the transformer on the input data and then transforms it.
Parameters:
X (pd.DataFrame): The input features to fit and transform.
y (pd.Series, optional): Target variable, not used in this method.
Returns:
pd.DataFrame: The transformed data.
"""
self.fit(X, y)
return self.transform(X)
This pre-processor can be fitted on train data and then used to process any new data. It will become an element in the ML pipeline below, but of course, we can use or test it independently. Let’s create a synthetic dataset and use the pre-processor to transform it.
# Set parameters for synthetic data
n_feature = 10
n_inform = 4
n_redundant = 0
n_samples = 1000
# Generate synthetic classification data
X, y = make_classification(
n_samples=n_samples,
n_features=n_feature,
n_informative=n_inform,
n_redundant=n_redundant,
shuffle=False,
random_state=12
)
# Create feature names
feat_names = [f'inf_{i+1}' for i in range(n_inform)] +
[f'rand_{i+1}' for i in range(n_feature - n_inform)]
# Convert to DataFrame with named features
X = pd.DataFrame(X, columns=feat_names)
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=0.2,
random_state=22
)
Below are screenshots from {sweetViz} reports before vs after scaling; you can see that scaling didn’t change the underlying shape of each feature’s distribution but simply rescaled and shifted it. BTW, it takes two lines to generate a pretty comprehensive EDA report with {sweetViz}, code available in the GitHub repo linked above. 🥂
ML Pipeline with Pre-Processor
Now, let’s create an ML pipeline in the mlflow.pyfunc flavour that can encapsulate this preprocessor.
class ML_PIPELINE(mlflow.pyfunc.PythonModel):
"""
Custom ML pipeline for classification and regression.
- work with any scikit-learn compatible model
- Combines preprocessing and model training
- Handles model predictions
- Compatible with MLflow tracking
- Supports MLflow deployment
Attributes:
model (BaseEstimator or None): A scikit-learn compatible model instance
preprocessor (Any or None): Data preprocessing pipeline
config (Any or None): Optional config for model settings
task(str): Type of ML task ('classification' or 'regression')
"""
def __init__(self, model=None, preprocessor=None, config=None):
"""
Initialize the ML_PIPELINE.
Parameters:
model (BaseEstimator, optional):
- Scikit-learn compatible model
- Defaults to None
preprocessor (Any, optional):
- Transformer or pipeline for data preprocessing
- Defaults to None
config (Any, optional):
- Additional model settings
- Defaults to None
"""
self.model = model
self.preprocessor = preprocessor
self.config = config
self.task = "classification" if hasattr(self.model, "predict_proba") else "regression"
def fit(self, X_train: pd.DataFrame, y_train: pd.Series):
"""
Train the model on provided data.
- Applies preprocessing to features
- Fits model on transformed data
Parameters:
X_train (pd.DataFrame): Training features
y_train (pd.Series): Target values
"""
X_train_preprocessed = self.preprocessor.fit_transform(X_train.copy())
self.model.fit(X_train_preprocessed, y_train)
def predict(
self, context: Any, model_input: pd.DataFrame
) -> np.ndarray:
"""
Generate predictions using trained model.
- Applies preprocessing to new data
- Uses model to make predictions
Parameters:
context (Any): Optional context information provided
by MLflow during the prediction phase
model_input (pd.DataFrame): Input features
Returns:
Any: Model predictions or probabilities
"""
processed_model_input = self.preprocessor.transform(model_input.copy())
if self.task == "classification":
prediction = self.model.predict_proba(processed_model_input)[:,1]
elif self.task == "regression":
prediction = self.model.predict(processed_model_input)
return prediction
The ML pipeline defined above takes the preprocessor and ML algorithm as parameters. Usage example below
# define the ML pipeline instance with lightGBM classifier
ml_pipeline = ML_PIPELINE(model = lgb.LGBMClassifier(),
preprocessor = PreProcessor())
It is as simple as that! 🎉 If you want to experiment with another algorithm, just swap it like shown below. As a wrapper, it can encapsulate both regression and classification algorithms. For the latter, predicted probabilities are returned, as shown in the example above.
# define the ML pipeline instance with random forest regressor
ml_pipeline = ML_PIPELINE(model = RandomForestRegressor(),
preprocessor = PreProcessor())
As you can see from the code chunk below, passing hyperparameters to the algorithms is easy, making this ML pipeline a perfect instrument for hyperparameter tuning. I will elaborate on this topic in the following articles.
params = {
'n_estimators': 100,
'max_depth': 6,
'learning_rate': 0.1
}
model = xgb.XGBClassifier(**params)
ml_pipeline = ML_PIPELINE(model = model,
preprocessor = PreProcessor())
Because this ml pipeline is built in the mlflow.pyfunc flavour. We can log it with rich metadata saved automatically by mlflow for downstream use. When deployed, we can feed the metadata as context for the model in the predict function as shown below. More info and demos are available in my previous article, which is linked at the beginning.
# train the ML pipeline
ml_pipeline.fit(X_train, y_train)
# use the trained pipeline for prediction
y_prob = ml_pipeline.predict(
context=None, # provide metadata for model in production
model_input=X_test
)
auc = roc_auc_score(y_test, y_prob)
print(f"auc: {auc:.3f}")
Pre-Processor (V2)
The above pre-processor has worked well so far, but let’s improve it in two ways below and then demonstrate how to swap between pre-processors easily.
- Allow users to customize the pre-processing process. For instance, to specify the impute strategy.
- Expand pre-processor capacity to handle categorical features.
class PreProcessor_v2(BaseEstimator, TransformerMixin):
"""
Custom transformer for data preprocessing.
- Scales numeric features
- Encodes categorical features
- Handles missing values via imputation
- Compatible with scikit-learn pipeline
Attributes:
num_impute_strategy (str): Numeric imputation strategy
cat_impute_strategy (str): Categorical imputation strategy
num_transformer (Pipeline): Numeric preprocessing pipeline
cat_transformer (Pipeline): Categorical preprocessing pipeline
transformed_cat_cols (List[str]): One-hot encoded column names
num_features (List[str]): Numeric feature names
cat_features (List[str]): Categorical feature names
"""
def __init__(self, num_impute_strategy='median',
cat_impute_strategy='most_frequent'):
"""
Initialize the transformer.
- Sets up numeric data transformer
- Sets up categorical data transformer
- Configures imputation strategies
Parameters:
num_impute_strategy (str): Strategy for numeric missing values
cat_impute_strategy (str): Strategy for categorical missing values
"""
self.num_impute_strategy = num_impute_strategy
self.cat_impute_strategy = cat_impute_strategy
def fit(self, X, y=None):
"""
Fit transformer on input data.
- Identifies feature types
- Configures feature scaling
- Sets up encoding
- Fits imputation strategies
Parameters:
X (pd.DataFrame): Input features
y (pd.Series, optional): Target variable, not used
Returns:
CustomTransformer: Fitted transformer
"""
self.num_features = X.select_dtypes(include=np.number).columns.tolist()
self.cat_features = X.select_dtypes(exclude=np.number).columns.tolist()
if self.num_features:
self.num_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy=self.num_impute_strategy)),
('scaler', StandardScaler())
])
self.num_transformer.fit(X[self.num_features])
if self.cat_features:
self.cat_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy=self.cat_impute_strategy)),
('encoder', OneHotEncoder(handle_unknown='ignore'))
])
self.cat_transformer.fit(X[self.cat_features])
return self
def get_transformed_cat_cols(self):
"""
Get transformed categorical column names.
- Creates names after one-hot encoding
- Combines category with encoded values
Returns:
List[str]: One-hot encoded column names
"""
cat_cols = []
cats = self.cat_features
cat_values = self.cat_transformer['encoder'].categories_
for cat, values in zip(cats, cat_values):
cat_cols += [f'{cat}_{value}' for value in values]
return cat_cols
def transform(self, X):
"""
Transform input data.
- Applies fitted scaling
- Applies fitted encoding
- Handles numeric and categorical features
Parameters:
X (pd.DataFrame): Input features
Returns:
pd.DataFrame: Transformed data
"""
X_transformed = pd.DataFrame()
if self.num_features:
transformed_num_data = self.num_transformer.transform(X[self.num_features])
X_transformed[self.num_features] = transformed_num_data
if self.cat_features:
transformed_cat_data = self.cat_transformer.transform(X[self.cat_features]).toarray()
self.transformed_cat_cols = self.get_transformed_cat_cols()
transformed_cat_df = pd.DataFrame(transformed_cat_data, columns=self.transformed_cat_cols)
X_transformed = pd.concat([X_transformed, transformed_cat_df], axis=1)
X_transformed.index = X.index
return X_transformed
def fit_transform(self, X, y=None):
"""
Fit and transform input data.
- Fits transformer to data
- Applies transformation
- Combines both operations
Parameters:
X (pd.DataFrame): Input features
y (pd.Series, optional): Target variable, not used
Returns:
pd.DataFrame: Transformed data
"""
self.fit(X, y)
return self.transform(X)
Easy Switch of Custom Pre-Processors
There you have it: a new preprocessor that is 1) more customizable and 2) handles both numerical and categorical features. Let’s define an ML pipeline instance with it.
# Define a PreProcessor (V2) instance while specifying impute strategy
preprocessor = PreProcessor_v2(
num_impute_strategy = 'mean'
)
# Define an ML Pipeline instance with this preprocessor
ml_pipeline = ML_PIPELINE(
model = xgb.XGBClassifier(), # switch ML algorithms
preprocessor = PreProcessor # switch pre-processors
)
Let’s test this new ML pipeline instance with another synthetic dataset containing both numerical and categorical features.
# add missings
np.random.seed(42)
missing_rate = 0.20
n_missing = int(np.floor(missing_rate * X.size))
rows = np.random.randint(0, X.shape[0], n_missing)
cols = np.random.randint(0, X.shape[1], n_missing)
X.values[rows, cols] = np.nan
actual_missing_rate = X.isna().sum().sum() / X.size
print(f"Target missing rate: {missing_rate:.2%}")
print(f"Actual missing rate: {actual_missing_rate:.2%}")
# change X['inf_1] to categorical
percentiles = [0, 0.1, 0.5, 0.9, 1]
labels = ['bottom', 'lower-mid', 'upper-mid', 'top']
X['inf_1'] = pd.qcut(X['inf_1'], q=percentiles, labels=labels)
There you have it—the ML pipeline runs smoothly with the new data. As expected, however, if we define the ML pipeline with the previous preprocessor and then run it on this dataset, we will encounter errors because the previous preprocessor was not designed to handle categorical features.
# create an ML pipeline instance with PreProcessor v1
ml_pipeline = ML_PIPELINE(
model = lgb.LGBMClassifier(verbose = -1),
preprocessor = PreProcessor()
)
try:
ml_pipeline.fit(X_train, y_train)
except Exception as e:
print(f"Error: {e}")
Error: Cannot use median strategy with non-numeric data:
could not convert string to float: 'lower-mid'
The Benefit of An Explainable ML Pipeline
Adding an explainer to an ML pipeline can be super helpful in several ways:
- Model Selection: It helps us select the best model by evaluating the soundness of its reasoning. Two algorithms may perform similarly on metrics like AUC or precision, but the key features they rely on may differ. Reviewing model reasoning with domain experts to discuss which model makes more sense in such scenarios is a good idea.
- Troubleshooting: One helpful strategy for model improvement is to analyze the reasoning behind mistakes. For example, in classification problems, we can identify false positives where the model was most confident (i.e., produced the highest predicted possibilities) and investigate what went wrong in the reasoning and what key features contributed to the mistakes.
- Model Monitoring: Besides the typical monitoring elements such as data drift and performance metrics, it is informative to monitor model reasoning as well. If there is a significant shift in key features that drive the decisions made by a model in production, I want to be alerted.
- Model Implementation: In some scenarios, supplying model reasoning along with model predictions can be highly beneficial to our end users. For example, to help a customer service agent best retain a churning customer, we can provide the churn score alongside the customer features that contributed to this score.
Adding An Explainer to the ML Pipeline
Because our ML pipeline is algorithm agnostic, it is imperative that the explainer can also work across algorithms.
SHAP (SHapley Additive exPlanations) values are an excellent choice for our purpose because they provide theoretically robust explanations based on game theory. They are designed to work consistently across algorithms, including both tree-based and non-tree-based models, with some approximations for the latter. Additionally, SHAP offers rich visualization capabilities and is widely regarded as an industry standard.
In the notebooks below, I have dug into the similarities and differences between SHAP implementations for various ML algorithms.
- SHAP for regressor
- SHAP for XGBoost Classifier
- SHAP for RandomForest Classifier
- SHAP for LightGBM Classifier
To create a generic explainer for our ML pipeline, the key differences to address are
1. Whether the model is directly supported by shap.Explainer
The model-specific SHAP explainers are significantly more efficient than the model-agnostic ones. Therefore, the approach we take here is
- first attempts to use the direct SHAP explainer for the model type,
- If that fails, falls back to a model-agnostic explainer using the predict function.
2. The shape of SHAP values
For binary classification problems, SHAP values can come in two formats/shapes.
- Format 1: Only shows impact on positive class
shape = (n_samples, n_features) # 2d array
- Format 2: Shows impact on both classes
shape = (n_samples, n_features, n_classes) # 3d array
- The explainer implementation below always shows the impact on the positive class. When the impact on both classes is available in SHAP values, it selects the ones on the positive class.
Please see the code below for the implementation of the approach discussed above.
class ML_PIPELINE(mlflow.pyfunc.PythonModel):
"""
Custom ML pipeline for classification and regression.
- Works with scikit-learn compatible models
- Handles data preprocessing
- Manages model training and predictions
- Provide global and local model explanation
- Compatible with MLflow tracking
- Supports MLflow deployment
Attributes:
model (BaseEstimator or None): A scikit-learn compatible model instance
preprocessor (Any or None): Data preprocessing pipeline
config (Any or None): Optional config for model settings
task(str): Type of ML task ('classification' or 'regression')
both_class (bool): Whether SHAP values include both classes
shap_values (shap.Explanation): SHAP values for model explanation
X_explain (pd.DataFrame): Processed features for SHAP explanation
"""
# ------- same code as above ---------
def explain_model(self,X):
"""
Generate SHAP values and plots for model interpretation.
This method:
1. Transforms the input data using the fitted preprocessor
2. Creates a SHAP explainer appropriate for the model type
3. Calculates SHAP values for feature importance
4. Generates a summary plot of feature importance
Parameters:
X : pd.DataFrame
Input features to generate explanations for.
Returns: None
The method stores the following attributes in the class:
- self.X_explain : pd.DataFrame
Transformed data with original numeric values for interpretation
- self.shap_values : shap.Explanation
SHAP values for each prediction
- self.both_class : bool
Whether the model outputs probabilities for both classes
"""
X_transformed = self.preprocessor.transform(X.copy())
self.X_explain = X_transformed.copy()
# get pre-transformed values for numeric features
self.X_explain[self.preprocessor.num_features] = X[self.preprocessor.num_features]
self.X_explain.reset_index(drop=True)
try:
# Attempt to create an explainer that directly supports the model
explainer = shap.Explainer(self.model)
except:
# Fallback for models or shap versions where direct support may be limited
explainer = shap.Explainer(self.model.predict, X_transformed)
self.shap_values = explainer(X_transformed)
# get the shape of shap values and extract accordingly
self.both_class = len(self.shap_values.values.shape) == 3
if self.both_class:
shap.summary_plot(self.shap_values[:,:,1])
elif self.both_class == False:
shap.summary_plot(self.shap_values)
def explain_case(self,n):
"""
Generate SHAP waterfall plot for one specific case.
- Shows feature contributions
- Starts from base value
- Ends at final prediction
- Shows original feature values for better interpretability
Parameters:
n (int): Case index (1-based)
e.g., n=1 explains the first case.
Returns:
None: Displays SHAP waterfall plot
Notes:
- Requires explain_model() first
- Shows positive class for binary tasks
"""
if self.shap_values is None:
print("""
Please explain model first by running
`explain_model()` using a selected dataset
""")
else:
self.shap_values.data = self.X_explain
if self.both_class:
shap.plots.waterfall(self.shap_values[:,:,1][n-1])
elif self.both_class == False:
shap.plots.waterfall(self.shap_values[n-1])
Now, the updated ML pipeline instance can create explanatory graphs for you in just one line of code. 😎
Log and Use the Model
Of course, you can log a trained ML pipeline using mlflow and enjoy all the metadata for model deployment and reproducibility. In the screenshot below, you can see that in addition to the pickled pyfunc model itself, the Python environment, metrics, and hyperparameters have all been logged in just a few lines of code below. To learn more, please refer to my previous article on mlflow.pyfunc, which is linked at the beginning.
# Log the model with MLflow
with mlflow.start_run() as run:
# Log the custom model with auto-captured conda environment
model_info = mlflow.pyfunc.log_model(
artifact_path="model",
python_model=ml_pipeline,
conda_env=mlflow.sklearn.get_default_conda_env()
)
# Log model parameters
mlflow.log_params(ml_pipeline.model.get_params())
# Log metrics
mlflow.log_metric("rmse", rmse)
# Get the run ID
run_id = run.info.run_id
Conclusions & Next Steps
This is it, a generic and explainable ML pipeline that works for both classification and regression algorithms. Take the code and extend it to suit your use case. 🤗 If you find this useful, please give me a clap 👏🥰
To further our journey on the mlflow.pyfunc series, below are some topics I am considering. Feel free to leave a comment and let me know what you would like to see. 🥰
- Feature selection
- Hyperparameter tuning
- If instead of choosing between off-the-shelf algorithms, one decides to ensemble multiple algorithms or have highly customized solutions, they can still enjoy a generic model representation and seamless migration via mlflow.pyfunc.
Stay tuned and follow me on Medium. 😁
💼LinkedIn | 😺GitHub | 🕊️Twitter/X
Unless otherwise noted, all images are by the author.
Explainable Generic ML Pipeline with MLflow was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.
An end-to-end demo to wrap a pre-processor and explainer into an algorithm-agnostic ML pipeline with mlflow.pyfuncPhoto by Hannah Murrell on UnsplashIntroOne common challenge in MLOps is the hassle of migrating between various algorithms or frameworks. To tackle the challenge, this is my second article on the topic of generic model building using mlflow.pyfunc.In my previous article, I offered a beginner-friendly step-by-step demo on creating a minimalist algorithm-agnostic model wrapper.Algorithm-Agnostic Model Building with MLflowTo further our journey, by the end of this article, we will build a much more sophisticated ML pipeline with the below functionalities:This pipeline supports both classification (binary) and regression tasks. It works with scikit-learn models and other algorithms that follow the scikit-learn interface (i.e., fit, predict/predict_proba).Incorporating a fully functional Pre-Processor that can be fitted on train data and then used to transform new data for model consumption. This pre-processor can handle both numeric and categorical features and handle missing values with various imputation strategies.Adding an explainer to shed light on the model’s reasoning, which is invaluable for model selection, monitoring and implementation. This task can be tricky due to the varying implementations of SHAP values across different ML algorithms. But, all good, we will address the challenge in this article. 😎Consistent with the previous article,You will see how easy it is to switch between different customized pre-processors, similar to switching between various ML algorithms.This ML pipeline then encapsulates any customized pipeline elements under the hood, yet still offers a unified model representation in pyfunc flavour to simplify model deployment, redeployment, and downstream scoring.🔗 All code and config are available on GitHub. 🧰The Pre-Processor (V1)Many machine learning algorithms — such as linear models (e.g., linear regression, SVM), distance-based models (e.g., KNN, PCA), and gradient-based models (e.g., gradient boosting methods or gradient descent optimization) — tend to perform better with scaled input features, because scaling prevents features with larger ranges from dominating the learning process. Additionally, real-world data often contains missing values. Therefore, in this first iteration, we will build a pre-processor that can be trained to scale new data and impute missing values, preparing it for model consumption.Once this pre-processor is built, I will then demo how to easily plug it into pyfunc ML pipeline. Sounds good? Let’s go. 🤠class PreProcessor(BaseEstimator, TransformerMixin): “”” Custom preprocessor for numeric features. – Handles scaling of numeric data – Performs imputation of missing values Attributes: transformer (Pipeline): Pipeline for numeric preprocessing features (List[str]): Names of input features “”” def __init__(self): “”” Initialize preprocessor. – Creates placeholder for transformer pipeline “”” self.transformer = None def fit(self, X, y=None): “”” Fits the transformer on the provided dataset. – Configures scaling for numeric features – Sets up imputation for missing values – Stores feature names for later use Parameters: X (pd.DataFrame): The input features to fit the transformer. y (pd.Series, optional): Target variable, not used in this method. Returns: PreProcessor: The fitted transformer instance. “”” self.features = X.columns.tolist() if self.features: self.transformer = Pipeline(steps=[ (‘imputer’, SimpleImputer(strategy=’median’)), (‘scaler’, StandardScaler()) ]) self.transformer.fit(X[self.features]) return self def transform(self, X): “”” Transform input data using fitted pipeline. – Applies scaling to numeric features – Handles missing values through imputation Parameters: X (pd.DataFrame): Input features to transform Returns: pd.DataFrame: Transformed data with scaled and imputed features “”” X_transformed = pd.DataFrame() if self.features: transformed_data = self.transformer.transform(X[self.features]) X_transformed[self.features] = transformed_data X_transformed.index = X.index return X_transformed def fit_transform(self, X, y=None): “”” Fits the transformer on the input data and then transforms it. Parameters: X (pd.DataFrame): The input features to fit and transform. y (pd.Series, optional): Target variable, not used in this method. Returns: pd.DataFrame: The transformed data. “”” self.fit(X, y) return self.transform(X)This pre-processor can be fitted on train data and then used to process any new data. It will become an element in the ML pipeline below, but of course, we can use or test it independently. Let’s create a synthetic dataset and use the pre-processor to transform it.# Set parameters for synthetic datan_feature = 10n_inform = 4n_redundant = 0n_samples = 1000# Generate synthetic classification dataX, y = make_classification( n_samples=n_samples, n_features=n_feature, n_informative=n_inform, n_redundant=n_redundant, shuffle=False, random_state=12)# Create feature namesfeat_names = [f’inf_{i+1}’ for i in range(n_inform)] + [f’rand_{i+1}’ for i in range(n_feature – n_inform)]# Convert to DataFrame with named featuresX = pd.DataFrame(X, columns=feat_names)# Split data into train and test setsX_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=22)Below are screenshots from {sweetViz} reports before vs after scaling; you can see that scaling didn’t change the underlying shape of each feature’s distribution but simply rescaled and shifted it. BTW, it takes two lines to generate a pretty comprehensive EDA report with {sweetViz}, code available in the GitHub repo linked above. 🥂Screenshots from SweetViz reports before vs after preprocessingML Pipeline with Pre-ProcessorNow, let’s create an ML pipeline in the mlflow.pyfunc flavour that can encapsulate this preprocessor.class ML_PIPELINE(mlflow.pyfunc.PythonModel): “”” Custom ML pipeline for classification and regression. – work with any scikit-learn compatible model – Combines preprocessing and model training – Handles model predictions – Compatible with MLflow tracking – Supports MLflow deployment Attributes: model (BaseEstimator or None): A scikit-learn compatible model instance preprocessor (Any or None): Data preprocessing pipeline config (Any or None): Optional config for model settings task(str): Type of ML task (‘classification’ or ‘regression’) “”” def __init__(self, model=None, preprocessor=None, config=None): “”” Initialize the ML_PIPELINE. Parameters: model (BaseEstimator, optional): – Scikit-learn compatible model – Defaults to None preprocessor (Any, optional): – Transformer or pipeline for data preprocessing – Defaults to None config (Any, optional): – Additional model settings – Defaults to None “”” self.model = model self.preprocessor = preprocessor self.config = config self.task = “classification” if hasattr(self.model, “predict_proba”) else “regression” def fit(self, X_train: pd.DataFrame, y_train: pd.Series): “”” Train the model on provided data. – Applies preprocessing to features – Fits model on transformed data Parameters: X_train (pd.DataFrame): Training features y_train (pd.Series): Target values “”” X_train_preprocessed = self.preprocessor.fit_transform(X_train.copy()) self.model.fit(X_train_preprocessed, y_train) def predict( self, context: Any, model_input: pd.DataFrame ) -> np.ndarray: “”” Generate predictions using trained model. – Applies preprocessing to new data – Uses model to make predictions Parameters: context (Any): Optional context information provided by MLflow during the prediction phase model_input (pd.DataFrame): Input features Returns: Any: Model predictions or probabilities “”” processed_model_input = self.preprocessor.transform(model_input.copy()) if self.task == “classification”: prediction = self.model.predict_proba(processed_model_input)[:,1] elif self.task == “regression”: prediction = self.model.predict(processed_model_input) return predictionThe ML pipeline defined above takes the preprocessor and ML algorithm as parameters. Usage example below# define the ML pipeline instance with lightGBM classifierml_pipeline = ML_PIPELINE(model = lgb.LGBMClassifier(), preprocessor = PreProcessor())It is as simple as that! 🎉 If you want to experiment with another algorithm, just swap it like shown below. As a wrapper, it can encapsulate both regression and classification algorithms. For the latter, predicted probabilities are returned, as shown in the example above.# define the ML pipeline instance with random forest regressorml_pipeline = ML_PIPELINE(model = RandomForestRegressor(), preprocessor = PreProcessor())As you can see from the code chunk below, passing hyperparameters to the algorithms is easy, making this ML pipeline a perfect instrument for hyperparameter tuning. I will elaborate on this topic in the following articles.params = { ‘n_estimators’: 100, ‘max_depth’: 6, ‘learning_rate’: 0.1 }model = xgb.XGBClassifier(**params)ml_pipeline = ML_PIPELINE(model = model, preprocessor = PreProcessor())Because this ml pipeline is built in the mlflow.pyfunc flavour. We can log it with rich metadata saved automatically by mlflow for downstream use. When deployed, we can feed the metadata as context for the model in the predict function as shown below. More info and demos are available in my previous article, which is linked at the beginning.# train the ML pipelineml_pipeline.fit(X_train, y_train)# use the trained pipeline for predictiony_prob = ml_pipeline.predict( context=None, # provide metadata for model in production model_input=X_test)auc = roc_auc_score(y_test, y_prob)print(f”auc: {auc:.3f}”)Pre-Processor (V2)The above pre-processor has worked well so far, but let’s improve it in two ways below and then demonstrate how to swap between pre-processors easily.Allow users to customize the pre-processing process. For instance, to specify the impute strategy.Expand pre-processor capacity to handle categorical features. class PreProcessor_v2(BaseEstimator, TransformerMixin): “”” Custom transformer for data preprocessing. – Scales numeric features – Encodes categorical features – Handles missing values via imputation – Compatible with scikit-learn pipeline Attributes: num_impute_strategy (str): Numeric imputation strategy cat_impute_strategy (str): Categorical imputation strategy num_transformer (Pipeline): Numeric preprocessing pipeline cat_transformer (Pipeline): Categorical preprocessing pipeline transformed_cat_cols (List[str]): One-hot encoded column names num_features (List[str]): Numeric feature names cat_features (List[str]): Categorical feature names “”” def __init__(self, num_impute_strategy=’median’, cat_impute_strategy=’most_frequent’): “”” Initialize the transformer. – Sets up numeric data transformer – Sets up categorical data transformer – Configures imputation strategies Parameters: num_impute_strategy (str): Strategy for numeric missing values cat_impute_strategy (str): Strategy for categorical missing values “”” self.num_impute_strategy = num_impute_strategy self.cat_impute_strategy = cat_impute_strategy def fit(self, X, y=None): “”” Fit transformer on input data. – Identifies feature types – Configures feature scaling – Sets up encoding – Fits imputation strategies Parameters: X (pd.DataFrame): Input features y (pd.Series, optional): Target variable, not used Returns: CustomTransformer: Fitted transformer “”” self.num_features = X.select_dtypes(include=np.number).columns.tolist() self.cat_features = X.select_dtypes(exclude=np.number).columns.tolist() if self.num_features: self.num_transformer = Pipeline(steps=[ (‘imputer’, SimpleImputer(strategy=self.num_impute_strategy)), (‘scaler’, StandardScaler()) ]) self.num_transformer.fit(X[self.num_features]) if self.cat_features: self.cat_transformer = Pipeline(steps=[ (‘imputer’, SimpleImputer(strategy=self.cat_impute_strategy)), (‘encoder’, OneHotEncoder(handle_unknown=’ignore’)) ]) self.cat_transformer.fit(X[self.cat_features]) return self def get_transformed_cat_cols(self): “”” Get transformed categorical column names. – Creates names after one-hot encoding – Combines category with encoded values Returns: List[str]: One-hot encoded column names “”” cat_cols = [] cats = self.cat_features cat_values = self.cat_transformer[‘encoder’].categories_ for cat, values in zip(cats, cat_values): cat_cols += [f'{cat}_{value}’ for value in values] return cat_cols def transform(self, X): “”” Transform input data. – Applies fitted scaling – Applies fitted encoding – Handles numeric and categorical features Parameters: X (pd.DataFrame): Input features Returns: pd.DataFrame: Transformed data “”” X_transformed = pd.DataFrame() if self.num_features: transformed_num_data = self.num_transformer.transform(X[self.num_features]) X_transformed[self.num_features] = transformed_num_data if self.cat_features: transformed_cat_data = self.cat_transformer.transform(X[self.cat_features]).toarray() self.transformed_cat_cols = self.get_transformed_cat_cols() transformed_cat_df = pd.DataFrame(transformed_cat_data, columns=self.transformed_cat_cols) X_transformed = pd.concat([X_transformed, transformed_cat_df], axis=1) X_transformed.index = X.index return X_transformed def fit_transform(self, X, y=None): “”” Fit and transform input data. – Fits transformer to data – Applies transformation – Combines both operations Parameters: X (pd.DataFrame): Input features y (pd.Series, optional): Target variable, not used Returns: pd.DataFrame: Transformed data “”” self.fit(X, y) return self.transform(X)Easy Switch of Custom Pre-ProcessorsThere you have it: a new preprocessor that is 1) more customizable and 2) handles both numerical and categorical features. Let’s define an ML pipeline instance with it.# Define a PreProcessor (V2) instance while specifying impute strategypreprocessor = PreProcessor_v2( num_impute_strategy = ‘mean’)# Define an ML Pipeline instance with this preprocessorml_pipeline = ML_PIPELINE( model = xgb.XGBClassifier(), # switch ML algorithms preprocessor = PreProcessor # switch pre-processors )Let’s test this new ML pipeline instance with another synthetic dataset containing both numerical and categorical features.# add missingsnp.random.seed(42) missing_rate = 0.20 n_missing = int(np.floor(missing_rate * X.size))rows = np.random.randint(0, X.shape[0], n_missing)cols = np.random.randint(0, X.shape[1], n_missing)X.values[rows, cols] = np.nanactual_missing_rate = X.isna().sum().sum() / X.sizeprint(f”Target missing rate: {missing_rate:.2%}”)print(f”Actual missing rate: {actual_missing_rate:.2%}”)# change X[‘inf_1] to categoricalpercentiles = [0, 0.1, 0.5, 0.9, 1]labels = [‘bottom’, ‘lower-mid’, ‘upper-mid’, ‘top’]X[‘inf_1’] = pd.qcut(X[‘inf_1’], q=percentiles, labels=labels)There you have it—the ML pipeline runs smoothly with the new data. As expected, however, if we define the ML pipeline with the previous preprocessor and then run it on this dataset, we will encounter errors because the previous preprocessor was not designed to handle categorical features.# create an ML pipeline instance with PreProcessor v1ml_pipeline = ML_PIPELINE( model = lgb.LGBMClassifier(verbose = -1), preprocessor = PreProcessor())try: ml_pipeline.fit(X_train, y_train)except Exception as e: print(f”Error: {e}”)Error: Cannot use median strategy with non-numeric data:could not convert string to float: ‘lower-mid’The Benefit of An Explainable ML PipelineAdding an explainer to an ML pipeline can be super helpful in several ways:Model Selection: It helps us select the best model by evaluating the soundness of its reasoning. Two algorithms may perform similarly on metrics like AUC or precision, but the key features they rely on may differ. Reviewing model reasoning with domain experts to discuss which model makes more sense in such scenarios is a good idea.Troubleshooting: One helpful strategy for model improvement is to analyze the reasoning behind mistakes. For example, in classification problems, we can identify false positives where the model was most confident (i.e., produced the highest predicted possibilities) and investigate what went wrong in the reasoning and what key features contributed to the mistakes.Model Monitoring: Besides the typical monitoring elements such as data drift and performance metrics, it is informative to monitor model reasoning as well. If there is a significant shift in key features that drive the decisions made by a model in production, I want to be alerted.Model Implementation: In some scenarios, supplying model reasoning along with model predictions can be highly beneficial to our end users. For example, to help a customer service agent best retain a churning customer, we can provide the churn score alongside the customer features that contributed to this score.Adding An Explainer to the ML PipelineBecause our ML pipeline is algorithm agnostic, it is imperative that the explainer can also work across algorithms.SHAP (SHapley Additive exPlanations) values are an excellent choice for our purpose because they provide theoretically robust explanations based on game theory. They are designed to work consistently across algorithms, including both tree-based and non-tree-based models, with some approximations for the latter. Additionally, SHAP offers rich visualization capabilities and is widely regarded as an industry standard.In the notebooks below, I have dug into the similarities and differences between SHAP implementations for various ML algorithms.SHAP for regressorSHAP for XGBoost ClassifierSHAP for RandomForest ClassifierSHAP for LightGBM ClassifierTo create a generic explainer for our ML pipeline, the key differences to address are1. Whether the model is directly supported by shap.ExplainerThe model-specific SHAP explainers are significantly more efficient than the model-agnostic ones. Therefore, the approach we take here isfirst attempts to use the direct SHAP explainer for the model type,If that fails, falls back to a model-agnostic explainer using the predict function.2. The shape of SHAP valuesFor binary classification problems, SHAP values can come in two formats/shapes.Format 1: Only shows impact on positive classshape = (n_samples, n_features) # 2d arrayFormat 2: Shows impact on both classesshape = (n_samples, n_features, n_classes) # 3d arrayThe explainer implementation below always shows the impact on the positive class. When the impact on both classes is available in SHAP values, it selects the ones on the positive class.Please see the code below for the implementation of the approach discussed above.class ML_PIPELINE(mlflow.pyfunc.PythonModel): “”” Custom ML pipeline for classification and regression. – Works with scikit-learn compatible models – Handles data preprocessing – Manages model training and predictions – Provide global and local model explanation – Compatible with MLflow tracking – Supports MLflow deployment Attributes: model (BaseEstimator or None): A scikit-learn compatible model instance preprocessor (Any or None): Data preprocessing pipeline config (Any or None): Optional config for model settings task(str): Type of ML task (‘classification’ or ‘regression’) both_class (bool): Whether SHAP values include both classes shap_values (shap.Explanation): SHAP values for model explanation X_explain (pd.DataFrame): Processed features for SHAP explanation “”” # ——- same code as above ——— def explain_model(self,X): “”” Generate SHAP values and plots for model interpretation. This method: 1. Transforms the input data using the fitted preprocessor 2. Creates a SHAP explainer appropriate for the model type 3. Calculates SHAP values for feature importance 4. Generates a summary plot of feature importance Parameters: X : pd.DataFrame Input features to generate explanations for. Returns: None The method stores the following attributes in the class: – self.X_explain : pd.DataFrame Transformed data with original numeric values for interpretation – self.shap_values : shap.Explanation SHAP values for each prediction – self.both_class : bool Whether the model outputs probabilities for both classes “”” X_transformed = self.preprocessor.transform(X.copy()) self.X_explain = X_transformed.copy() # get pre-transformed values for numeric features self.X_explain[self.preprocessor.num_features] = X[self.preprocessor.num_features] self.X_explain.reset_index(drop=True) try: # Attempt to create an explainer that directly supports the model explainer = shap.Explainer(self.model) except: # Fallback for models or shap versions where direct support may be limited explainer = shap.Explainer(self.model.predict, X_transformed) self.shap_values = explainer(X_transformed) # get the shape of shap values and extract accordingly self.both_class = len(self.shap_values.values.shape) == 3 if self.both_class: shap.summary_plot(self.shap_values[:,:,1]) elif self.both_class == False: shap.summary_plot(self.shap_values) def explain_case(self,n): “”” Generate SHAP waterfall plot for one specific case. – Shows feature contributions – Starts from base value – Ends at final prediction – Shows original feature values for better interpretability Parameters: n (int): Case index (1-based) e.g., n=1 explains the first case. Returns: None: Displays SHAP waterfall plot Notes: – Requires explain_model() first – Shows positive class for binary tasks “”” if self.shap_values is None: print(“”” Please explain model first by running `explain_model()` using a selected dataset “””) else: self.shap_values.data = self.X_explain if self.both_class: shap.plots.waterfall(self.shap_values[:,:,1][n-1]) elif self.both_class == False: shap.plots.waterfall(self.shap_values[n-1]) Now, the updated ML pipeline instance can create explanatory graphs for you in just one line of code. 😎SHAP plot for global explanation of the modelSHAP plot for local explanation of any specific caseLog and Use the ModelOf course, you can log a trained ML pipeline using mlflow and enjoy all the metadata for model deployment and reproducibility. In the screenshot below, you can see that in addition to the pickled pyfunc model itself, the Python environment, metrics, and hyperparameters have all been logged in just a few lines of code below. To learn more, please refer to my previous article on mlflow.pyfunc, which is linked at the beginning.# Log the model with MLflowwith mlflow.start_run() as run: # Log the custom model with auto-captured conda environment model_info = mlflow.pyfunc.log_model( artifact_path=”model”, python_model=ml_pipeline, conda_env=mlflow.sklearn.get_default_conda_env() ) # Log model parameters mlflow.log_params(ml_pipeline.model.get_params()) # Log metrics mlflow.log_metric(“rmse”, rmse) # Get the run ID run_id = run.info.run_idRich model metadata and artifacts logged with mlflowConclusions & Next StepsThis is it, a generic and explainable ML pipeline that works for both classification and regression algorithms. Take the code and extend it to suit your use case. 🤗 If you find this useful, please give me a clap 👏🥰To further our journey on the mlflow.pyfunc series, below are some topics I am considering. Feel free to leave a comment and let me know what you would like to see. 🥰Feature selectionHyperparameter tuningIf instead of choosing between off-the-shelf algorithms, one decides to ensemble multiple algorithms or have highly customized solutions, they can still enjoy a generic model representation and seamless migration via mlflow.pyfunc.Stay tuned and follow me on Medium. 😁💼LinkedIn | 😺GitHub | 🕊️Twitter/XUnless otherwise noted, all images are by the author.Explainable Generic ML Pipeline with MLflow was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story. machine-learning, mlflow, hands-on-tutorials, databricks, mlops Towards Data Science – MediumRead More
Add to favorites
0 Comments