Skip to content

TrainerSupervised

The TrainerSupervised class is part of the jarvais.trainer module.

jarvais.trainer.TrainerSupervised

TrainerSupervised is a class for automating the process of feature reduction, model training, and evaluation for various machine learning tasks.

Parameters:

Name Type Description Default
output_dir str | Path

The output directory for saving the trained model and data.

required
target_variable str | list[str]

The column name of the target variable, or a list of two column names for survival analysis.

required
task str

The type of task to perform, e.g. 'binary', 'multiclass', 'regression', or 'survival'.

required
stratify_on str | None

The column name of a variable to stratify the train-test split over. If None, no stratification will be performed.

None
test_size float

The proportion of data to use for testing. Default is 0.2.

0.2
k_folds int

The number of folds to use for cross-validation. Default is 5.

5
reduction_method str | None

The method to use for feature reduction. If None, no feature reduction will be performed.

None
keep_k int

The number of features to keep after reduction. Default is 2.

2
random_state int

The random state for reproducibility. Default is 42.

42
explain bool

Whether to generate explanations for the model. Default is False.

False
Source code in src/jarvais/trainer/trainer.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class TrainerSupervised:
    """
    Automate feature reduction, model training, and evaluation for supervised tasks.

    Construction wires together three modules (one-hot encoding, feature
    reduction, and a task-specific trainer) and records them, together with the
    split/explain options, in a ``TrainerSettings`` object. ``run`` then executes
    the pipeline on a DataFrame and persists the data splits and settings.

    Parameters:
        output_dir (str | Path): The output directory for saving the trained model and data.
        target_variable (str | list[str]): The column name of the target variable, or a list of two column names for survival analysis.
        task (str): The type of task to perform, e.g. 'binary', 'multiclass', 'regression', or 'survival'.
        stratify_on (str | None): The column name of a variable to stratify the train-test split over. If None, no stratification will be performed.
        test_size (float): The proportion of data to use for testing. Default is 0.2.
        k_folds (int): The number of folds to use for cross-validation. Default is 5.
        reduction_method (str | None): The method to use for feature reduction. If None, no feature reduction will be performed.
        keep_k (int): The number of features to keep after reduction. Default is 2.
        random_state (int): The random state for reproducibility. Default is 42.
        explain (bool): Whether to generate explanations for the model. Default is False.

    Raises:
        ValueError: If ``task`` is 'survival' and ``target_variable`` is not
            made up of exactly 'time' and 'event'.
    """
    def __init__(
        self,
        output_dir: str | Path,
        target_variable: str | list[str],
        task: str,
        stratify_on: str | None = None,
        test_size: float = 0.2,
        k_folds: int = 5,
        reduction_method: str | None = None,
        keep_k: int = 2,
        random_state: int = 42,
        explain: bool = False
    ) -> None:

        self.encoding_module = OneHotEncodingModule.build()

        # Every non-survival task is trained through the autogluon wrapper, so
        # one-hot encoding is switched off for all three of them (the message
        # lists the same tasks as the condition).
        if task in {'binary', 'multiclass', 'regression'}:
            logger.warning(
                "One-hot encoding is disabled for binary, multiclass and regression tasks due to autogluon's OneHotEncoder implementation. If you want to use one-hot encoding, edit the trainer settings manually."
            )
            self.encoding_module.enabled = False

        self.reduction_module = FeatureReductionModule.build(
            method=reduction_method,
            task=task,
            keep_k=keep_k
        )

        if task == "survival":
            # Survival targets must be exactly the two columns 'time' and 'event'.
            if set(target_variable) != {'time', 'event'}:
                raise ValueError("Target variable must be a list of ['time', 'event'] for survival analysis.")

            self.trainer_module = SurvivalTrainerModule.build(
                output_dir=output_dir
            )
        else:
            self.trainer_module = AutogluonTabularWrapper.build(
                output_dir=output_dir,
                target_variable=target_variable,
                task=task,
                k_folds=k_folds
            )

        self.settings = TrainerSettings(
            output_dir=Path(output_dir),
            target_variable=target_variable,
            task=task,
            stratify_on=stratify_on,
            test_size=test_size,
            random_state=random_state,
            explain=explain,
            encoding_module=self.encoding_module,
            reduction_module=self.reduction_module,
            trainer_module=self.trainer_module,
        )

    @classmethod
    def from_settings(
            cls,
            settings_dict: dict,
        ) -> "TrainerSupervised":
        """
        Initialize a TrainerSupervised instance with a given settings dictionary.

        The dictionary is validated into a ``TrainerSettings`` object. A trainer
        is first constructed from its output directory, target variable and
        task, and then its modules and settings are replaced by the validated
        ones so that every stored option is honoured.

        Args:
            settings_dict (dict): A dictionary containing the settings for the TrainerSupervised instance.

        Returns:
            TrainerSupervised: An instance of TrainerSupervised with the given settings.
        """
        settings = TrainerSettings.model_validate(settings_dict)

        trainer = cls(
            output_dir=settings.output_dir,
            target_variable=settings.target_variable,
            task=settings.task,
        )

        # Overwrite the freshly built default modules with the validated ones.
        trainer.encoding_module = settings.encoding_module
        trainer.reduction_module = settings.reduction_module
        trainer.trainer_module = settings.trainer_module

        trainer.settings = settings

        return trainer

    def run(
            self,
            data: pd.DataFrame
        ) -> None:
        """
        Run the full pipeline: preprocess, split, train, persist, and optionally explain.

        The target column(s) are separated from the features, the features are
        passed through the encoding and reduction modules, and the result is
        split into train and test sets. The trainer module is then fitted; the
        validation rows it returns are removed from the stored training set.
        Input data, all splits, the settings and their JSON schema are written
        under ``settings.output_dir``.

        Args:
            data (pd.DataFrame): Input data containing the feature columns and the target column(s).
        """
        self.input_data = data

        # Preprocess
        X = self.input_data.drop(self.settings.target_variable, axis=1)
        y = self.input_data[self.settings.target_variable]

        X = self.encoding_module(X)
        X, y = self.reduction_module(X, y)

        # Only classification tasks are stratified. When `stratify_on` is set the
        # split is stratified on the combination of label and that column,
        # otherwise on the label alone. For other tasks `stratify_on` is unused.
        if self.settings.task in {'binary', 'multiclass'}:
            stratify_col = (
                y.astype(str) + '_' + self.input_data[self.settings.stratify_on].astype(str)
                if self.settings.stratify_on is not None
                else y
            )
        else:
            stratify_col = None

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X,
            y,
            test_size=self.settings.test_size,
            stratify=stratify_col,
            random_state=self.settings.random_state
        )

        # Train
        self.predictor, self.X_val, self.y_val = self.trainer_module.fit(
            X_train=self.X_train,
            y_train=self.y_train,
            X_test=self.X_test,
            y_test=self.y_test
        )

        # The trainer module hands back the validation rows it used; remove them
        # from the training set so the three stored splits do not overlap.
        self.X_train = self.X_train.drop(self.X_val.index)
        self.y_train = self.y_train.drop(self.y_val.index)

        data_dir = self.settings.output_dir / 'data'
        data_dir.mkdir(parents=True, exist_ok=True)
        self.input_data.to_csv((data_dir / 'input_data.csv'), index=False)
        self.X_train.to_csv((data_dir / 'X_train.csv'), index=False)
        self.X_test.to_csv((data_dir / 'X_test.csv'), index=False)
        self.X_val.to_csv((data_dir / 'X_val.csv'), index=False)
        self.y_train.to_csv((data_dir / 'y_train.csv'), index=False)
        self.y_test.to_csv((data_dir / 'y_test.csv'), index=False)
        self.y_val.to_csv((data_dir / 'y_val.csv'), index=False)

        if self.settings.explain:
            explainer = Explainer(self.settings.output_dir)
            explainer.run(self)

        # Save Settings
        schema_path = self.settings.output_dir / 'trainer_settings.schema.json'
        with schema_path.open("w") as f:
            json.dump(self.settings.model_json_schema(), f, indent=2)

        # The settings file points at the schema by a path relative to output_dir.
        settings_path = self.settings.output_dir / 'trainer_settings.json'
        with settings_path.open('w') as f:
            json.dump({
                "$schema": str(schema_path.relative_to(self.settings.output_dir)),
                **self.settings.model_dump(mode="json")
            }, f, indent=2)

    def model_names(self) -> list[str]:
        """
        Returns all trainer model names.

        This method retrieves the names of all models associated with the
        current predictor. It requires that the predictor has been trained
        (``self.predictor`` is only set by ``run`` or ``load_trainer``).

        Returns:
            list: A list of model names available in the predictor.
        """

        return self.predictor.model_names()

    def infer(self, data: pd.DataFrame, model: str | None = None) -> np.ndarray:
        """
        Make predictions on new data using the trained predictor.

        Survival predictors return whatever their ``predict`` produces. For the
        other tasks, the second probability column is returned when the
        predictor can produce probabilities, and plain predictions otherwise.

        Args:
            data (pd.DataFrame): The new data to make predictions on.
            model (str | None): The model to use for prediction. If None, the best model will be used.

        Returns:
            np.ndarray: The predicted values.
        """

        if self.settings.task == 'survival':
            inference = self.predictor.predict(data, model)
        elif self.predictor.can_predict_proba:
            # NOTE(review): column 1 is presumably the positive class of a binary
            # task; for multiclass this keeps a single class column only - confirm
            # that this is intended.
            inference = self.predictor.predict_proba(data, model, as_pandas=False)[:, 1]
        else:
            inference = self.predictor.predict(data, model, as_pandas=False)

        return inference

    @classmethod
    def load_trainer(
            cls,
            project_dir: str | Path
        ) -> "TrainerSupervised":
        """
        Restore a trainer from a project directory written by ``run``.

        Reads ``trainer_settings.json``, rebuilds the trainer through
        ``from_settings``, loads the persisted predictor for the task and
        reloads the input data and the train/validation/test splits from the
        ``data`` sub-directory.

        Args:
            project_dir (str | Path): Directory containing the saved settings, models and data.

        Returns:
            TrainerSupervised: A trainer with predictor and data splits attached.
        """

        # Imported lazily, only when a saved predictor actually has to be loaded.
        from autogluon.tabular import TabularPredictor

        from jarvais.trainer.modules.survival_trainer import SurvivalPredictor

        project_dir = Path(project_dir)
        with (project_dir / 'trainer_settings.json').open('r') as f:
            trainer_config = json.load(f)

        trainer = cls.from_settings(trainer_config)

        if trainer.settings.task == 'survival':
            model_dir = (project_dir / 'survival_models')
            trainer.predictor = SurvivalPredictor.load(model_dir)
        else:
            model_dir = (project_dir / 'autogluon_models' / 'autogluon_models_best_fold')
            trainer.predictor = TabularPredictor.load(model_dir, verbosity=1)

        data_dir = project_dir / 'data'
        trainer.input_data = pd.read_csv(data_dir / 'input_data.csv')
        trainer.X_test = pd.read_csv(data_dir / 'X_test.csv')
        trainer.X_val = pd.read_csv(data_dir / 'X_val.csv')
        trainer.X_train = pd.read_csv(data_dir / 'X_train.csv')
        # Squeeze along the column axis only: a single-column label file becomes
        # a Series, a two-column survival label file stays a DataFrame, and a
        # one-row file is never collapsed to a scalar or transposed Series.
        trainer.y_test = pd.read_csv(data_dir / 'y_test.csv').squeeze("columns")
        trainer.y_val = pd.read_csv(data_dir / 'y_val.csv').squeeze("columns")
        trainer.y_train = pd.read_csv(data_dir / 'y_train.csv').squeeze("columns")

        return trainer

    def __rich_repr__(self) -> rich.repr.Result:
        # Rich renders the trainer through its settings object.
        yield self.settings

    def __repr__(self) -> str:
        return f"TrainerSupervised(settings={self.settings.model_dump_json(indent=2)})"

