-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathml_utils.py
More file actions
234 lines (185 loc) · 8.09 KB
/
Copy pathml_utils.py
File metadata and controls
234 lines (185 loc) · 8.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""
ml_utils.py - Common utilities for ML experiments
"""
import json
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple, TYPE_CHECKING
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.base import BaseEstimator
import torch
if TYPE_CHECKING:
from scenarios import SubExperiment
def load_config(config_path: str) -> Dict[str, Any]:
    """Load configuration from a JSON file.

    Args:
        config_path: Path of the JSON file to read.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # JSON text is UTF-8; decode it explicitly instead of relying on the
    # platform's locale encoding, which differs between machines.
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f)
def merge_configs(base_config: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``base_config`` with ``overrides`` applied on top.

    Top-level keys from ``overrides`` replace those of the base. The
    ``param_grids`` entry is the exception: when both inputs define it, the
    two mappings are combined one level deep (override entries win) rather
    than the base mapping being replaced wholesale. Neither input dictionary
    is modified.
    """
    merged = base_config.copy()
    top_level = overrides

    if 'param_grids' in overrides and 'param_grids' in merged:
        # Combine the grids on a copy so the caller's base grids stay intact.
        grids = base_config.get('param_grids', {}).copy()
        grids.update(overrides['param_grids'])
        merged['param_grids'] = grids
        # The grids are already handled; keep them out of the plain update.
        top_level = {
            name: value
            for name, value in overrides.items()
            if name != 'param_grids'
        }

    merged.update(top_level)
    return merged
def check_gpu_availability() -> bool:
    """Check if CUDA GPU is available.

    Returns:
        The result of ``torch.cuda.is_available()``, or ``False`` if that
        query raises an exception.
    """
    try:
        return torch.cuda.is_available()
    except Exception:
        # Best-effort probe: a failing CUDA query is treated as "no GPU".
        # Only Exception is caught so that KeyboardInterrupt and SystemExit
        # still propagate instead of being silently swallowed.
        return False
def prepare_param_grid(base_params: Dict[str, Any],
                       early_stopping: bool = False,
                       use_gpu: bool = False,
                       model_type: str = "") -> Dict[str, Any]:
    """Build the parameter grid for one model, adjusted for runtime flags.

    Works on a shallow copy of ``base_params``; entries are replaced, never
    edited in place, so the caller's dictionary is left as it was.

    Args:
        base_params: Parameter grid as configured for the model.
        early_stopping: Whether early stopping is requested.
        use_gpu: Whether the model should be placed on the GPU
            (only consulted for ``"xgboost"``).
        model_type: One of ``"xgboost"``, ``"catboost"``, ``"mlp"`` or
            ``"logisticregression"``; any other value returns the copy
            unchanged.

    Returns:
        The adjusted parameter grid.
    """
    grid = base_params.copy()

    if model_type == "xgboost":
        if early_stopping:
            grid["n_estimators"] = [1000]
        grid["device"] = ["cuda"] if use_gpu else ["cpu"]

    elif model_type == "catboost":
        if early_stopping:
            grid["iterations"] = [1000]

    elif model_type == "mlp":
        grid["early_stopping"] = [True] if early_stopping else [False]

    elif model_type == "logisticregression":
        # When L1 is among the penalties, keep only the solvers from the
        # L1-capable list below.
        has_both_keys = "penalty" in grid and "solver" in grid
        if has_both_keys and "l1" in grid["penalty"]:
            l1_capable = ("liblinear", "saga")
            grid["solver"] = [name for name in grid["solver"] if name in l1_capable]

    return grid
# def perform_grid_search(model_class: BaseEstimator,
# param_grid: Dict[str, Any],
# X_train: np.ndarray,
# y_train: np.ndarray,
# cv_folds: int = 2,
# random_seed: int = 42,
# n_jobs: int = -1) -> Tuple[BaseEstimator, Dict[str, Any]]:
# """Perform grid search and return best model and parameters."""
# min_samples_per_class = np.bincount(y_train).min()
# if min_samples_per_class < cv_folds:
# # Not enough samples for cross-validation
# # Use default parameters or first value from grid
# default_params = {}
# for key, values in param_grid.items():
# if isinstance(values, list) and len(values) > 0:
# default_params[key] = values[0]
# else:
# default_params[key] = values
# model = model_class(**default_params)
# model.fit(X_train, y_train)
# return model, default_params
def perform_grid_search(model_instance: "BaseEstimator",
                        param_grid: Dict[str, Any],
                        X_train: np.ndarray,
                        y_train: np.ndarray,
                        cv_folds: int = 2,
                        random_seed: int = 42,
                        n_jobs: int = -1) -> Tuple["BaseEstimator", Dict[str, Any], bool]:
    """Perform grid search and return best model and parameters.

    If the rarest class in ``y_train`` has fewer than ``cv_folds`` samples,
    cross-validation is skipped and ``model_instance`` is fitted directly.

    Args:
        model_instance: Estimator instance to tune (or to fit as is).
        param_grid: Mapping of parameter name to candidate values.
        X_train: Training features.
        y_train: Training labels; any label type accepted by ``np.unique``.
        cv_folds: Number of stratified folds.
        random_seed: Seed for the shuffled fold assignment.
        n_jobs: Parallel jobs passed to ``GridSearchCV``.

    Returns:
        Tuple ``(model, params, used_grid_search)``. When the search ran,
        ``model`` and ``params`` are ``best_estimator_`` and
        ``best_params_`` and the flag is ``True``. Otherwise ``model`` is
        the fitted ``model_instance``, ``params`` holds the first candidate
        of every grid entry, and the flag is ``False``. Note that in this
        fallback the returned ``params`` are NOT applied to the estimator;
        it is fitted with whatever parameters it already carries.

    Raises:
        ValueError: If ``y_train`` is empty (no class counts to reduce).
    """
    # Count only the labels that actually occur. np.bincount would report a
    # zero for every unused integer below the largest label (e.g. ids
    # starting at 1), which forced the fallback path on every call, and it
    # rejects string, float and negative labels outright.
    _, class_counts = np.unique(y_train, return_counts=True)
    min_samples_per_class = class_counts.min()

    if min_samples_per_class < cv_folds:
        # Not enough samples for cross-validation: use the provided instance
        # directly and report only keys that were in the original grid.
        default_params = {
            key: values[0] if isinstance(values, list) and len(values) > 0 else values
            for key, values in param_grid.items()
        }
        model_instance.fit(X_train, y_train)
        return model_instance, default_params, False

    # Perform grid search
    cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=random_seed)
    grid_search = GridSearchCV(
        model_instance,
        param_grid,
        cv=cv,
        scoring="accuracy",
        n_jobs=n_jobs,
        verbose=0,
    )
    grid_search.fit(X_train, y_train)
    return grid_search.best_estimator_, grid_search.best_params_, True
def get_experiment_filters(experiment_config: Dict[str, Any],
                           df_columns: List[str]) -> Tuple[List[Any], Any, str]:
    """
    Derive the train/test split description from an experiment config.

    A truthy ``"session"`` entry selects a split on ``session_id``;
    otherwise the presence of a ``"platform"`` key selects a split on
    ``platform_id``. ``df_columns`` is accepted but not consulted.

    Returns:
        tuple: (train_values, test_value, column_name)

    Raises:
        ValueError: If the config matches neither experiment type.
        KeyError: If a recognised config lacks ``"train"`` or ``"test"``.
    """
    if experiment_config.get("session", False):
        split_column = "session_id"
    elif "platform" in experiment_config:
        split_column = "platform_id"
    else:
        raise ValueError(f"Unknown experiment type in config: {experiment_config}")

    train_values = experiment_config["train"]
    test_value = experiment_config["test"]
    return train_values, test_value, split_column
def validate_dataset(df, required_columns: List[str]) -> None:
    """Ensure every required column is present in ``df``.

    Args:
        df: Object exposing a ``columns`` iterable (e.g. a DataFrame).
        required_columns: Column names that must be present.

    Raises:
        ValueError: If one or more required columns are absent; the message
            lists them.
    """
    absent = set(required_columns) - set(df.columns)
    if not absent:
        return
    raise ValueError(f"Dataset missing required columns: {absent}")
def get_feature_columns(df_columns: List[str]) -> List[str]:
    """Return the columns that remain after dropping the metadata columns.

    The input order (and any duplicates) is preserved; names are compared
    exactly, so the match is case-sensitive.
    """
    excluded = frozenset((
        "user_id", "platform_id", "session_id", "video_id", "sequence_id",
        "key1", "key2",
        "key1_press", "key1_release", "key2_press", "key2_release",
        "key1_timestamp", "valid", "error_description",
    ))
    features = []
    for name in df_columns:
        if name in excluded:
            continue
        features.append(name)
    return features
def _build_filter_mask(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.Series:
    """Return a boolean row mask selecting rows that satisfy every filter."""
    # Explicit bool dtype: a mask built from an empty Python list is not a
    # boolean Series, so indexing an empty frame with it would not be
    # treated as row selection.
    mask = pd.Series(True, index=df.index, dtype=bool)
    for col, values in filters.items():
        # Filters naming a column the frame does not have are skipped.
        if col in df.columns:
            mask &= df[col].isin(values)
    return mask


def apply_sub_experiment_filters(df: pd.DataFrame, sub_experiment: "SubExperiment") -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Apply train and test filters from a SubExperiment to a DataFrame.

    Each filter maps a column name to the collection of accepted values; a
    row is kept only if it passes every filter whose column exists in
    ``df``. Filters on columns missing from ``df`` are ignored.

    Args:
        df: The full dataset as a pandas DataFrame
        sub_experiment: Object providing ``train_filter`` and ``test_filter``
            mappings

    Returns:
        Tuple of (train_df, test_df)
    """
    train_df = df[_build_filter_mask(df, sub_experiment.train_filter)]
    test_df = df[_build_filter_mask(df, sub_experiment.test_filter)]
    return train_df, test_df
def get_sub_experiment_data(df: pd.DataFrame, sub_experiment: "SubExperiment") -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Build the train/test feature and label arrays for a SubExperiment.

    Rows are split with ``apply_sub_experiment_filters``; the feature
    columns are those returned by ``get_feature_columns`` for ``df``, and
    the labels are taken from the ``"user_id"`` column.

    Args:
        df: The full dataset as a pandas DataFrame
        sub_experiment: A SubExperiment object

    Returns:
        Tuple of (X_train, X_test, y_train, y_test)
    """
    label_column = "user_id"
    train_part, test_part = apply_sub_experiment_filters(df, sub_experiment)
    features = get_feature_columns(df.columns.tolist())

    return (
        train_part[features].values,
        test_part[features].values,
        train_part[label_column].values,
        test_part[label_column].values,
    )