Module recsys.dataset_loader

Expand source code Browse git
import sys
from collections import defaultdict
from typing import List, Tuple

import numpy as np
import pandas as pd
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader

if 'ipykernel' in sys.modules:
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm

from recsys.config import DEVICE
from recsys.types import BatchType


class BookingDataset(Dataset):

    def __init__(self,
                 df: pd.DataFrame,
                 features: List[str],
                 group_var='utrip_id'):
        sorted_groups = sorted(df.groupby(group_var), key=lambda g: len(g[1]), reverse=True)
        self.trips = [BookingDataset.pre_process(group, features) for _, group in tqdm(sorted_groups)]
        self.utrip_ids = [utrip_id for utrip_id, _ in sorted_groups]
        self.group_lengths = [len(g[1]) for g in sorted_groups]

    def __len__(self):
        return len(self.trips)

    def __getitem__(self, idx):
        return self.trips[idx]

    def get_ids(self):
        return pd.DataFrame({'utrip_id': self.utrip_ids,
                             'group_length': self.group_lengths})

    @staticmethod
    def pre_process(group: pd.DataFrame, features: List[str]):
        g = group[features].to_dict(orient='list')
        return {k: torch.LongTensor(np.array(v)) for k, v in g.items()}


def pad_collate(batch: List[BatchType]):
    """
    Unify observations in a padded batch dictionary.
    """
    batch_dict = defaultdict(list)
    lengths = []
    for d in batch:
        for k, v in d.items():
            batch_dict[k].append(v)
        # add the next city id if we are training
        if 'next_city_id' in d:
            batch_dict['last_city'].append(d['next_city_id'][-1])
        lengths.append(v.size())

    res = {k: pad_sequence(v, batch_first=True, padding_value=0)
           for k, v in batch_dict.items() if k != 'last_city'}

    # add last city id if we are training
    if 'next_city_id' in d:
        res['last_city'] = torch.tensor(batch_dict['last_city'])

    lengths = torch.tensor(lengths, dtype=torch.int64).squeeze()
    return res, lengths


def get_dataset_and_dataloader(df: pd.DataFrame,
                               features: List[str],
                               batch_size: int = 256) -> Tuple[BookingDataset, DataLoader]:
    """
    Get dataset and dataloader.
    """
    dataset = BookingDataset(df, features)
    data_loader = DataLoader(dataset,
                             batch_size=batch_size,
                             shuffle=False,
                             collate_fn=pad_collate)
    return dataset, data_loader


def batches_to_device(data_loader: DataLoader) -> np.array:
    """
    Batches to device.
    By pre-loading all batches in GPU for training, we avoid transferring data
    from memory to GPU on every fold. The risk of doing this is biasing the gradients,
    reason why we are then careful with the distribution of batches on each fold,
    also shuffling the batches every time we train a model.
    """
    if DEVICE == 'cpu':
        batches = np.array([({k: v for k, v in d.items()}, seq_len)
                            for (d, seq_len) in data_loader])
    else:
        batches = np.array([({k: v.cuda(non_blocking=True)
                              for k, v in d.items()}, seq_len) for (d, seq_len) in data_loader])

    return batches


def filter_batches_by_length(batches: List[BatchType], min_length: int = 3):
    """
    Filter batches to have a minimum length of `min_length`.
    """
    return list(filter(lambda b: b[1].min().item() > min_length, batches))

Functions

def batches_to_device(data_loader: torch.utils.data.dataloader.DataLoader) ‑> 

Batches to device. By pre-loading all batches in GPU for training, we avoid transferring data from memory to GPU on every fold. The risk of doing this is biasing the gradients, reason why we are then careful with the distribution of batches on each fold, also shuffling the batches every time we train a model.

Expand source code Browse git
def batches_to_device(data_loader: DataLoader) -> np.array:
    """
    Batches to device.
    By pre-loading all batches in GPU for training, we avoid transferring data
    from memory to GPU on every fold. The risk of doing this is biasing the gradients,
    reason why we are then careful with the distribution of batches on each fold,
    also shuffling the batches every time we train a model.
    """
    if DEVICE == 'cpu':
        batches = np.array([({k: v for k, v in d.items()}, seq_len)
                            for (d, seq_len) in data_loader])
    else:
        batches = np.array([({k: v.cuda(non_blocking=True)
                              for k, v in d.items()}, seq_len) for (d, seq_len) in data_loader])

    return batches
