Skip to content

Trainer & Collectors

Trainer

Cross-Validation Workflow

Trainer runs cross-validation training independently of the main Experimenter experiment loop. It is created through Experimenter.add_trainer() and shares the same Pipeline and data cache.

trainer = exp.add_trainer(
    name='cv',
    data=None,              # None → use Experimenter's data
    splitter='same',        # 'same' → use exp.sp_v (inner splitter)
    splitter_params=None,   # None when splitter='same'
    aug_data=None,          # external DataFrame appended to inner train split at DataSource level
)

splitter='same' reuses the inner splitter (sp_v) configured on the Experimenter. Pass a scikit-learn splitter object to use a different split strategy. splitter=None trains on the entire dataset without splitting.

aug_data is appended to the inner training split at the DataSource level before any Stage processing. It is not persisted — pass it again on add_trainer after loading. The Experimenter constructor and create() also accept aug_data for the same purpose in the experiment loop.

select_head, train, process

select_head(nodes) specifies which Head nodes to train. All upstream Stage nodes are collected automatically.

trainer.select_head(['lgbm_v1', 'lgbm_v2'])

train() trains all unbuilt nodes in topological order. Each node is trained across all splits before moving to the next node.

trainer.train()

process(data, v=None) is a generator that applies the trained processors to new data, yielding one result per split.

for split_output in trainer.process(test_df):
    # split_output: concatenated Head outputs for this split
    ...

v optionally filters output columns from the Head nodes. If multiple Heads are selected, their outputs are concatenated column-wise.

to_inferencer

Once training is complete, convert the Trainer to an Inferencer for deployment:

inferencer = trainer.to_inferencer(v=None)
inferencer.save('./inferencer')

to_inferencer() copies the fitted processors out of the Trainer, so the resulting Inferencer is independent of the Trainer and Experimenter.



Sampler

Sampler applies resampling to training data before each fit() call. This is useful for class imbalance correction.

ImbLearnSampler

Wraps any imbalanced-learn sampler:

from imblearn.over_sampling import SMOTE
from mllabs.sampler import ImbLearnSampler

sampler = ImbLearnSampler(SMOTE(random_state=42))

To apply a sampler to a node, set the mllab_sampler key in params:

exp.set_node('lgbm_smote', grp='lgbm_grp', params={
    'n_estimators': 300,
    'mllab_sampler': ImbLearnSampler(SMOTE(random_state=42)),
})

mllab_sampler is intercepted by _node_processor before fit() / fit_process() — the key is stripped before the remaining params are passed to the estimator.

Custom Sampler

from mllabs.sampler import Sampler

class MySampler(Sampler):
    def sample(self, fit_params):
        X = fit_params['X']
        y = fit_params['y']
        # ... resample ...
        return {**fit_params, 'X': X_resampled, 'y': y_resampled}

Collectors

Collectors capture data from Head nodes during exp(). Each Collector uses a Connector to select which nodes it observes.

Error Handling

Collector lifecycle methods (_start, _collect, _end_idx, _end) are wrapped in try/except. If an error occurs, it is stored in collector.warnings as a dict:

# each entry in collector.warnings:
{'method': '_collect', 'node': 'lgbm_v1', 'type': 'ValueError', 'message': '...', 'traceback': '...'}

The error is logged as a warning but does not interrupt the experiment. Check collector.warnings after exp() if results are missing.

Connector-Based Matching

Connector controls which nodes a Collector attaches to. All three criteria are optional — only the ones provided are checked:

from mllabs import Connector

Connector()                                  # matches all nodes
Connector(node_query='lgbm')                 # regex match on node name
Connector(node_query=['lgbm_v1', 'lgbm_v2']) # exact list match
Connector(processor=LGBMClassifier)          # processor class match
Connector(edges={'y': [(None, 'target')]})   # edges contain-based match
Connector(role='head')                       # head nodes only
Connector(role='stage')                      # stage nodes only

Multiple criteria are combined with AND logic.


MetricCollector

Computes a metric function against ground truth y for each fold.

from mllabs.collector import MetricCollector
from sklearn.metrics import log_loss

mc = MetricCollector(
    name='metrics',
    connector=Connector(),
    output_var=None,          # None → all output columns
    metric_func=log_loss,     # func(y_true, y_pred) → scalar
    include_train=True,       # also compute on train/inner-valid folds
)
exp.add_collector(mc)

Querying results:

mc = exp.get_collector('metrics')

mc.get_metric('lgbm_v1')          # Series of per-fold metrics
mc.get_metrics(['lgbm_v1', 'lgbm_v2'])   # DataFrame

# Aggregate across folds
mean, std = mc.get_metrics_agg(
    nodes=None,         # None → all collected nodes
    inner_fold=True,    # aggregate inner folds (mean)
    outer_fold=True,    # then aggregate outer folds (mean)
    include_std=True,   # also return std DataFrame
)

StackingCollector

Collects out-of-fold (OOF) predictions for stacking.

from mllabs.collector import StackingCollector

sc = StackingCollector(
    name='stacking',
    connector=Connector(edges={'y': [(None, 'target')]}),
    output_var=None,          # columns to collect from output
    experimenter=exp,         # used to build index and target
    method='mean',            # how to aggregate inner folds: 'mean', 'mode', 'simple'
)
exp.add_collector(sc)

The connector.edges 'y' entry is used to extract the target column into the dataset.

Querying results:

sc = exp.get_collector('stacking')

df = sc.get_dataset(
    nodes=None,           # None → all collected nodes
    include_target=True,  # append target column
)
# Returns a DataFrame with OOF predictions + target, indexed to match original data

ModelAttrCollector

Collects model attributes such as feature importances for each fold.

from mllabs.collector import ModelAttrCollector

mac = ModelAttrCollector(
    name='importance',
    connector=Connector(processor=LGBMClassifier),
    result_key='feature_importances',  # key in adapter.result_objs
    # adapter is inferred from connector.processor automatically
)
exp.add_collector(mac)

Querying results:

mac = exp.get_collector('importance')

mac.get_attr('lgbm_v1')           # raw results: list of outer folds, each a list of inner folds
mac.get_attr('lgbm_v1', idx=0)    # results for outer fold 0

# Aggregate (only for mergeable result types like feature importances)
series = mac.get_attrs_agg(
    node='lgbm_v1',
    agg_inner=True,   # mean across inner folds
    agg_outer=True,   # then mean across outer folds → returns Series
)
# agg_inner=True, agg_outer=False → returns DataFrame (one column per outer fold)
# agg_inner=False → raises ValueError

SHAPCollector

Computes SHAP values using a tree explainer for each fold.

from mllabs.collector import SHAPCollector
from mllabs.filter import RandomFilter

shap_c = SHAPCollector(
    name='shap',
    connector=Connector(processor=LGBMClassifier),
    explainer_cls=None,       # None → shap.TreeExplainer
    data_filter=RandomFilter(n=500, random_state=0),  # subsample for speed
)
exp.add_collector(shap_c)

data_filter is applied to both train and valid data before computing SHAP values.

Querying results:

shap_c = exp.get_collector('shap')

# Per outer fold: list of pd.Series (one per inner fold)
series_list = shap_c.get_feature_importance('lgbm_v1', idx=0)

# Aggregated across all folds
importance = shap_c.get_feature_importance_agg(
    node='lgbm_v1',
    agg_inner='mean',   # aggregate inner folds; None → keep MultiIndex
    agg_outer='mean',   # aggregate outer folds; None → return DataFrame
)
# Both set → Series; agg_outer=None → DataFrame; agg_inner=None → MultiIndex DataFrame

Multiclass SHAP arrays (n_samples, n_features, n_classes) are automatically averaged across the class axis before computing feature importance.


OutputCollector

Saves raw output_train and output_valid arrays to disk for each fold.

from mllabs.collector import OutputCollector

oc = OutputCollector(
    name='outputs',
    connector=Connector(),
    output_var=None,        # columns to capture
    include_target=True,
)
exp.add_collector(oc)

Querying results:

oc = exp.get_collector('outputs')

entry = oc.get_output('lgbm_v1', idx=0, inner_idx=0)
# entry: {'output_train': (train_arr, valid_sub_arr), 'output_valid': arr, 'columns': [...]}

all_entries = oc.get_outputs('lgbm_v1')
# {(idx, inner_idx): entry, ...}

ProcessCollector

Collects predictions on external (test) data for each matched Head node. During exp(), it passes the external data through the same fitted upstream Stage processors as the experiment and calls the Head processor to produce predictions. Inner-fold predictions are aggregated per outer fold; outer-fold predictions are aggregated on query.

from mllabs.collector import ProcessCollector

pc = ProcessCollector(
    name='test_preds',
    connector=Connector(role='head'),  # head nodes only
    ext_data=test_df,                  # external dataset to predict on
    experimenter=exp,                  # used to run upstream stage transforms
    output_var=None,                   # column selector for processor output
    method='mean',                     # inner-fold aggregation: 'mean', 'mode', 'simple'
)
exp.add_collector(pc)

ext_data and experimenter are not persisted — pass them again if you reload the collector after saving.

Querying results:

pc = exp.get_collector('test_preds')

# Aggregate across all outer folds, columns from all matched nodes concatenated
df = pc.get_output(
    nodes=None,    # None → all collected nodes; list or regex str to filter
    agg='mean',    # outer-fold aggregation: 'mean', 'mode', 'simple'
)

# Single node
df_lgbm = pc.get_output(nodes=['lgbm_v1'], agg='mean')

Internally, exp.process_ext(ext_data, node, idx) is called to assemble the correctly transformed input for each node per outer fold, iterating over inner splits.