from_settings(settings_dict) classmethod

Initialize a TrainerSupervised instance with a given settings dictionary.

Parameters:

Name Type Description Default
settings_dict dict

A dictionary containing the settings for the TrainerSupervised instance.

required

Returns:

Name Type Description
TrainerSupervised TrainerSupervised

An instance of TrainerSupervised with the given settings.

Source code in src/jarvais/trainer/trainer.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@classmethod
def from_settings(
        cls, 
        settings_dict: dict,
    ) -> "TrainerSupervised":
    """
    Build a TrainerSupervised instance from a settings dictionary.

    The dictionary is validated into a ``TrainerSettings`` object, a trainer is
    constructed from its output directory, target variable and task, and the
    trainer's modules and settings are then replaced by the validated ones.

    Args:
        settings_dict (dict): A dictionary containing the settings for the TrainerSupervised instance.

    Returns:
        TrainerSupervised: An instance of TrainerSupervised with the given settings.
    """
    validated = TrainerSettings.model_validate(settings_dict)

    instance = cls(
        output_dir=validated.output_dir,
        target_variable=validated.target_variable,
        task=validated.task,
    )

    # Swap the default modules created by the constructor for the validated ones.
    for module_attr in ("encoding_module", "reduction_module", "trainer_module"):
        setattr(instance, module_attr, getattr(validated, module_attr))

    instance.settings = validated
    return instance

