End-to-End Sklearn Pipeline

🔩
Full Production Pipeline
Preprocessing + model in one object — train once, predict safely, serialize cleanly
Pipeline · ColumnTransformer · production
Full Example
Why Pipeline
Serialization
python
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report

# ── 1. Feature groups and hold-out split ──────────────────
# Columns are grouped by kind so that each group can be routed to its own
# preprocessing branch further down.
num_features = ['age', 'income', 'balance']
cat_features = ['job', 'education', 'marital']
target = 'churn'

# NOTE(review): `df` is not defined in this snippet — presumably a DataFrame
# loaded earlier that holds every column named above; confirm.
X = df[[*num_features, *cat_features]]
y = df[target]

# 20% hold-out, stratified on the label, fixed seed.
split_options = {'test_size': 0.2, 'stratify': y, 'random_state': 42}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

# ── 2. Sub-pipelines per feature type ────────────────────
# Numeric branch: median imputation followed by standardization.
num_pipe = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical branch: most-frequent imputation followed by one-hot encoding.
# handle_unknown='ignore' → categories unseen during fit become a zero vector;
# sparse_output=False → the encoder returns a dense array.
cat_encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
cat_pipe = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('ohe', cat_encoder),
])

# ── 3. ColumnTransformer ─────────────────────────────────
# Send each feature group through its own branch. Columns that belong to
# neither group are discarded (remainder='drop').
preprocessor = ColumnTransformer(
    transformers=[
        ('num', num_pipe, num_features),
        ('cat', cat_pipe, cat_features),
    ],
    remainder='drop',
)

# ── 4. Full pipeline ──────────────────────────────────────
# Preprocessing and estimator live in one object, so fit/predict always run
# the same transformations.
gbm_params = {
    'max_iter': 300,
    'learning_rate': 0.05,
    'max_depth': 6,
    'early_stopping': True,
    'random_state': 42,
}
clf = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('model', HistGradientBoostingClassifier(**gbm_params)),
])

# ── 5. Cross-validate on training set ────────────────────
# 5-fold CV of the whole pipeline (preprocessing is re-fit inside each fold),
# scored by ROC-AUC, using all available cores.
scores = cross_val_score(
    estimator=clf,
    X=X_train,
    y=y_train,
    scoring='roc_auc',
    cv=5,
    n_jobs=-1,
)
print(f'CV ROC-AUC: {scores.mean():.4f} ± {scores.std():.4f}')

# ── 6. Fit on full training set, evaluate on test ────────
from sklearn.metrics import roc_auc_score

clf.fit(X_train, y_train)

# Column 1 of predict_proba corresponds to the second entry of clf.classes_
# (the positive class for a 0/1 label).
y_prob = clf.predict_proba(X_test)[:, 1]
y_pred = clf.predict(X_test)

# Report the same metric that was cross-validated above so the CV estimate
# and the hold-out result are directly comparable; the classification report
# only reflects the default decision threshold.
print(f'Test ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}')
print(classification_report(y_test, y_pred))
  • Prevents data leakage: All transformers (imputer, scaler, encoder) are fit only on training folds inside cross-validation. They never see validation data during fitting.
  • Single object to serve: clf.predict(X_new) automatically applies all preprocessing. No need to manually pre-process new data at inference time.
  • Safe for GridSearchCV: GridSearchCV(clf, param_grid) tunes the model inside the pipeline with leakage-safe CV.
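  • Introspect via named_steps: clf.named_steps['model'] and clf['preprocessor'].transformers_ allow post-fit inspection. Note that HistGradientBoostingClassifier does not expose feature_importances_; use sklearn.inspection.permutation_importance(clf, X_test, y_test) to estimate feature importance.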
python
import joblib

# Persist the complete fitted pipeline (preprocessing + model) as one artifact
model_path = 'churn_clf_v1.joblib'
joblib.dump(clf, model_path, compress=3)

# Production side: restore the artifact from disk
loaded_clf = joblib.load(model_path)

# A single raw record — no manual preprocessing, the pipeline does it all
customer_record = {
    'age': 35,
    'income': 58000,
    'balance': 12000,
    'job': 'technician',
    'education': 'university.degree',
    'marital': 'married',
}
new_customer = pd.DataFrame([customer_record])

# Row 0, column 1 of predict_proba: probability of the second class
churn_prob = loaded_clf.predict_proba(new_customer)[0, 1]
print(f'Churn probability: {churn_prob:.2%}')

# Version metadata alongside model
import json

import sklearn

metadata = {
    'model_version': '1.0',
    'features': num_features + cat_features,
    'cv_roc_auc': float(scores.mean()),  # plain float: NumPy scalars are not JSON-serializable
    # A joblib/pickle artifact is only reliably loadable under the
    # scikit-learn version that produced it, so record that version as well.
    'sklearn_version': sklearn.__version__,
}
with open('model_metadata.json', 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2)

Custom Transformers

🛠️
BaseEstimator + TransformerMixin
Build custom feature engineering steps that plug into sklearn Pipelines
custom transformer · BaseEstimator · fit/transform
Example
Internals
python
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class LogTransformer(BaseEstimator, TransformerMixin):
    """Apply a ``log1p`` transform to selected features (for skewed distributions).

    Parameters
    ----------
    columns : list or None, default=None
        Columns to transform. Interpreted as column labels when ``X`` is a
        pandas DataFrame and as integer column positions for any other
        array-like. ``None`` transforms every column.

    Notes
    -----
    ``log1p`` yields ``-inf`` at -1 and ``NaN`` below -1; inputs are not
    validated here.
    """

    def __init__(self, columns=None):
        # Stored unmodified so get_params()/set_params()/clone() round-trip.
        self.columns = columns  # list of column names or indices

    def fit(self, X, y=None):
        """Return ``self``; the transform is stateless, so nothing is learned."""
        return self

    def transform(self, X):
        """Return a transformed copy of ``X``; the input is left untouched."""
        if self.columns is None:
            # np.log1p allocates a new result, so no explicit copy is needed.
            return np.log1p(X)

        if hasattr(X, 'iloc'):
            # pandas input: `columns` are labels.
            X_out = X.copy()
            X_out[self.columns] = np.log1p(X_out[self.columns])
            return X_out

        # Array-like input: `columns` are positions along axis 1. Plain
        # X[self.columns] would select *rows* here. Cast to float so the
        # results are not truncated when X has an integer dtype.
        X_out = np.array(X, dtype=float)
        X_out[:, self.columns] = np.log1p(X_out[:, self.columns])
        return X_out


class DomainFeatures(BaseEstimator, TransformerMixin):
    """Create domain-specific engineered features.

    ``transform`` expects a DataFrame with ``'balance'``, ``'income'`` and
    ``'age'`` columns and returns a copy with two extra columns:

    * ``balance_to_income`` — ``balance / income``.
    * ``age_group`` — categorical bucket: ``'young'`` (0–30), ``'mid'``
      (30–50, exclusive of 30) or ``'senior'`` (above 50).
    """

    def fit(self, X, y=None):
        """Return ``self``; no statistics are learned from the data."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the engineered columns appended."""
        X_out = X.copy()

        # The small epsilon keeps an income of exactly 0 from dividing by zero.
        X_out['balance_to_income'] = X['balance'] / (X['income'] + 1e-9)

        # include_lowest + an open-ended top bin: with the closed range
        # (0, 100] an age of 0 or above 100 would silently become NaN.
        # Negative ages still map to NaN.
        X_out['age_group'] = pd.cut(
            X['age'],
            bins=[0, 30, 50, np.inf],
            labels=['young', 'mid', 'senior'],
            include_lowest=True,
        )
        return X_out


# Plug into a Pipeline — fully compatible
# The shared `preprocessor` selects only num_features / cat_features and uses
# remainder='drop', so it would silently discard the columns DomainFeatures
# adds. Use a dedicated ColumnTransformer that also routes the engineered
# columns to the matching branch.
fe_preprocessor = ColumnTransformer([
    ('num', num_pipe, num_features + ['balance_to_income']),
    ('cat', cat_pipe, cat_features + ['age_group']),
], remainder='drop')

pipe = Pipeline([
    ('domain_feats', DomainFeatures()),  # runs first: the ratio uses raw values
    ('log_skewed',   LogTransformer(columns=['income', 'balance'])),
    ('preprocessor', fe_preprocessor),
    ('model',        HistGradientBoostingClassifier(random_state=42))
])

pipe.fit(X_train, y_train)
print(pipe.score(X_test, y_test))
  • BaseEstimator: Provides get_params() / set_params() automatically from __init__ arguments — required for GridSearchCV compatibility.
  • TransformerMixin: Provides fit_transform(X, y) = fit(X, y).transform(X) for free.
  • Stateless transformers: If no data is learned during fit(), just return self. Stateful: compute stats in fit (e.g., log-transform bounds learned from training data), store as self.attr_ (trailing underscore convention).
  • set_output API: Call pipe.set_output(transform='pandas') to propagate DataFrame column names through all transforms (sklearn ≥1.2).

Keras Model in Sklearn Pipeline

🔗
SciKeras — Wrap Keras in a Sklearn Estimator
Use a Keras model inside a sklearn Pipeline for cross-validation and hyperparameter search
SciKeras · KerasClassifier · integration
Setup
Example
# Install: pip install scikeras
from scikeras.wrappers import KerasClassifier, KerasRegressor


# Define model factory function
def build_model(n_units=128, dropout=0.3):
    # `[...]` and `(...)` are placeholders left as in the original sketch:
    # fill in the layers and the compile arguments.
    model = keras.Sequential([...])
    model.compile(...)
    return model


# Wrap as sklearn estimator
# Hyperparameters meant for build_model (n_units, dropout) are declared on
# the wrapper next to the training options.
clf = KerasClassifier(
    model=build_model,
    n_units=128,
    dropout=0.3,
    epochs=50,
    batch_size=32,
    verbose=0,
)
python
from scikeras.wrappers import KerasClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import RandomizedSearchCV

def build_model(n_units=64, dropout=0.2, lr=1e-3, meta=None):
    """Build and compile a one-hidden-layer binary classifier.

    Parameters
    ----------
    n_units : int, default=64
        Width of the hidden ``Dense`` layer.
    dropout : float, default=0.2
        Dropout rate applied after the hidden layer.
    lr : float, default=1e-3
        Learning rate handed to the Adam optimizer.
    meta : dict or None, default=None
        Fit-time metadata that SciKeras passes to model-building functions
        declaring a ``meta`` parameter. When it contains
        ``'n_features_in_'`` that value sets the input width; otherwise the
        input width falls back to 30.

    Returns
    -------
    A compiled ``keras.Sequential`` model with one sigmoid output unit,
    binary cross-entropy loss and an accuracy metric.
    """
    # NOTE(review): `keras` is not imported anywhere in the visible source —
    # confirm it is brought into scope before this function is called.

    # Take the input width from the data seen at fit time instead of
    # hard-coding it, so the model follows whatever the upstream steps emit.
    n_features = (meta or {}).get('n_features_in_', 30)

    model = keras.Sequential([
        keras.layers.Input(shape=(n_features,)),
        keras.layers.Dense(n_units, activation='relu'),
        keras.layers.Dropout(dropout),
        keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(
        optimizer=keras.optimizers.Adam(lr),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

# Wrap + pipeline
# Hyperparameters destined for build_model must be declared on the wrapper:
# SciKeras does not introspect the build function's defaults, so
# set_params(n_units=...) — which is what a 'model__n_units' key resolves to
# inside this Pipeline — is rejected as an invalid parameter unless the name
# was given to the constructor. The values below mirror build_model's own
# defaults, so untuned behavior is unchanged.
keras_clf = KerasClassifier(
    model=build_model,
    n_units=64, dropout=0.2, lr=1e-3,
    epochs=50, batch_size=32, verbose=0
)
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('model',  keras_clf)
])

# Tune Keras hyperparams via RandomizedSearchCV
# Keys use the Pipeline convention '<step>__<param>': 'model__n_units' is
# forwarded to the 'model' step as set_params(n_units=...).
param_dist = {
    'model__n_units':  [32, 64, 128, 256],
    'model__dropout': [0.1, 0.2, 0.3, 0.4],
    'model__lr':      [1e-4, 1e-3, 5e-4]
}
rs = RandomizedSearchCV(
    pipe, param_dist, n_iter=15, cv=5,
    scoring='roc_auc', n_jobs=1,  # n_jobs=1 with GPU-based Keras
    random_state=42  # seed the candidate sampling so the search is repeatable
)
rs.fit(X_train, y_train)
print(f'Best ROC-AUC: {rs.best_score_:.4f}')
print(rs.best_params_)