The Python Oracle

Custom transformer for sklearn Pipeline that alters both X and y

This video explains
Custom transformer for sklearn Pipeline that alters both X and y

--

Become part of the top 3% of the developers by applying to Toptal
https://topt.al/25cXVn

--

Music by Eric Matyas
https://www.soundimage.org
Track title: Luau

--

Chapters
00:00 Question
01:31 Accepted answer (Score 17)
02:24 Answer 2 (Score 4)
05:31 Answer 3 (Score 4)
06:15 Answer 4 (Score 1)
06:51 Thank you

--

Full question
https://stackoverflow.com/questions/2553...

Answer 2 links:
[FunctionSampler]: https://imbalanced-learn.org/stable/refe...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #pandas #numpy #machinelearning #scikitlearn

#avk47



ACCEPTED ANSWER

Score 17


Modifying the sample axis, e.g. removing samples, does not (yet?) comply with the scikit-learn transformer API. So if you need to do this, you should do it outside any calls to scikit learn, as preprocessing.

As it is now, the transformer API is used to transform the features of a given sample into something new. This can implicitly contain information from other samples, but samples are never deleted.

Another option is to attempt to impute the missing values. But again, if you need to delete samples, treat it as preprocessing before using scikit learn.




ANSWER 2

Score 12


You have to modify the internal code of sklearn Pipeline.

We define a transformer that removes samples where at least the value of a feature or the target is NaN during fitting (fit_transform). While it removes the samples where at least the value of a feature is NaN during inference (transform). Important to note that our transformer returns X and y in fit_transform so we need to handle this behaviour in the sklearn Pipeline.

class Dropna():

    def fit(self, X, y):
        return self

    def fit_transform(self, X, y):
        
        mask = (np.isnan(X).any(-1) | np.isnan(y))
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        if hasattr(y, 'loc'):
            y = y.loc[~mask]
        else:
            y = y[~mask]
        
        return X, y   ###### make fit_transform return X and y
    
    def transform(self, X):
        
        mask = np.isnan(X).any(-1)
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        
        return X

We only have to modify the original sklearn Pipeline in only two specific points in fit and in _fit method. The rest remains unchanged.

from sklearn import pipeline
from sklearn.base import clone
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory

class Pipeline(pipeline.Pipeline):

    def _fit(self, X, y=None, **fit_params_steps):
        self.steps = list(self.steps)
        self._validate_steps()
        memory = check_memory(self.memory)

        fit_transform_one_cached = memory.cache(pipeline._fit_transform_one)

        for (step_idx, name, transformer) in self._iter(
            with_final=False, filter_passthrough=False
        ):
                        
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            try:
                # joblib >= 0.12
                mem = memory.location
            except AttributeError:
                mem = memory.cachedir
            finally:
                cloned_transformer = clone(transformer) if mem else transformer

            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **fit_params_steps[name],
            )
            
            if isinstance(X, tuple):    ###### unpack X if is tuple: X = (X,y)
                X, y = X
            
            self.steps[step_idx] = (name, fitted_transformer)
        
        return X, y
    
    def fit(self, X, y=None, **fit_params):
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt = self._fit(X, y, **fit_params_steps)
        
        if isinstance(Xt, tuple):    ###### unpack X if is tuple: X = (X,y)
            Xt, y = Xt 
        
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(Xt, y, **fit_params_last_step)

        return self

This is required in order to unpack the values generated by Dropna().fit_transform(X, y) in the new X and y.

Here is the full pipeline at work:

from sklearn.linear_model import Ridge

X = np.random.uniform(0,1, (100,3))
y = np.random.uniform(0,1, (100,))
X[np.random.uniform(0,1, (100)) < 0.1] = np.nan
y[np.random.uniform(0,1, (100)) < 0.1] = np.nan

pipe = Pipeline([('dropna', Dropna()), ('model', Ridge())])
pipe.fit(X, y)

pipe.predict(X).shape

Another trial with a further intermediate preprocessing step:

from sklearn.preprocessing import StandardScaler

pipe = Pipeline([('dropna', Dropna()), ('scaler', StandardScaler()), ('model', Ridge())])
pipe.fit(X, y)

pipe.predict(X).shape

More complex behaviors can be achieved with other simple modifications according to the needs. If you are interested also in Pipeline().fit_transform or Pipeline().fit_predict you need to operate the same changes.




ANSWER 3

Score 8


The package imblearn, which is built on top of sklearn, contains an estimator FunctionSampler that allows manipulating both the features array, X, and target array, y, in a pipeline step.

Note that using it in a pipeline step requires using the Pipeline class in imblearn that inherits from the one in sklearn. Furthermore, by default, in the context of Pipeline, the method resample does nothing when it is not called immediately after fit (as in fit_resample). So, read the documentation ahead of time.




ANSWER 4

Score 1


You can solve this easily by using the sklearn.preprocessing.FunctionTransformer method (http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)

You just need to put your alternations to X in a function

def drop_nans(X, y=None):
    total = X.shape[1]                                           
    new_thresh = total - thresh
    df = pd.DataFrame(X)
    df.dropna(thresh=new_thresh, inplace=True)
    return df.values

then you get your transformer by calling

transformer = FunctionTransformer(drop_nans, validate=False)

which you can use in the pipeline. The threshold can be set outside the drop_nans function.