Created
March 24, 2025 06:55
-
-
Save chienhsiang-hung/5497a59c4692b5f3b4eeeeb0069211df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from sklearn.model_selection import KFold | |
from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples | |
from sklearn.utils.validation import _deprecate_positional_args | |
# modified code for group gaps; source | |
# https://github.com/getgaurav2/scikit-learn/blob/d4a3af5cc9da3a76f0266932644b884c99724c57/sklearn/model_selection/_split.py#L2243
class PurgedGroupTimeSeriesSplit(_BaseKFold):
    """Time Series cross-validator variant with non-overlapping groups.

    Allows for a gap in groups to avoid potentially leaking info from
    train into test if the model has windowed or lag features.

    Provides train/test indices to split time series data samples
    that are observed at fixed time intervals according to a
    third-party provided group.

    Groups are ordered by their first appearance in ``groups``; the test
    window of each successive split starts at a later group than the
    previous one, so shuffling is never applied.

    The same group never appears in both the train and the test set of a
    split, and the number of distinct groups has to be at least
    ``n_splits + 1``.

    When ``max_train_group_size`` is unbounded, successive training sets
    are supersets of those that come before them.

    Read more in the :ref:`User Guide <cross_validation>`.

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits. Must be at least 2.
    max_train_group_size : int, default=Inf
        Maximum number of groups used for a single training set.
        ``None`` is treated as unbounded.
    max_test_group_size : int, default=Inf
        Maximum number of groups used for a single test set.
        ``None`` is treated as unbounded.
    group_gap : int, default=None
        Number of groups left out between the end of the training window
        and the start of the test window. ``None`` is treated as ``0``.
    verbose : bool or int, default=False
        Stored on the instance; currently has no effect on ``split``.
    """

    @_deprecate_positional_args
    def __init__(self,
                 n_splits=5,
                 *,
                 max_train_group_size=np.inf,
                 max_test_group_size=np.inf,
                 group_gap=None,
                 verbose=False
                 ):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
        self.verbose = verbose

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Group labels for the samples used while splitting the dataset
            into train/test set.

        Yields
        ------
        train : list of int
            The training set indices for that split, in ascending order.
            Empty when the gap leaves no group before the test window.
        test : list of int
            The testing set indices for that split, in ascending order.

        Raises
        ------
        ValueError
            If ``groups`` is None, or if ``n_splits + 1`` exceeds the
            number of distinct groups.
        """
        if groups is None:
            raise ValueError(
                "The 'groups' parameter should not be None")
        X, y, groups = indexable(X, y, groups)

        # None means "no gap" / "no limit"; normalising here keeps the
        # arithmetic below from failing on the default group_gap=None.
        group_gap = 0 if self.group_gap is None else self.group_gap
        max_test_group_size = (np.inf if self.max_test_group_size is None
                               else self.max_test_group_size)
        max_train_group_size = (np.inf if self.max_train_group_size is None
                                else self.max_train_group_size)
        n_splits = self.n_splits
        n_folds = n_splits + 1

        # Map every sample to the ordinal of its group, where groups are
        # numbered in order of first appearance. Working on an ndarray
        # also avoids label-based lookups on pandas objects.
        labels, first_seen, inverse = np.unique(
            np.asarray(groups), return_index=True, return_inverse=True)
        n_groups = len(labels)
        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds={0} greater than"
                 " the number of groups={1}").format(n_folds,
                                                     n_groups))
        appearance_rank = np.empty(n_groups, dtype=np.intp)
        appearance_rank[np.argsort(first_seen)] = np.arange(n_groups)
        sample_group = appearance_rank[np.ravel(inverse)]

        group_test_size = min(n_groups // n_folds, max_test_group_size)
        group_test_starts = range(n_groups - n_splits * group_test_size,
                                  n_groups, group_test_size)
        for group_test_start in group_test_starts:
            # Clamp at 0: a negative bound must mean "no training groups",
            # not wrap around to groups at or after the test window.
            train_stop = max(0, group_test_start - group_gap)
            train_start = max(0, train_stop - max_train_group_size)
            test_stop = group_test_start + group_test_size

            train_idx = np.flatnonzero(
                (sample_group >= train_start) & (sample_group < train_stop))
            test_idx = np.flatnonzero(
                (sample_group >= group_test_start)
                & (sample_group < test_stop))
            # NOTE: as in the upstream implementation, the first
            # `group_gap` *samples* (not groups) of the test window are
            # dropped as well.
            test_idx = test_idx[group_gap:]

            yield train_idx.tolist(), test_idx.tolist()
class CopilotPurgedGroupTimeSeriesSplit(_BaseKFold):
    """Vectorised group-aware time series splitter with a train/test gap.

    Each split trains on groups that come before the test window, leaving
    ``group_gap`` groups out in between, and tests on a contiguous window
    of groups. Groups are ordered by their first appearance in ``groups``.

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits.
    max_train_group_size : int, default=Inf
        Maximum number of groups used for a single training set.
        ``None`` is treated as unbounded.
    max_test_group_size : int, default=Inf
        Maximum number of groups used for a single test set.
        ``None`` is treated as unbounded.
    group_gap : int, default=None
        Number of groups left out between the end of the training window
        and the start of the test window. ``None`` is treated as ``0``.
    verbose : bool, default=False
        When truthy, print the train and test indices of every split.
    """

    @_deprecate_positional_args
    def __init__(self, n_splits=5, *, max_train_group_size=np.inf,
                 max_test_group_size=np.inf, group_gap=None, verbose=False):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
        self.verbose = verbose

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Group label of every sample.

        Yields
        ------
        train : ndarray
            The training set indices for that split, in ascending order.
            Empty when the gap leaves no group before the test window.
        test : ndarray
            The testing set indices for that split, in ascending order.

        Raises
        ------
        ValueError
            If ``groups`` is None, or if ``n_splits + 1`` exceeds the
            number of distinct groups.
        """
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None")
        X, y, groups = indexable(X, y, groups)

        # None means "no gap" / "no limit"; normalise once so the window
        # arithmetic below never sees None.
        group_gap = 0 if self.group_gap is None else self.group_gap
        max_test_group_size = (np.inf if self.max_test_group_size is None
                               else self.max_test_group_size)
        max_train_group_size = (np.inf if self.max_train_group_size is None
                                else self.max_train_group_size)
        n_splits = self.n_splits

        labels, first_seen, inverse = np.unique(
            np.asarray(groups), return_index=True, return_inverse=True)
        n_groups = len(labels)
        n_folds = n_splits + 1
        if n_folds > n_groups:
            raise ValueError(f"Cannot have number of folds={n_folds} greater than the number of groups={n_groups}")

        # np.unique sorts labels; renumber the groups by first appearance
        # so that group order follows the order of the data.
        appearance_rank = np.empty(n_groups, dtype=np.intp)
        appearance_rank[np.argsort(first_seen)] = np.arange(n_groups)
        group_indices = appearance_rank[np.ravel(inverse)]

        group_test_size = min(n_groups // n_folds, max_test_group_size)
        group_test_starts = range(n_groups - n_splits * group_test_size,
                                  n_groups, group_test_size)
        for group_test_start in group_test_starts:
            test_stop = group_test_start + group_test_size
            test_indices = np.flatnonzero(
                (group_indices >= group_test_start)
                & (group_indices < test_stop))
            # NOTE: this drops the first `group_gap` *samples* (not
            # groups) of the test window.
            test_indices = test_indices[group_gap:]

            # Both train bounds are group ordinals and are compared
            # against per-sample group ordinals, never sample positions.
            # Clamp at 0 so a large gap yields an empty training set.
            train_stop = max(0, group_test_start - group_gap)
            train_start = max(0, train_stop - max_train_group_size)
            train_indices = np.flatnonzero(
                (group_indices >= train_start)
                & (group_indices < train_stop))

            if self.verbose:
                print(f"Train indices: {train_indices}, Test indices: {test_indices}")
            yield train_indices, test_indices
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment