"""
Class to perform over-sampling using Geometric SMOTE. This is a modified
version of the original Geometric SMOTE implementation.
"""
# Author: Georgios Douzas <gdouzas@icloud.com>
# Joao Fonseca <jpmrfonseca@gmail.com>
# License: BSD 3 clause
import math
import numpy as np
from collections import Counter
from numpy.linalg import norm
from scipy import sparse
from sklearn.utils import check_random_state, check_array
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import _check_sample_weight
from sklearn.utils.sparsefuncs_fast import (
csr_mean_variance_axis0,
csc_mean_variance_axis0,
)
from sklearn.preprocessing import OneHotEncoder, label_binarize
from imblearn.over_sampling.base import BaseOverSampler
from imblearn.utils import (
check_neighbors_object,
Substitution,
check_target_type,
check_sampling_strategy,
)
from imblearn.utils._docstring import _random_state_docstring
from imblearn.utils._validation import ArraysTransformer
# Accepted values for the ``selection_strategy`` parameter of GeometricSMOTE;
# ``_validate_estimator`` raises ``ValueError`` for anything outside this tuple.
SELECTION_STRATEGY = ("combined", "majority", "minority")
def _make_geometric_sample(
    center, surface_point, truncation_factor, deformation_factor, random_state
):
    """Generate one artificial point inside a geometric region.

    The region is derived from the hyper-sphere centered at ``center`` whose
    radius is the distance between ``center`` and ``surface_point``. The
    sphere is optionally truncated and deformed along the direction that goes
    from ``center`` to ``surface_point``.

    Parameters
    ----------
    center : ndarray, shape (n_features, )
        Center point of the geometric region.

    surface_point : ndarray, shape (n_features, )
        Surface point of the geometric region.

    truncation_factor : float
        The type of truncation. The values should be in the [-1.0, 1.0] range.

    deformation_factor : float
        The type of geometry. The values should be in the [0.0, 1.0] range.

    random_state : RandomState instance
        Source of randomness. Its ``normal`` and ``uniform`` methods are
        called directly, so it must already be a generator object.

    Returns
    -------
    point : ndarray, shape (n_features, )
        Synthetically generated sample. When ``center`` and ``surface_point``
        are equal, ``center`` itself is returned (not a copy).
    """
    # Zero radius case: the region degenerates to a single point
    if np.array_equal(center, surface_point):
        return center

    # Random direction on the unit hyper-sphere, scaled by a random radius
    # in [0, 1) so that the point falls inside the unit hyper-sphere
    radius = norm(center - surface_point)
    normal_samples = random_state.normal(size=center.size)
    point_on_unit_sphere = normal_samples / norm(normal_samples)
    point = (random_state.uniform(size=1) ** (1 / center.size)) * point_on_unit_sphere

    # Unit vector pointing from the center towards the surface point
    parallel_unit_vector = (surface_point - center) / radius

    # Truncation: mirror the point through the hyper-plane orthogonal to the
    # parallel direction when it lies in the truncated part of the sphere
    projection = np.dot(point, parallel_unit_vector)
    close_to_opposite_boundary = (
        truncation_factor > 0 and projection < truncation_factor - 1
    )
    close_to_boundary = truncation_factor < 0 and projection > truncation_factor + 1
    if close_to_opposite_boundary or close_to_boundary:
        point -= 2 * projection * parallel_unit_vector

    # Deformation: shrink the component orthogonal to the parallel direction.
    # The projection is recomputed because the point may have been mirrored.
    parallel_point_position = np.dot(point, parallel_unit_vector) * parallel_unit_vector
    perpendicular_point_position = point - parallel_point_position
    point = (
        parallel_point_position
        + (1 - deformation_factor) * perpendicular_point_position
    )

    # Translation: move from the unit sphere at the origin to the real region
    point = center + radius * point

    return point
def _make_categorical_sample(X_new, all_neighbors, categories_size, random_state):
    """Populate the one-hot encoded categorical features of an artificial point.

    For every one-hot encoded group of columns, the category with the largest
    column sum over the neighbors is selected (majority vote) and written to
    ``X_new`` as a single 1 surrounded by 0s. ``X_new`` is modified in place.

    Parameters
    ----------
    X_new : ndarray, shape (n_features, )
        Artificial point to populate categorical features.

    all_neighbors : ndarray, shape (n_neighbors, n_features)
        Nearest neighbors used for majority voting, one neighbor per row.

    categories_size : list
        Used to tell apart one-hot encoded features. The first entry is the
        offset at which the first one-hot group starts (the number of leading
        columns that are left untouched); each following entry is the number
        of columns of one group.

    random_state : RandomState instance
        Control the randomization of the algorithm. Used for tie breaking
        when several categories share the largest vote.

    Returns
    -------
    point : ndarray, shape (n_features, )
        Synthetically generated sample (the same object as ``X_new``).
    """
    # Column boundaries of the one-hot groups, computed once
    boundaries = np.cumsum(categories_size)

    for start_idx, end_idx in zip(boundaries[:-1], boundaries[1:]):
        col_maxs = all_neighbors[:, start_idx:end_idx].sum(axis=0)

        # Tie breaking argmax: shuffle the positions holding the maximum
        # and keep the first one
        is_max = np.isclose(col_maxs, col_maxs.max(axis=0))
        max_idxs = random_state.permutation(np.argwhere(is_max))
        col_sels = max_idxs[0]

        # Write the selected category as a one-hot vector
        ys = start_idx + col_sels
        X_new[start_idx:end_idx] = 0
        X_new[ys] = 1

    return X_new