model_names()

Returns all trainer model names.

This method retrieves the names of all models associated with the current predictor. It requires that the predictor has been trained.

Returns:

Name Type Description
list list[str]

A list of model names available in the predictor.

Source code in src/jarvais/trainer/trainer.py
190
191
192
193
194
195
196
197
198
199
200
201
def model_names(self) -> list[str]:
    """
    Return the names of every model held by the current predictor.

    The call is delegated to ``self.predictor``, so the predictor must already
    be attached to the trainer.

    Returns:
        list: A list of model names available in the predictor.
    """
    predictor = self.predictor
    names = predictor.model_names()
    return names

infer(data, model=None)

Make predictions on new data using the trained predictor.

Parameters:

Name Type Description Default
data DataFrame

The new data to make predictions on.

required
model str | None

The model to use for prediction. If None, the best model will be used.

None

Returns:

Type Description
ndarray

The predicted values.

Source code in src/jarvais/trainer/trainer.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def infer(self, data: pd.DataFrame, model: str | None = None) -> np.ndarray:
    """
    Make predictions on new data using the trained predictor.

    Args:
        data (pd.DataFrame): The new data to make predictions on.
        model (str | None): The model to use for prediction. If None, the best model will be used.

    Returns:
        np.ndarray: The predicted values.
    """

    if self.settings.task == 'survival':
        inference = self.predictor.predict(data, model)
    elif self.predictor.can_predict_proba:
        inference = self.predictor.predict_proba(data, model, as_pandas=False)[:, 1]
    else:
        inference = self.predictor.predict(data, model, as_pandas=False)

    return inference

Trainer Modules

jarvais.trainer.modules.FeatureReductionModule

Bases: BaseModel

Source code in src/jarvais/trainer/modules/feature_reduction.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class FeatureReductionModule(BaseModel):
    # Configurable feature-selection step. Calling the module with (X, y)
    # returns a frame restricted to the selected columns together with a copy of y.
    # (Documented with comments rather than a class docstring so that the JSON
    # schema generated from this model is left unchanged.)
    method: Literal['mrmr', 'variance_threshold', 'corr', 'chi2', None] = Field(
        description="Feature reduction strategy to apply."
    )
    task: Literal['binary', 'multiclass', 'regression', 'survival'] = Field(
        description="Supervised learning task type."
    )
    keep_k: int = Field(
        default=10,
        description="Number of features to retain for relevant methods."
    )
    enabled: bool = Field(
        default=True,
        description="Whether to perform feature reduction."
    )

    @classmethod
    def build(cls, method: str | None, task: str, keep_k: int = 10) -> "FeatureReductionModule":
        """Create a module from plain arguments; values are validated by the model fields."""
        return cls(method=method, task=task, keep_k=keep_k) # type: ignore

    def __call__(self, X: pd.DataFrame, y: pd.Series) -> tuple[pd.DataFrame, pd.Series]:
        """
        Select features from ``X`` according to ``method``.

        Object and category columns are ordinal-encoded on a working copy for
        the selector only; the returned frame carries the original values of
        the selected columns, with those categorical columns cast to
        ``category`` dtype.

        Args:
            X (pd.DataFrame): Feature matrix.
            y (pd.Series): Target values.

        Returns:
            tuple[pd.DataFrame, pd.Series]: The reduced feature frame and the
            target. Both inputs are returned untouched when reduction is
            disabled, ``method`` is None, or the task is 'survival'.

        Raises:
            ValueError: If ``method`` is 'chi2' for a non-classification task,
                or if ``method`` is not one of the supported names.
        """
        if not self.enabled or self.method is None:
            logger.info("Skipping feature reduction.")
            return X, y

        if self.task == 'survival':
            logger.warning("Survival analysis is not supported for feature reduction. Skipping feature reduction.")
            return X, y

        logger.info(f"Applying feature reduction: {self.method}")

        X_encoded = X.copy()
        y = y.copy()

        # Step 1: Ordinal encode categoricals (on the working copy only)
        categorical_columns = X_encoded.select_dtypes(include=['object', 'category']).columns.tolist()

        for col in categorical_columns:
            # Cast to object first so that mapping a 'category' column yields
            # plain numeric codes instead of another categorical column.
            values = X_encoded[col].astype(object)
            mapping = {k: i for i, k in enumerate(values.dropna().unique())}
            X_encoded[col] = values.map(mapping)

        # Step 2: Perform reduction
        if self.method == "variance_threshold":
            X_reduced = self._variance_threshold(X_encoded, y)
        elif self.method == "chi2":
            if self.task not in ['binary', 'multiclass']:
                raise ValueError("Chi2 only supports classification tasks.")
            X_reduced = self._chi2(X_encoded, y)
        elif self.method == "corr":
            X_reduced = self._kbest(X_encoded, y)
        elif self.method == "mrmr":
            X_reduced = self._mrmr(X_encoded, y)
        else:
            msg = f"Unsupported method: {self.method}"
            raise ValueError(msg)

        # Step 3: Restore original values for the selected columns.
        # Every selector above only picks columns and never alters values, so
        # the selected columns are taken from the caller's frame directly. This
        # avoids decoding integer codes (which fails when a categorical column
        # contains missing values) and avoids assigning into a slice.
        selected_columns = list(X_reduced.columns)
        X_selected = X[selected_columns].copy()
        for col in categorical_columns:
            if col in X_selected.columns:
                X_selected[col] = X_selected[col].astype(object).astype("category")

        logger.info(f"Feature reduction complete. Remaining features: {X_selected.columns}")

        return X_selected, y

    def _variance_threshold(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Keep the columns retained by a default ``VarianceThreshold`` selector."""
        selector = VarianceThreshold()
        _ = selector.fit_transform(X, y)
        return X[X.columns[selector.get_support(indices=True)]]

    def _chi2(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Keep the ``keep_k`` columns ranked best by the chi-squared score."""
        selector = SelectKBest(score_func=chi2, k=self.keep_k)
        _ = selector.fit_transform(X, y)
        return X[X.columns[selector.get_support(indices=True)]]

    def _kbest(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Keep the ``keep_k`` best columns using ``f_classif`` or ``f_regression`` depending on the task."""
        score_func = f_classif if self.task in ['binary', 'multiclass'] else f_regression
        selector = SelectKBest(score_func=score_func, k=self.keep_k)
        _ = selector.fit_transform(X, y)
        return X[X.columns[selector.get_support(indices=True)]]

    def _mrmr(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Keep the ``keep_k`` columns chosen by the task-appropriate mRMR routine."""
        mrmr_fn = mrmr_classif if self.task in ['binary', 'multiclass'] else mrmr_regression
        selected_features = mrmr_fn(X=X, y=y, K=self.keep_k, n_jobs=1)
        return X[selected_features]

jarvais.trainer.modules.OneHotEncodingModule

Bases: BaseModel

Source code in src/jarvais/trainer/modules/encoding.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class OneHotEncodingModule(BaseModel):
    # Optional one-hot encoding step: calling the module with a DataFrame
    # returns the frame produced by pandas.get_dummies, or the input itself
    # when the module is disabled.
    columns: list[str] | None = Field(
        description="List of categorical columns to one-hot encode. If None, all columns are used.",
        default=None,
    )
    prefix_sep: str = Field(
        description="Prefix separator used in encoded feature names.",
        default="|",
    )
    enabled: bool = Field(
        description="Whether to perform one-hot encoding.",
        default=True,
    )

    @classmethod
    def build(
        cls,
        categorical_columns: list[str] | None = None,
        prefix_sep: str = "|",
    ) -> "OneHotEncodingModule":
        """Create a module that encodes ``categorical_columns`` using ``prefix_sep`` in new column names."""
        module = cls(columns=categorical_columns, prefix_sep=prefix_sep)
        return module

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a one-hot encoded version of ``df``, or ``df`` itself when the module is disabled."""
        if self.enabled:
            frame = df.copy()
            encoded = pd.get_dummies(
                frame,
                columns=self.columns,
                prefix_sep=self.prefix_sep,
                dtype=float,
            )
            return encoded

        logger.warning("One-hot encoding is disabled.")
        return df

jarvais.trainer.modules.SurvivalTrainerModule

Bases: BaseModel

Source code in src/jarvais/trainer/modules/survival_trainer.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
class SurvivalTrainerModule(BaseModel):
    # Trains the configured deep and classical survival models, scores them with
    # the concordance index and returns a SurvivalPredictor holding every model
    # that trained successfully.
    # (Documented with comments rather than a class docstring so that the JSON
    # schema generated from this model is left unchanged.)
    output_dir: Path = Field(
        description="Output directory.",
        title="Output Directory",
        examples=["output"]
    )
    classical_models: list[str] = Field(
        description="List of classical machine learning models to train.",
        title="Classical Machine Learning Models",
        examples=["CoxPH", "RandomForest", "GradientBoosting", "SVM"]
    )
    deep_models: list[str] = Field(
        description="List of deep learning models to train.",
        title="Deep Learning Models",
        examples=["MTLR", "DeepSurv"]
    )
    eval_metric: Literal["c_index"] = Field(
        default="c_index",
        description="Evaluation metric.",
        title="Evaluation Metric"
    )
    random_seed: int = Field(
        default=42,
        description="Random seed for reproducibility.",
        title="Random Seed"
    )

    # NOTE(review): no pydantic validator decorator is visible on the two
    # validate_* methods below, so they are presumably only run when called
    # explicitly - confirm whether they were meant to be field validators.
    @classmethod
    def validate_classical_models(cls, models: list[str]) -> list[str]:
        """Return ``models`` unchanged, or raise ValueError if any name is not a known classical model."""
        model_registry = ["CoxPH", "RandomForest", "GradientBoosting", "SVM"]
        invalid = [m for m in models if m not in model_registry]
        if invalid:
            msg = f"Invalid models: {invalid}. Available: {model_registry}"
            logger.error(msg)
            raise ValueError(msg)
        return models

    @classmethod
    def validate_deep_models(cls, models: list[str]) -> list[str]:
        """Return ``models`` unchanged, or raise ValueError if any name is not a known deep model."""
        model_registry = ["MTLR", "DeepSurv"]
        invalid = [m for m in models if m not in model_registry]
        if invalid:
            msg = f"Invalid models: {invalid}. Available: {model_registry}"
            logger.error(msg)
            raise ValueError(msg)
        return models

    @classmethod
    def build(
        cls,
        output_dir: str | Path,
    ) -> "SurvivalTrainerModule":
        """Create a module that trains every supported deep and classical model under ``output_dir``."""
        return cls(
            output_dir=Path(output_dir),
            deep_models=["MTLR", "DeepSurv"],
            classical_models=["CoxPH", "RandomForest", "GradientBoosting", "SVM"]
        )

    def fit(
            self,
            X_train: pd.DataFrame, 
            y_train: pd.DataFrame,
            X_test: pd.DataFrame,
            y_test: pd.DataFrame
        ) -> tuple[SurvivalPredictor, pd.DataFrame, pd.DataFrame]:

        """Train both deep and traditional survival models, consolidate fitted models and C-index scores.

        A 10% validation split, stratified on ``event``, is carved out of the
        training data and handed to the deep models. Classical models are fitted
        on the complete training data and pickled under ``survival_models``. A
        model whose training raises is logged and left out.

        Args:
            X_train (pd.DataFrame): Training features.
            y_train (pd.DataFrame): Training labels with 'time' and 'event' columns.
            X_test (pd.DataFrame): Test features.
            y_test (pd.DataFrame): Test labels with 'time' and 'event' columns.

        Returns:
            tuple[SurvivalPredictor, pd.DataFrame, pd.DataFrame]: The predictor,
            the validation features and the validation labels.

        Raises:
            ValueError: If no model could be trained (raised during evaluation).
        """
        models_dir = self.output_dir / 'survival_models'
        models_dir.mkdir(exist_ok=True, parents=True)

        trained_models = {}

        # Deep Models

        data_train, data_val = train_test_split(
            pd.concat([X_train, y_train], axis=1), 
            test_size=0.1, 
            stratify=y_train['event'], 
            random_state=self.random_seed
        )

        deep_trainers = {"MTLR": train_mtlr, "DeepSurv": train_deepsurv}
        for name, train_fn in deep_trainers.items():
            if name not in self.deep_models:
                logger.info(f"Skipping {name} model training.")
                continue

            # Best effort: a failing model is logged and the rest still train.
            try:
                trained_models[name] = train_fn(
                    data_train,
                    data_val,
                    models_dir,
                    self.random_seed)
            except Exception as e:
                logger.error(f"Error training {name} model: {e}")

        # Basic Models

        models = {
            "CoxPH": CoxnetSurvivalAnalysis(fit_baseline_model=True),
            "GradientBoosting": GradientBoostingSurvivalAnalysis(),
            "RandomForest": RandomSurvivalForest(n_estimators=100, random_state=self.random_seed),
            "SVM": FastSurvivalSVM(max_iter=1000, tol=1e-5, random_state=self.random_seed),
        }

        y_train_surv = Surv.from_dataframe('event', 'time', y_train)
        for name, model in models.items():
            if name not in self.classical_models:
                logger.info(f"Skipping {name} model training.")
                continue

            try:
                logger.info(f"Training {name} model...")
                model.fit(X_train.astype(float), y_train_surv)
                trained_models[name] = model

                model_path = models_dir / f"{name}.pkl"
                with model_path.open("wb") as f:
                    pickle.dump(model, f)

            except Exception as e:
                logger.error(f"Error training {name} model: {e}")

        label_columns = ["time", "event"]
        X_val = data_val.drop(label_columns, axis=1)
        y_val = data_val[label_columns]

        # Evaluate against the training split without the validation rows.
        # Passing the full training frame would leak validation rows into the
        # deep models' train score and count them twice in the classical
        # models' train+validation score.
        X_fit = data_train.drop(label_columns, axis=1)
        y_fit = data_train[label_columns]

        predictor = self._evaluate(trained_models, X_fit, y_fit, X_val, y_val, X_test, y_test)

        survival_metadata = {
            "model_scores": predictor.model_scores,
            "best_model": predictor.best_model
        }
        with (models_dir / "survival_metadata.json").open("w") as f:
            json.dump(survival_metadata, f)

        return predictor, X_val, y_val

    def _evaluate(
            self, 
            trained_models: dict[str, MODELType], 
            X_train: pd.DataFrame, 
            y_train: pd.DataFrame, 
            X_val: pd.DataFrame, 
            y_val: pd.DataFrame, 
            X_test: pd.DataFrame, 
            y_test: pd.DataFrame
        ) -> SurvivalPredictor:
        """Score every trained model, write and print the leaderboard, and build the predictor.

        Classical models get a combined train+validation score (they have no
        separate validation set) and a test score; deep models get separate
        train, validation and test scores. The model with the highest test
        score becomes the predictor's best model.

        Raises:
            ValueError: If ``trained_models`` holds none of the configured models.
        """

        def c_index(event: pd.Series, time: pd.Series, predictions) -> float:
            # The first element of the returned tuple is the concordance index.
            return concordance_index_censored(event.astype(bool), time, predictions)[0]

        def labelled(score: float) -> str:
            return f"{self.eval_metric.upper()}: {round(score, 3)}"

        leaderboard = []
        test_scores = {}
        for model in self.classical_models:  
            trained_model = trained_models.get(model)
            # Compare against None explicitly: estimator objects may define
            # __len__, which would make a plain truthiness test unreliable.
            if trained_model is None:
                continue

            train_score = c_index( # Classical models don't have a validation set
                pd.concat([y_train['event'], y_val['event']], axis=0),
                pd.concat([y_train['time'], y_val['time']], axis=0),
                trained_model.predict(pd.concat([X_train, X_val], axis=0))
            )
            test_score = c_index(y_test['event'], y_test['time'], trained_model.predict(X_test))
            leaderboard.append({
                'model': model,
                'test_score': labelled(test_score),
                'val_score': 'N/A', # no validation for classical models
                'train_score': labelled(train_score),
            })

            test_scores[model] = test_score

        for model in self.deep_models:
            trained_model = trained_models.get(model)
            if trained_model is None:
                continue

            train_score = c_index(y_train['event'], y_train['time'], trained_model.predict(X_train))
            val_score = c_index(y_val['event'], y_val['time'], trained_model.predict(X_val))
            test_score = c_index(y_test['event'], y_test['time'], trained_model.predict(X_test))
            leaderboard.append({
                'model': model,
                'test_score': labelled(test_score),
                'val_score': labelled(val_score),
                'train_score': labelled(train_score),
            })

            test_scores[model] = test_score

        # Without at least one scored model there is nothing to rank; fail with
        # a clear message instead of an opaque error from the empty leaderboard.
        if not test_scores:
            msg = "No survival model was trained successfully; cannot build a leaderboard or select a best model."
            logger.error(msg)
            raise ValueError(msg)

        leaderboard_df = pd.DataFrame(leaderboard).sort_values(by='test_score', ascending=False)
        leaderboard_df.to_csv(self.output_dir / 'leaderboard.csv', index=False)

        print('\nModel Leaderboard\n----------------') # noqa: T201
        print(tabulate( # noqa: T201
            leaderboard_df,
            tablefmt = "grid",
            headers="keys",
            showindex=False))

        return SurvivalPredictor(
                models=trained_models, 
                model_scores=test_scores,
                best_model=max(test_scores, key=test_scores.get) # type: ignore
            )

fit(X_train, y_train, X_test, y_test)

Train both deep and traditional survival models, consolidate fitted models and C-index scores.

Source code in src/jarvais/trainer/modules/survival_trainer.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def fit(
        self,
        X_train: pd.DataFrame, 
        y_train: pd.DataFrame,
        X_test: pd.DataFrame,
        y_test: pd.DataFrame
    ) -> tuple[SurvivalPredictor, pd.DataFrame, pd.DataFrame]:
    """Train the selected deep and classical survival models and evaluate them.

    A 10% validation split (stratified on the ``event`` column, seeded with
    ``self.random_seed``) is carved out of the training data. The deep models
    are trained on the remaining 90% with that split as validation data; the
    classical models are fitted on the full ``X_train``. Every classical model
    that fits successfully is pickled to ``<output_dir>/survival_models``.
    A model whose training raises is logged and left out rather than aborting
    the whole run.

    Args:
        X_train: Training features.
        y_train: Training targets; must contain ``time`` and ``event`` columns.
        X_test: Test features, forwarded to the evaluation step.
        y_test: Test targets, forwarded to the evaluation step.

    Returns:
        The predictor produced by ``self._evaluate`` together with the
        validation features and the validation ``time``/``event`` targets.
    """
    models_dir = self.output_dir / 'survival_models'
    models_dir.mkdir(exist_ok=True, parents=True)

    fitted = {}

    # Deep Models

    full_train = pd.concat([X_train, y_train], axis=1)
    data_train, data_val = train_test_split(
        full_train,
        test_size=0.1,
        stratify=y_train['event'],
        random_state=self.random_seed,
    )

    # Both deep trainers share one call signature, so they are driven from a
    # single table instead of two copies of the same try/except block.
    deep_trainers = {"MTLR": train_mtlr, "DeepSurv": train_deepsurv}
    for deep_name, train_fn in deep_trainers.items():
        if deep_name not in self.deep_models:
            logger.info(f"Skipping {deep_name} model training.")
            continue
        try:
            fitted[deep_name] = train_fn(data_train, data_val, models_dir, self.random_seed)
        except Exception as e:
            logger.error(f"Error training {deep_name} model: {e}")

    # Basic Models

    classical = {
        "CoxPH": CoxnetSurvivalAnalysis(fit_baseline_model=True),
        "GradientBoosting": GradientBoostingSurvivalAnalysis(),
        "RandomForest": RandomSurvivalForest(n_estimators=100, random_state=self.random_seed),
        "SVM": FastSurvivalSVM(max_iter=1000, tol=1e-5, random_state=self.random_seed),
    }

    y_train_surv = Surv.from_dataframe('event', 'time', y_train)
    for name, estimator in classical.items():
        if name not in self.classical_models:
            logger.info(f"Skipping {name} model training.")
            continue

        try:
            logger.info(f"Training {name} model...")
            estimator.fit(X_train.astype(float), y_train_surv)
            # Registered before pickling: a failed dump is logged below but
            # the fitted model still takes part in the evaluation.
            fitted[name] = estimator

            with (models_dir / f"{name}.pkl").open("wb") as f:
                pickle.dump(estimator, f)

        except Exception as e:
            logger.error(f"Error training {name} model: {e}")

    # Split the held-out frame back into features and targets.
    y_val = data_val[["time", "event"]]
    X_val = data_val.drop(columns=["time", "event"])

    predictor = self._evaluate(fitted, X_train, y_train, X_val, y_val, X_test, y_test)

    # Persist the scores and the winning model name next to the saved models.
    survival_metadata = {
        "model_scores": predictor.model_scores,
        "best_model": predictor.best_model
    }
    metadata_path = models_dir / "survival_metadata.json"
    with metadata_path.open("w") as f:
        json.dump(survival_metadata, f)

    return predictor, X_val, y_val

jarvais.trainer.modules.AutogluonTabularWrapper

Bases: BaseModel

Source code in src/jarvais/trainer/modules/autogluon_trainer.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class AutogluonTabularWrapper(BaseModel):
    """Configure and train AutoGluon ``TabularPredictor`` models.

    With ``k_folds > 1`` one predictor is trained per fold and the fold with
    the highest validation score is copied to ``autogluon_models_best_fold``;
    otherwise a single predictor is trained directly into that directory.
    In both cases a merged train/val/test leaderboard is written to
    ``<output_dir>/leaderboard.csv`` and printed.
    """

    output_dir: Path = Field(
        description="Output directory.",
        title="Output Directory",
        examples=["output"]
    )
    target_variable: str = Field(
        description="Target variable.",
        title="Target Variable",
        examples=["tumor_stage"]
    )
    task: Literal["binary", "multiclass", "regression", "survival"] = Field(
        description="Task to perform.",
        title="Task",
        examples=["binary", "multiclass", "regression", "survival"]
    )
    eval_metric: str  = Field(
        description="Evaluation metric.",
        title="Evaluation Metric"
    )
    k_folds: int = Field(
        default=5,
        description="Number of folds.",
        title="Number of Folds"
    )
    extra_metrics: list = Field(
        default_factory=list,
        description="List of extra metrics to evaluate.",
        title="Extra Metrics",
        examples=["accuracy"]
    )
    kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional arguments to pass to the model.",
        title="Additional Arguments",
        examples=[{"presets": "best_quality"}]
    )   

    # Scores of the per-fold predictors from the most recent CV run.
    _cv_scores: list = PrivateAttr(default_factory=list)
    # Copy of extra_metrics in which the 'auprc' string is swapped for a scorer object.
    _extra_metrics: list = PrivateAttr(default_factory=list)
    # Copy of kwargs with the 'hyperparameters' entry filled in by model_post_init.
    _kwargs: dict[str, Any] = PrivateAttr(default_factory=dict)
    # Predictor returned by the most recent fit() call.
    _predictor: TabularPredictor | None = PrivateAttr(default=None)
    # Per-fold predictors from the most recent CV run.
    _predictors: list[TabularPredictor] = PrivateAttr(default_factory=list)

    def model_post_init(self, context: Any) -> None: # noqa: ANN001
        """Derive the private fit arguments and metric list from the public fields."""
        self._kwargs = self.kwargs.copy()
        custom_hyperparameters = get_hyperparameter_config('default')
        custom_hyperparameters[SimpleRegressionModel] = {}
        # NOTE: this replaces any 'hyperparameters' entry supplied via `kwargs`.
        self._kwargs['hyperparameters'] = custom_hyperparameters

        self._extra_metrics = self.extra_metrics.copy()
        if 'auprc' in self.extra_metrics:
            ag_auprc_scorer = make_scorer(
                name='auprc', # Move this to a separate file?
                score_func=auprc,
                optimum=1,
                greater_is_better=True,
                needs_class=True)

            # The plain string is swapped for the scorer object built above.
            self._extra_metrics.remove('auprc')
            self._extra_metrics.append(ag_auprc_scorer)

    @classmethod
    def build(
        cls,
        output_dir: str | Path,
        target_variable: str,
        task: str,
        k_folds: int = 5,
    ) -> "AutogluonTabularWrapper":  
        """Create a wrapper with the default metrics for ``task``.

        Args:
            output_dir: Directory that receives models and the leaderboard.
            target_variable: Name of the label column.
            task: One of ``'binary'``, ``'multiclass'`` or ``'regression'``.
            k_folds: Number of cross-validation folds.

        Returns:
            A configured ``AutogluonTabularWrapper``.

        Raises:
            ValueError: If ``task`` has no default metrics defined here
                (previously this surfaced as an ``UnboundLocalError``).
        """
        if task in {"binary", "multiclass"}:
            eval_metric = "roc_auc"
            extra_metrics = ['f1', 'auprc']
        elif task == "regression":
            eval_metric = "r2"
            extra_metrics = ['root_mean_squared_error']
        else:
            raise ValueError(
                f"Unsupported task {task!r} for AutogluonTabularWrapper.build. "
                "Choose one of: 'binary', 'multiclass', 'regression'."
            )

        return cls(
            output_dir=Path(output_dir),
            target_variable=target_variable,
            task=task, # type:ignore
            k_folds=k_folds,
            eval_metric=eval_metric,
            extra_metrics=extra_metrics
        )

    def fit(
            self,
            X_train: pd.DataFrame,
            y_train: pd.Series,
            X_test: pd.DataFrame,
            y_test: pd.Series
        ) -> tuple[TabularPredictor, pd.DataFrame, pd.Series]:
        """Train the predictor(s), then write and print the merged leaderboard.

        Args:
            X_train: Training features.
            y_train: Training target.
            X_test: Test features, used only for the test leaderboard.
            y_test: Test target, used only for the test leaderboard.

        Returns:
            The selected predictor plus the validation features and target
            that belong to it.
        """
        if self.k_folds > 1:
            self._predictor, X_val, y_val = self._train_autogluon_with_cv(
                X_train, 
                y_train,
            )

            train_leaderboards, val_leaderboards, test_leaderboards = [], [], []

            # Every fold's predictor is scored on the same three datasets so
            # the per-fold leaderboards can be aggregated afterwards.
            for predictor in self._predictors:
                train_leaderboards.append(predictor.leaderboard(pd.concat([X_train, y_train], axis=1), extra_metrics=self._extra_metrics))
                val_leaderboards.append(predictor.leaderboard(pd.concat([X_val, y_val], axis=1), extra_metrics=self._extra_metrics))
                test_leaderboards.append(predictor.leaderboard(pd.concat([X_test, y_test], axis=1), extra_metrics=self._extra_metrics))

            train_leaderboard = aggregate_folds(pd.concat(train_leaderboards, ignore_index=True), self._extra_metrics)
            val_leaderboard = aggregate_folds(pd.concat(val_leaderboards, ignore_index=True), self._extra_metrics)
            test_leaderboard = aggregate_folds(pd.concat(test_leaderboards, ignore_index=True), self._extra_metrics)
        else:
            self._predictor = TabularPredictor(
                label=self.target_variable, 
                problem_type=self.task, 
                eval_metric=self.eval_metric,
                path=(self.output_dir / 'autogluon_models' / 'autogluon_models_best_fold'),
                log_to_file=False,
            ).fit(
                pd.concat([X_train, y_train], axis=1),
                **self._kwargs
            )

            # Without CV, the validation data is whatever the predictor holds
            # internally under 'val'.
            X_val, y_val = self._predictor.load_data_internal(data='val', return_y=True)

            train_leaderboard = self._predictor.leaderboard(
                pd.concat([X_train, y_train], axis=1),
                extra_metrics=self._extra_metrics).round(2)
            val_leaderboard = self._predictor.leaderboard(
                pd.concat([X_val, y_val], axis=1),
                extra_metrics=self._extra_metrics).round(2)
            test_leaderboard = self._predictor.leaderboard(
                pd.concat([X_test, y_test], axis=1),
                extra_metrics=self._extra_metrics).round(2)

        final_leaderboard = pd.merge(
            pd.merge(
                format_leaderboard(train_leaderboard, self.eval_metric, self._extra_metrics, 'score_train'),
                format_leaderboard(val_leaderboard, self.eval_metric, self._extra_metrics, 'score_val'),
                on='model'
            ),
            format_leaderboard(test_leaderboard, self.eval_metric, self._extra_metrics, 'score_test'),
            on='model'
        )

        final_leaderboard.to_csv(self.output_dir / 'leaderboard.csv', index=False)

        print('\nModel Leaderboard\n----------------') # noqa: T201
        print(tabulate( # noqa: T201
            final_leaderboard.sort_values(by='score_test', ascending=False),
            tablefmt = "grid",
            headers="keys",
            showindex=False))

        return self._predictor, X_val, y_val

    def _train_autogluon_with_cv(
            self,
            X_train: pd.DataFrame,
            y_train: pd.Series,
        ) -> tuple[TabularPredictor, pd.DataFrame, pd.Series]:
        """Train one predictor per fold and pick the best-scoring fold.

        Args:
            X_train: Training features to split into folds.
            y_train: Training target, aligned positionally with ``X_train``.

        Returns:
            The predictor of the best fold and that fold's validation
            features and target.
        """
        # Start from a clean slate: without this, a second fit() call would
        # mix folds from both runs and `best_fold` could point at a stale
        # predictor or past the end of `val_indices`.
        self._predictors = []
        self._cv_scores = []

        kf = KFold(n_splits=self.k_folds, shuffle=True, random_state=42)

        val_indices = []    

        for fold, (train_index, val_index) in enumerate(kf.split(X_train)):
            X_train_cv, X_val_cv = X_train.iloc[train_index], X_train.iloc[val_index]
            y_train_cv, y_val_cv = y_train.iloc[train_index], y_train.iloc[val_index]

            val_indices.append(val_index)

            logger.info(f"Training fold {fold+1}/{self.k_folds}...")

            predictor = TabularPredictor(
                label=self.target_variable, 
                problem_type=self.task, 
                eval_metric=self.eval_metric,
                path=(self.output_dir / 'autogluon_models' / f'autogluon_models_fold_{fold}'),
                log_to_file=False,
                verbosity=0
            ).fit(
                pd.concat([X_train_cv, y_train_cv], axis=1),
                **self._kwargs
            )

            self._predictors.append(predictor)

            score = predictor.evaluate(pd.concat([X_val_cv, y_val_cv], axis=1))[self.eval_metric]
            logger.info(f"Fold {fold+1}/{self.k_folds} score: {score} ({self.eval_metric})")
            self._cv_scores.append(score)

        # The fold with the highest validation score wins (first one on ties).
        best_fold = self._cv_scores.index(max(self._cv_scores))

        shutil.copytree(
            self.output_dir / 'autogluon_models' / f'autogluon_models_fold_{best_fold}',
            self.output_dir / 'autogluon_models' / 'autogluon_models_best_fold', dirs_exist_ok=True
        )

        return self._predictors[best_fold], X_train.iloc[val_indices[best_fold]], y_train.iloc[val_indices[best_fold]]

Trainer Settings

jarvais.trainer.settings.TrainerSettings

Bases: BaseModel

Source code in src/jarvais/trainer/settings.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class TrainerSettings(BaseModel):
    """Top-level settings for a trainer run.

    Holds the general options (output location, target, task, split
    parameters) together with the encoding, feature-reduction and trainer
    modules. The task is validated and the output directory is created when
    the model is instantiated.
    """

    output_dir: Path = Field(
        description="Output directory.",
        title="Output Directory",
        examples=["output"]
    )
    target_variable: str | list[str] = Field(
        description="Target variable. Can be a list only for survival analysis.",
        title="Target Variable",
        examples=["tumor_stage", ["time", "event"]]
    )
    task: str = Field(
        description="Task to perform.",
        title="Task",
        examples=["binary", "multiclass", "regression", "survival"]
    )
    # Explicit default: without it pydantic treats this Optional field as
    # required, although "no stratification" (None) is the documented default.
    stratify_on: str | None = Field(
        default=None,
        description="Variable to stratify on.",
        title="Stratify On",
        examples=["gender"]
    )
    test_size: float = Field(
        default=0.2,
        description="Test size.",
        title="Test Size"
    )
    random_state: int = Field(
        default=42,
        description="Random state.",
        title="Random State"
    )
    explain: bool = Field(
        default=False,
        description="Whether to generate explainability reports for the model.",
        title="Generate Explainability Model"
    )

    encoding_module: OneHotEncodingModule
    reduction_module: FeatureReductionModule
    trainer_module: SurvivalTrainerModule | AutogluonTabularWrapper

    def model_post_init(self, __context) -> None: # type: ignore # noqa: ANN001
        """Validate the task, then make sure the output directory exists.

        Raises:
            ValueError: If ``task`` is not one of the supported tasks.
        """
        # `validate_task` is not registered as a pydantic validator, so it is
        # invoked here explicitly. It runs before the mkdir so an invalid
        # configuration does not leave a directory behind.
        self.validate_task(self.task)

        self.output_dir = Path(self.output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    @classmethod
    def validate_task(cls, task: str) -> str:
        """Return ``task`` unchanged if it is supported, otherwise raise.

        Args:
            task: Task name to check. ``None`` is also accepted here.

        Returns:
            The ``task`` value that was passed in.

        Raises:
            ValueError: If ``task`` is not 'binary', 'multiclass',
                'regression', 'survival' or ``None``.
        """
        if task not in ['binary', 'multiclass', 'regression', 'survival', None]:
            raise ValueError("Invalid task parameter. Choose one of: 'binary', 'multiclass', 'regression', 'survival'.")
        return task