def filter_batches_by_length(batches: List[Dict[str, torch.Tensor]], min_length: int = 3)

Filter batches to have a minimum length of min_length.

Expand source code Browse git
def filter_batches_by_length(batches: List[BatchType], min_length: int = 3):
    """
    Filter batches to have a minimum length of `min_length`.
    """
    return list(filter(lambda b: b[1].min().item() > min_length, batches))
def get_dataset_and_dataloader(df: pandas.core.frame.DataFrame, features: List[str], batch_size: int = 256) ‑> Tuple[BookingDataset, torch.utils.data.dataloader.DataLoader]

Get dataset and dataloader.

Expand source code Browse git
def get_dataset_and_dataloader(df: pd.DataFrame,
                               features: List[str],
                               batch_size: int = 256) -> Tuple[BookingDataset, DataLoader]:
    """
    Get dataset and dataloader.
    """
    dataset = BookingDataset(df, features)
    data_loader = DataLoader(dataset,
                             batch_size=batch_size,
                             shuffle=False,
                             collate_fn=pad_collate)
    return dataset, data_loader
def pad_collate(batch: List[Dict[str, torch.Tensor]])

Unify observations in a padded batch dictionary.

Expand source code Browse git
def pad_collate(batch: List[BatchType]):
    """
    Unify observations in a padded batch dictionary.
    """
    batch_dict = defaultdict(list)
    lengths = []
    for d in batch:
        for k, v in d.items():
            batch_dict[k].append(v)
        # add the next city id if we are training
        if 'next_city_id' in d:
            batch_dict['last_city'].append(d['next_city_id'][-1])
        lengths.append(v.size())

    res = {k: pad_sequence(v, batch_first=True, padding_value=0)
           for k, v in batch_dict.items() if k != 'last_city'}

    # add last city id if we are training
    if 'next_city_id' in d:
        res['last_city'] = torch.tensor(batch_dict['last_city'])

    lengths = torch.tensor(lengths, dtype=torch.int64).squeeze()
    return res, lengths

Classes

class BookingDataset (df: pandas.core.frame.DataFrame, features: List[str], group_var='utrip_id')

An abstract class representing a :class:Dataset.

All datasets that represent a map from keys to data samples should subclass it. All subclasses should overwrite :meth:__getitem__, supporting fetching a data sample for a given key. Subclasses could also optionally overwrite :meth:__len__, which is expected to return the size of the dataset by many :class:~torch.utils.data.Sampler implementations and the default options of :class:~torch.utils.data.DataLoader.

Note

:class:~torch.utils.data.DataLoader by default constructs a index sampler that yields integral indices. To make it work with a map-style dataset with non-integral indices/keys, a custom sampler must be provided.

Expand source code Browse git
class BookingDataset(Dataset):

    def __init__(self,
                 df: pd.DataFrame,
                 features: List[str],
                 group_var='utrip_id'):
        sorted_groups = sorted(df.groupby(group_var), key=lambda g: len(g[1]), reverse=True)
        self.trips = [BookingDataset.pre_process(group, features) for _, group in tqdm(sorted_groups)]
        self.utrip_ids = [utrip_id for utrip_id, _ in sorted_groups]
        self.group_lengths = [len(g[1]) for g in sorted_groups]

    def __len__(self):
        return len(self.trips)

    def __getitem__(self, idx):
        return self.trips[idx]

    def get_ids(self):
        return pd.DataFrame({'utrip_id': self.utrip_ids,
                             'group_length': self.group_lengths})

    @staticmethod
    def pre_process(group: pd.DataFrame, features: List[str]):
        g = group[features].to_dict(orient='list')
        return {k: torch.LongTensor(np.array(v)) for k, v in g.items()}

Ancestors

  • torch.utils.data.dataset.Dataset
  • typing.Generic

Static methods

def pre_process(group: pandas.core.frame.DataFrame, features: List[str])
Expand source code Browse git
@staticmethod
def pre_process(group: pd.DataFrame, features: List[str]):
    g = group[features].to_dict(orient='list')
    return {k: torch.LongTensor(np.array(v)) for k, v in g.items()}

Methods

def get_ids(self)
Expand source code Browse git
def get_ids(self):
    return pd.DataFrame({'utrip_id': self.utrip_ids,
                         'group_length': self.group_lengths})