@Substitution(
    sampling_strategy=BaseOverSampler._sampling_strategy_docstring,
    random_state=_random_state_docstring,
)
class GeometricSMOTE(BaseOverSampler):
    """Class to perform over-sampling using Geometric SMOTE.

    This algorithm is an implementation of Geometric SMOTE, a geometrically
    enhanced drop-in replacement for SMOTE as presented in [1]_.

    Parameters
    ----------
    categorical_features : ndarray of shape (n_cat_features,) or (n_features,), \
            optional (default=None)
        Specified which features are categorical. Can either be:

        - array of indices specifying the categorical features;
        - mask array of shape (n_features, ) and ``bool`` dtype for which
          ``True`` indicates the categorical features.

        If ``None``, no categorical encoding or decoding is applied.

    {sampling_strategy}

    {random_state}

    truncation_factor : float, optional (default=1.0)
        The type of truncation. The values should be in the [-1.0, 1.0] range.

    deformation_factor : float, optional (default=0.0)
        The type of geometry. The values should be in the [0.0, 1.0] range.

    selection_strategy : str, optional (default='combined')
        The type of Geometric SMOTE algorithm with the following options:
        ``'combined'``, ``'majority'``, ``'minority'``.

    k_neighbors : int or object, optional (default=5)
        If ``int``, number of nearest neighbours to use when synthetic
        samples are constructed for the minority method.  If object, an estimator
        that inherits from :class:`sklearn.neighbors.base.KNeighborsMixin` that
        will be used to find the k_neighbors.

    n_jobs : int, optional (default=1)
        The number of threads to open if possible.

    Attributes
    ----------
    sampling_strategy_ : dict
        Dictionary containing the information to sample the dataset. The keys
        corresponds to the class labels from which to sample and the values
        are the number of samples to sample.

    n_features_in_ : int
        Number of features in the input dataset.

    nns_pos_ : estimator object
        Validated k-nearest neighbours created from the `k_neighbors` parameter. It is
        used to find the nearest neighbors of the same class of a selected
        observation.

    nn_neg_ : estimator object
        Validated k-nearest neighbours created from the `k_neighbors` parameter. It is
        used to find the nearest neighbor of the remaining classes (k=1) of a selected
        observation.

    random_state_ : instance of RandomState
        If the `random_state` parameter is None, it is a RandomState singleton used by
        np.random. If `random_state` is an int, it is a RandomState instance seeded with
        seed. If `random_state` is already a RandomState instance, it is the same
        object.

    Notes
    -----
    See the original paper: [1]_ for more details.

    Supports multi-class resampling. A one-vs.-rest scheme is used as
    originally proposed in [2]_.

    References
    ----------
    .. [1] G. Douzas, F. Bacao, "Geometric SMOTE:
       a geometrically enhanced drop-in replacement for SMOTE",
       Information Sciences, vol. 501, pp. 118-135, 2019.

    .. [2] N. V. Chawla, K. W. Bowyer, L. O. Hall, W. P. Kegelmeyer, "SMOTE:
       synthetic minority over-sampling technique", Journal of Artificial
       Intelligence Research, vol. 16, pp. 321-357, 2002.

    Examples
    --------
    >>> from collections import Counter
    >>> from sklearn.datasets import make_classification
    >>> from gsmote import GeometricSMOTE # doctest: +NORMALIZE_WHITESPACE
    >>> X, y = make_classification(n_classes=2, class_sep=2,
    ... weights=[0.1, 0.9], n_informative=3, n_redundant=1, flip_y=0,
    ... n_features=20, n_clusters_per_class=1, n_samples=1000, random_state=10)
    >>> print('Original dataset shape %s' % Counter(y))
    Original dataset shape Counter({{1: 900, 0: 100}})
    >>> gsmote = GeometricSMOTE(random_state=1)
    >>> X_res, y_res = gsmote.fit_resample(X, y)
    >>> print('Resampled dataset shape %s' % Counter(y_res))
    Resampled dataset shape Counter({{0: 900, 1: 900}})
    """

    def __init__(
        self,
        sampling_strategy="auto",
        random_state=None,
        truncation_factor=1.0,
        deformation_factor=0.0,
        selection_strategy="combined",
        k_neighbors=5,
        categorical_features=None,
        n_jobs=1,
    ):
        super().__init__(sampling_strategy=sampling_strategy)
        self.random_state = random_state
        self.truncation_factor = truncation_factor
        self.deformation_factor = deformation_factor
        self.selection_strategy = selection_strategy
        self.k_neighbors = k_neighbors
        self.categorical_features = categorical_features
        self.n_jobs = n_jobs

    def _validate_estimator(self):
        """Create the necessary attributes for Geometric SMOTE.

        Sets ``random_state_`` and, depending on ``selection_strategy``,
        ``nns_pos_`` and/or ``nn_neg_``.

        Raises
        ------
        ValueError
            If ``selection_strategy`` is not one of ``SELECTION_STRATEGY``.
        """
        # Check random state
        self.random_state_ = check_random_state(self.random_state)

        # Validate strategy
        if self.selection_strategy not in SELECTION_STRATEGY:
            error_msg = (
                "Unknown selection_strategy for Geometric SMOTE algorithm. "
                "Choices are {}. Got {} instead."
            )
            raise ValueError(
                error_msg.format(SELECTION_STRATEGY, self.selection_strategy)
            )

        # Create nearest neighbors object for positive class. One additional
        # neighbor is requested; the first column returned by ``kneighbors``
        # is dropped when the samples are generated.
        if self.selection_strategy in ("minority", "combined"):
            self.nns_pos_ = check_neighbors_object(
                "nns_positive", self.k_neighbors, additional_neighbor=1
            )
            self.nns_pos_.set_params(n_jobs=self.n_jobs)

        # Create nearest neighbors object for negative class
        if self.selection_strategy in ("majority", "combined"):
            self.nn_neg_ = check_neighbors_object("nn_negative", nn_object=1)
            self.nn_neg_.set_params(n_jobs=self.n_jobs)

    def _validate_categorical(self):
        """Create the necessary attributes for Geometric SMOTE
        with categorical features.

        When ``categorical_features`` is given, sets ``categorical_features_``
        and ``continuous_features_`` (arrays of column indices).

        Returns
        -------
        self : object
            The instance itself, to allow method chaining.

        Raises
        ------
        ValueError
            If an index is out of range or if every feature is categorical.
        """
        if self.categorical_features is None:
            return self

        categorical_features = np.asarray(self.categorical_features)
        if categorical_features.dtype.name == "bool":
            # Boolean mask: convert to indices
            self.categorical_features_ = np.flatnonzero(categorical_features)
        else:
            # Build the set of valid indices once instead of once per index
            valid_indices = np.arange(self.n_features_in_)
            if any(cat not in valid_indices for cat in categorical_features):
                raise ValueError(
                    "Some of the categorical indices are out of range. Indices"
                    " should be between 0 and {}".format(self.n_features_in_ - 1)
                )
            self.categorical_features_ = categorical_features

        self.continuous_features_ = np.setdiff1d(
            np.arange(self.n_features_in_), self.categorical_features_
        )

        if self.categorical_features_.size == self.n_features_in_:
            raise ValueError(
                "GeometricSMOTE is not designed to work only with categorical "
                "features. It requires some numerical features."
            )

        return self

    def _check_X_y(self, X, y):
        """Overwrite the checking to let pass some string for categorical
        features.

        Returns
        -------
        X, y : validated data
        binarize_y : bool
            Flag returned by ``check_target_type``; ``fit_resample`` uses it to
            decide whether the resampled target is passed to ``label_binarize``.
        """
        y, binarize_y = check_target_type(y, indicate_one_vs_all=True)
        # dtype=None keeps the input dtype instead of forcing a numeric one
        X, y = self._validate_data(
            X, y, reset=True, dtype=None, accept_sparse=["csr", "csc"]
        )
        return X, y, binarize_y

    def _make_geometric_samples(
        self, X, y, pos_class_label, n_samples, sample_weight=None
    ):
        """A support function that returns artificial samples inside
        the geometric region defined by nearest neighbors.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Matrix containing the data which have to be sampled. When
            categorical features are used and the median standard deviation is
            close to zero, its encoded categorical columns are overwritten in
            place.

        y : array-like, shape (n_samples, )
            Corresponding label for each sample in X.

        pos_class_label : str or int
            The minority class (positive class) target value.

        n_samples : int
            The number of samples to generate.

        sample_weight : 1-D array-like, optional
            Weights used to pick the center points among the positive class
            samples. If not given, or if the positive class weights sum to
            zero, a uniform distribution is used.

        Returns
        -------
        X_new : ndarray, shape (n_samples_new, n_features)
            Synthetically generated samples.

        y_new : ndarray, shape (n_samples_new, )
            Target values for synthetic samples.

        all_neighbors_ : sequence of ndarray
            For each generated sample, the neighbors to use for the categorical
            majority vote. Empty when there are no categorical features or when
            ``n_samples`` is zero.
        """
        # Return zero new samples
        if n_samples == 0:
            return (
                np.array([], dtype=X.dtype).reshape(0, X.shape[1]),
                np.array([], dtype=y.dtype),
                np.array([], dtype=X.dtype),
            )

        has_categorical = self.categorical_features is not None

        # Select positive class samples
        X_pos = X[y == pos_class_label]
        sample_weight_pos = None
        if sample_weight is not None:
            pos_weights = sample_weight[y == pos_class_label]
            # All-zero weights cannot be normalized: fall back to uniform
            if pos_weights.sum() != 0:
                sample_weight_pos = pos_weights

        # Force minority strategy if no negative class samples are present
        self.selection_strategy_ = (
            "minority" if X.shape[0] == X_pos.shape[0] else self.selection_strategy
        )

        # Minority or combined strategy
        if self.selection_strategy_ in ("minority", "combined"):
            self.nns_pos_.fit(X_pos)
            # Drop the first neighbor column (see ``additional_neighbor=1``)
            points_pos = self.nns_pos_.kneighbors(X_pos)[1][:, 1:]
            # Use the actual number of neighbor columns rather than
            # ``self.k_neighbors``, which may be an estimator object
            n_neighbors_pos = points_pos.shape[1]
            weight_pos = (
                np.repeat(sample_weight_pos, n_neighbors_pos)
                / (sample_weight_pos.sum() * n_neighbors_pos)
                if sample_weight_pos is not None
                else None
            )
            # Draw flat (row, col) positions of ``points_pos``
            samples_indices = self.random_state_.choice(
                points_pos.size, size=n_samples, p=weight_pos
            )
            rows = np.floor_divide(samples_indices, n_neighbors_pos)
            cols = np.mod(samples_indices, n_neighbors_pos)

        # Majority or combined strategy
        if self.selection_strategy_ in ("majority", "combined"):
            X_neg = X[y != pos_class_label]
            self.nn_neg_.fit(X_neg)
            points_neg = self.nn_neg_.kneighbors(X_pos)[1]
            if self.selection_strategy_ == "majority":
                weight_neg = (
                    sample_weight_pos / sample_weight_pos.sum()
                    if sample_weight_pos is not None
                    else None
                )
                samples_indices = self.random_state_.choice(
                    points_neg.size, size=n_samples, p=weight_neg
                )
                rows = np.floor_divide(samples_indices, points_neg.shape[1])
                cols = np.mod(samples_indices, points_neg.shape[1])

        # In the case that the median std was equal to zeros, we have to
        # create non-null entry based on the encoded of OHE
        if has_categorical and math.isclose(self.median_std_, 0):
            X[:, self.continuous_features_.size :] = self._X_categorical_encoded
            # Re-select the class samples so they reflect the restored columns
            X_pos = X[y == pos_class_label]
            if self.selection_strategy_ in ("majority", "combined"):
                X_neg = X[y != pos_class_label]

        # Generate new samples
        X_new = np.zeros((n_samples, X.shape[1]))
        all_neighbors_ = []
        for ind, (row, col) in enumerate(zip(rows, cols)):
            # Define center point
            center = X_pos[row]

            # Minority strategy
            if self.selection_strategy_ == "minority":
                surface_point = X_pos[points_pos[row, col]]
                all_neighbors = X_pos[points_pos[row]] if has_categorical else None

            # Majority strategy
            elif self.selection_strategy_ == "majority":
                surface_point = X_neg[points_neg[row, col]]
                all_neighbors = X_neg[points_neg[row]] if has_categorical else None

            # Combined strategy: use whichever candidate is closer to the center
            else:
                surface_point_pos = X_pos[points_pos[row, col]]
                surface_point_neg = X_neg[points_neg[row, 0]]
                radius_pos = norm(center - surface_point_pos)
                radius_neg = norm(center - surface_point_neg)
                surface_point = (
                    surface_point_neg if radius_pos > radius_neg else surface_point_pos
                )
                all_neighbors = (
                    np.vstack([X_pos[points_pos[row]], X_neg[points_neg[row]]])
                    if has_categorical
                    else None
                )

            if has_categorical:
                all_neighbors_.append(all_neighbors)

            # Append new sample - categorical columns are populated later
            X_new[ind] = _make_geometric_sample(
                center,
                surface_point,
                self.truncation_factor,
                self.deformation_factor,
                self.random_state_,
            )

        # Create new samples for target variable
        y_new = np.array([pos_class_label] * len(samples_indices))

        return X_new, y_new, all_neighbors_

    def _make_categorical_samples(self, X_new, y_new, categories_size, all_neighbors_):
        """Populate the categorical features of the generated samples.

        Each row of ``X_new`` is passed, with its own neighbors, to
        ``_make_categorical_sample``.

        Parameters
        ----------
        X_new : ndarray, shape (n_samples_new, n_features)
            Generated samples; modified in place.

        y_new : ndarray, shape (n_samples_new, )
            Target values for the generated samples; returned unchanged.

        categories_size : list
            Column sizes used to tell apart one-hot encoded features.

        all_neighbors_ : sequence of ndarray
            Neighbors of each generated sample, in the same order as ``X_new``.

        Returns
        -------
        X_new : ndarray, shape (n_samples_new, n_features)
        y_new : ndarray, shape (n_samples_new, )
        """
        for ind, all_neighbors in enumerate(all_neighbors_):
            # Overwrite the one-hot encoded categorical columns of the sample
            X_new[ind] = _make_categorical_sample(
                X_new[ind], all_neighbors, categories_size, self.random_state_
            )
        return X_new, y_new

    def _encode_categorical(self, X, y):
        """
        One-Hot encodes categorical features and replaces the 1 entries with the median
        of the standard deviations divided by 2.

        Sets ``median_std_`` and ``ohe_`` (and ``_X_categorical_encoded`` when
        the median standard deviation is close to zero).

        Returns
        -------
        X_encoded : ndarray
            Dense array with the continuous columns first, followed by the
            encoded categorical columns.
        """
        # compute the median of the standard deviation of the minority class
        target_stats = Counter(y)
        class_minority = min(target_stats, key=target_stats.get)

        # Separate categorical features from continuous features
        X_continuous = X[:, self.continuous_features_]
        X_continuous = check_array(X_continuous, accept_sparse=["csr", "csc"])
        X_categorical = X[:, self.categorical_features_].copy()
        X_minority = X_continuous[np.flatnonzero(y == class_minority)]

        if sparse.issparse(X):
            if X.format == "csr":
                _, var = csr_mean_variance_axis0(X_minority)
            else:
                _, var = csc_mean_variance_axis0(X_minority)
        else:
            var = X_minority.var(axis=0)
        self.median_std_ = np.median(np.sqrt(var))

        if X_continuous.dtype.name != "object":
            dtype_ohe = X_continuous.dtype
        else:
            dtype_ohe = np.float64
        self.ohe_ = OneHotEncoder(
            sparse_output=True, handle_unknown="ignore", dtype=dtype_ohe
        )

        # the input of the OneHotEncoder needs to be dense
        X_ohe = self.ohe_.fit_transform(
            X_categorical.toarray() if sparse.issparse(X_categorical) else X_categorical
        )

        # we can replace the 1 entries of the categorical features with the
        # median of the standard deviation. It will ensure that whenever
        # distance is computed between 2 samples, the difference will be equal
        # to the median of the standard deviation as in the original paper.

        # In the edge case where the median of the std is equal to 0, the 1s
        # entries will be also nullified. In this case, we store the original
        # categorical encoding which will be later used for inversing the OHE
        if math.isclose(self.median_std_, 0):
            self._X_categorical_encoded = X_ohe.toarray()

        X_ohe.data = np.ones_like(X_ohe.data, dtype=X_ohe.dtype) * self.median_std_ / 2

        if self._issparse:
            X_encoded = np.hstack([X_continuous.toarray(), X_ohe.toarray()])
        else:
            X_encoded = np.hstack([X_continuous, X_ohe.toarray()])

        return X_encoded

    def _decode_categorical(self, X_resampled):
        """Reverses the encoding of the categorical features to match
        the dataset's original structure.

        Parameters
        ----------
        X_resampled : ndarray
            Encoded data: continuous columns followed by the one-hot encoded
            categorical columns. Its leading rows are overwritten in place when
            the median standard deviation is close to zero.

        Returns
        -------
        X_resampled : ndarray or sparse matrix
            Data with the categorical columns decoded and the columns put back
            in their original order. Sparse if the fitted input was sparse.
        """
        n_continuous = self.continuous_features_.size

        # Restore the original encoding of the input rows, which was nullified
        # during encoding in this edge case
        if math.isclose(self.median_std_, 0):
            X_resampled[
                : self._X_categorical_encoded.shape[0], n_continuous:
            ] = self._X_categorical_encoded

        X_resampled = sparse.csr_matrix(X_resampled)
        X_res_cat = X_resampled[:, n_continuous:]
        # Turn the stored non-zero values back into plain 1s before decoding
        X_res_cat.data = np.ones_like(X_res_cat.data)
        X_res_cat_dec = self.ohe_.inverse_transform(X_res_cat)

        if self._issparse:
            X_resampled = sparse.hstack(
                (X_resampled[:, :n_continuous], X_res_cat_dec),
                format="csr",
            )
        else:
            X_resampled = np.hstack(
                (
                    X_resampled[:, :n_continuous].toarray(),
                    X_res_cat_dec,
                )
            )

        # Put the columns back in the order of the original dataset
        indices_reordered = np.argsort(
            np.hstack((self.continuous_features_, self.categorical_features_))
        )
        if sparse.issparse(X_resampled):
            col_indices = X_resampled.indices.copy()
            for idx, col_idx in enumerate(indices_reordered):
                mask = X_resampled.indices == col_idx
                col_indices[mask] = idx
            X_resampled.indices = col_indices
        else:
            X_resampled = X_resampled[:, indices_reordered]

        return X_resampled

    def _fit_resample(self, X, y, sample_weight=None):
        """Validate the parameters and generate the new samples.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Validated input data.

        y : array-like of shape (n_samples,)
            Validated target values.

        sample_weight : array-like of shape (n_samples,), default=None
            Weights forwarded to ``_make_geometric_samples``.

        Returns
        -------
        X_resampled : {ndarray, sparse matrix}
            Original data followed by the generated samples, cast to the dtype
            of the input ``X``.

        y_resampled : ndarray
            The corresponding target values.
        """
        # Save basic data
        self._issparse = sparse.issparse(X)
        X_dtype = X.dtype
        has_categorical = self.categorical_features is not None

        # Validate estimator's parameters
        self._validate_categorical()._validate_estimator()

        # Preprocess categorical data
        if has_categorical:
            X = self._encode_categorical(X, y)
            # First entry: number of continuous columns; then one entry per
            # categorical feature with its number of categories
            categories_size = [self.continuous_features_.size] + [
                cat.size for cat in self.ohe_.categories_
            ]

        # Copy data
        X_resampled, y_resampled = X.copy(), y.copy()

        # Resample
        for class_label, n_samples in self.sampling_strategy_.items():
            # Apply gsmote mechanism
            X_new, y_new, all_neighbors_ = self._make_geometric_samples(
                X, y, class_label, n_samples, sample_weight=sample_weight
            )

            # Apply smotenc mechanism
            if has_categorical:
                X_new, y_new = self._make_categorical_samples(
                    X_new, y_new, categories_size, all_neighbors_
                )

            # Append new data
            X_resampled, y_resampled = (
                np.vstack((X_resampled, X_new)),
                np.hstack((y_resampled, y_new)),
            )

        # reverse the encoding of the categorical features
        if has_categorical:
            X_resampled = self._decode_categorical(X_resampled).astype(X_dtype)
        else:
            X_resampled = X_resampled.astype(X_dtype)

        return X_resampled, y_resampled

    def fit_resample(self, X, y, sample_weight=None):
        """Resample the dataset.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Matrix containing the data which have to be sampled.

        y : array-like of shape (n_samples,)
            Corresponding label for each sample in X.

        sample_weight : array-like of shape (n_samples,), default=None
            Individual weights for each sample. Assigns probabilities for selecting a
            sample as a center point.

        Returns
        -------
        X_resampled : {array-like, sparse matrix} of shape (n_samples_new, n_features)
            The array containing the resampled data.

        y_resampled : array-like of shape (n_samples_new,)
            The corresponding label of `X_resampled`.
        """
        check_classification_targets(y)
        # Remembers the input container types so the output can be converted back
        arrays_transformer = ArraysTransformer(X, y)
        X, y, binarize_y = self._check_X_y(X, y)

        if sample_weight is not None:
            sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype)

        self.sampling_strategy_ = check_sampling_strategy(
            self.sampling_strategy, y, self._sampling_type
        )

        output = self._fit_resample(X, y, sample_weight)

        y_ = (
            label_binarize(output[1], classes=np.unique(y)) if binarize_y else output[1]
        )

        X_, y_ = arrays_transformer.transform(output[0], y_)
        return (X_, y_) if len(output) == 2 else (X_, y_, output[2])