Module recsys.utils

import functools
import gc
import json
import logging
import os
import subprocess
import sys
from datetime import datetime
from os import listdir
from os.path import isfile
from typing import List, Dict

import GPUtil as GPU
import humanize
import pandas as pd
import psutil
import torch
from torch.utils.data import DataLoader

# use the notebook-friendly progress bar when running inside an IPython kernel
if 'ipykernel' in sys.modules:
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm

from recsys.dataset_loader import BookingDataset, get_dataset_and_dataloader
from recsys.encoders import DatasetEncoder
from recsys.model import get_model_predictions, BookingNet
from recsys.paths import get_resources_path, get_path, get_model_ckpt_paths, get_model_arch_path
from recsys import config


def print_gpu_usage(gpu_id: int = 0):
    """
    Log general RAM usage and memory stats for the GPU at index gpu_id.
    """
    gpu_list = GPU.getGPUs()
    gpu = gpu_list[gpu_id]
    process = psutil.Process(os.getpid())
    logging.info(f"Gen RAM Free: {humanize.naturalsize(psutil.virtual_memory().available)}"
                 f" | Proc size: {humanize.naturalsize(process.memory_info().rss)}")
    logging.info("GPU RAM Free: {0:.0f}MB | Used: {1:.0f}MB | Util {2:3.0f}% | Total {3:.0f}MB".format(gpu.memoryFree,
                                                                                                       gpu.memoryUsed,
                                                                                                       gpu.memoryUtil * 100,
                                                                                                       gpu.memoryTotal))


def accuracy_at_k(submission: pd.DataFrame,
                  ground_truth: pd.DataFrame) -> Dict:
    """
    Calculates overall accuracy@k for k in {1, 4, 10}, plus accuracy@4 broken down by group length.
    """
    data_to_eval = submission.join(ground_truth, on='utrip_id')

    for k in [1, 4, 10]:
        data_to_eval[f'hits_at_{k}'] = data_to_eval.apply(
            lambda row: row['city_id'] in row[[f'city_id_{i}' for i in range(1, k + 1)]].values, axis=1)
    return {
        'accuracy@1': data_to_eval['hits_at_1'].mean(),
        'accuracy@4': data_to_eval['hits_at_4'].mean(),
        'accuracy@10': data_to_eval['hits_at_10'].mean(),
        'accuracy@4_by_pos': data_to_eval.groupby('group_length')['hits_at_4'].mean().to_dict()
    }


def get_submission(dataset: BookingDataset,
                   data_loader: DataLoader,
                   model: BookingNet,
                   checkpoint_path_list: List[str],
                   dataset_encoder: DatasetEncoder) -> pd.DataFrame:
    """
    Build a submission DataFrame with the top-10 predicted cities per trip, ensembling predictions over the given checkpoints.
    """
    assert len(checkpoint_path_list) > 0

    ensemble_batch_probs = None
    for checkpoint_path in tqdm(checkpoint_path_list):
        batch_probs_generator = get_model_predictions(model,
                                                      data_loader,
                                                      checkpoint_path)
        if ensemble_batch_probs is None:
            ensemble_batch_probs = list(batch_probs_generator)
        else:
            for i, batch_probs in enumerate(batch_probs_generator):
                ensemble_batch_probs[i] += batch_probs

    # shift top-k indices by +1: model outputs are 0-based, encoded city ids are 1-based
    top_cities = torch.cat(
        [torch.topk(batch_submission, 10, dim=1).indices + 1
         for batch_submission in ensemble_batch_probs],
        dim=0
    )
    del ensemble_batch_probs
    cities_prediction = pd.DataFrame(top_cities.numpy(),
                                     columns=[f'city_id_{i}' for i in range(1, 11)])
    del top_cities
    gc.collect()

    # map the encoded labels back to raw city ids
    for rank in range(1, 11):
        cities_prediction[f'city_id_{rank}'] = dataset_encoder.label_encoders['city_id'].inverse_transform(
            cities_prediction[f'city_id_{rank}'] - 1).astype(int)

    submission = pd.concat([dataset.get_ids(), cities_prediction], axis=1)
    return submission


def get_ground_truth_from_dataset(df: pd.DataFrame,
                                  booking_dataset: BookingDataset,
                                  dataset_encoder: DatasetEncoder) -> pd.DataFrame:
    """
    Extract the last city of each trip as ground truth. Assumes df is sorted by checkin ascending.
    """
    ground_truth = df.groupby('utrip_id').tail(1)[['utrip_id', 'next_city_id']].set_index('utrip_id')
    ground_truth['city_id'] = (dataset_encoder
                               .label_encoders['city_id']
                               .inverse_transform(ground_truth['next_city_id'] - 1))
    if ground_truth['city_id'].isnull().values.any():
        logging.warning("next_city_id has nulls")
    else:
        ground_truth['city_id'] = ground_truth['city_id'].astype(int)

    ground_truth = ground_truth.loc[booking_dataset.utrip_ids]  # reorder obs like batches
    ground_truth.drop(columns="next_city_id", inplace=True)
    return ground_truth


def get_count_distribution(df: pd.DataFrame,
                           by: str = 'utrip_id') -> pd.DataFrame:
    """
    Get the normalized distribution of group sizes (e.g. bookings per trip) in df.
    """
    df_dist = df.groupby(by)[by].count().value_counts(sort=True)
    df_dist /= df_dist.sum()
    return df_dist


def get_distribution_by_pos(**kwargs) -> pd.DataFrame:
    """
    Join the count distributions of several DataFrames, one column per name=dataframe keyword pair.
    """
    return functools.reduce(lambda a, b: a.join(b),
                            [get_count_distribution(df).to_frame(name)
                             for name, df in kwargs.items()]).sort_index()


def check_device() -> None:
    """
    Check if we are using GPU acceleration and warn the user.
    """
    if config.DEVICE != 'cuda':
        logging.warning('You are not using a GPU. If you are on Colab, go to Runtime -> Change runtime type.')
    else:
        current_gpu = subprocess.check_output(['nvidia-smi', '-L']).strip().decode('ascii')
        logging.info(f"Using {current_gpu}")


def get_trained_models() -> Dict:
    """
    Get a dictionary mapping each trained model's hash to its architecture parameters.
    """
    base_path = get_path("architectures")
    model_paths = [f"{base_path}/{f}" for f in listdir(base_path) if isfile(f"{base_path}/{f}")]

    d = {}

    for path in model_paths:
        with open(path) as f:
            model_hash = path[-13:-5]  # the 8-char hash right before the ".json" extension
            d[model_hash] = json.load(f)
    return d


def get_final_submission(submission_set: pd.DataFrame,
                         model_hash: str,
                         dataset_encoder: DatasetEncoder) -> None:
    """
    Build, save, and sanity-check the final submission CSV for the model identified by model_hash.
    """
    # create final submission
    dataset_submission, data_loader_submission = get_dataset_and_dataloader(
        df=submission_set,
        features=config.FEATURES_EMBEDDING
    )

    # get model parameters from hash
    with open(get_model_arch_path(model_hash)) as fhandle:
        model_parameters = json.load(fhandle)
    ckpt_list = get_model_ckpt_paths(model_hash=model_hash,
                                     checkpoint_type='accuracy_at_k')

    # load model and get predictions
    model = BookingNet(**model_parameters).to(config.DEVICE)
    predictions = get_submission(dataset_submission,
                                 data_loader_submission,
                                 model,
                                 ckpt_list,
                                 dataset_encoder)

    # build final csv and run sanity checks
    timestamp = datetime.now().strftime("%d_%m_%Y_%Hh_%Mm_%Ss")
    cols = ["utrip_id", "city_id_1", "city_id_2", "city_id_3", "city_id_4"]
    filename = f'submission_{model_hash}_{timestamp}'
    final_submission = predictions[cols]
    final_submission.to_csv(get_path(dirs="submissions",
                                     filename=filename,
                                     format='csv'),
                            index=False)
    submission_sanity_checks(final_submission)


def submission_sanity_checks(submission: pd.DataFrame) -> None:
    """
    Run submission sanity checks to make sure our dataframe is healthy.
    """
    _TOTAL_SUBMISSION_ROWS = 70662
    df = pd.read_csv(get_resources_path('booking_test_set.csv'),
                     dtype={'user_id': 'int32', 'city_id': 'int32'},
                     parse_dates=['checkin', 'checkout'])

    utrip_ids = set(df.utrip_id.unique())
    assert len(set(submission.utrip_id.unique()).intersection(utrip_ids)) == _TOTAL_SUBMISSION_ROWS
    assert submission.shape == (_TOTAL_SUBMISSION_ROWS, 5)
    assert submission.notna().values.all()

    df = pd.read_csv(get_resources_path('booking_train_set.csv'),
                     dtype={'user_id': 'int32', 'city_id': 'int32'},
                     parse_dates=['checkin', 'checkout'])

    # verify city ids
    city_ids = set(df.city_id.unique().astype(int))
    for i in range(1, 5):
        assert set(submission[f'city_id_{i}'].unique()) <= city_ids
    logging.info("Passed all sanity checks!")

Functions

def accuracy_at_k(submission: pandas.core.frame.DataFrame, ground_truth: pandas.core.frame.DataFrame) -> Dict

Calculates overall accuracy@k for k in {1, 4, 10}, plus accuracy@4 broken down by group length.

def check_device() -> None

Check if we are using GPU acceleration and warn the user.

def get_count_distribution(df: pandas.core.frame.DataFrame, by: str = 'utrip_id') -> pandas.core.frame.DataFrame

Get the normalized distribution of group sizes (e.g. bookings per trip) in df.

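A toy example: three trips with 2, 2 and 3 bookings yield a distribution over trip lengths.

import pandas as pd
from recsys.utils import get_count_distribution

df = pd.DataFrame({'utrip_id': ['a', 'a', 'b', 'b', 'c', 'c', 'c']})
print(get_count_distribution(df))
# length 2 -> 0.666667 (two thirds of the trips have 2 bookings)
# length 3 -> 0.333333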
def get_distribution_by_pos(**kwargs) -> pandas.core.frame.DataFrame

Join the count distributions of several DataFrames, one column per name=dataframe keyword pair.

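A toy example comparing two splits side by side. Note that DataFrame.join defaults to a left join, so trip lengths absent from the first frame's distribution are dropped.

import pandas as pd
from recsys.utils import get_distribution_by_pos

train = pd.DataFrame({'utrip_id': ['a', 'a', 'b', 'b', 'b']})
test = pd.DataFrame({'utrip_id': ['x', 'x', 'y', 'y']})
print(get_distribution_by_pos(train=train, test=test))
# trip length 2: train 0.5, test 1.0
# trip length 3: train 0.5, test NaN (absent from the test split)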
def get_final_submission(submission_set: pandas.core.frame.DataFrame, model_hash: str, dataset_encoder: DatasetEncoder) -> None

Build, save, and sanity-check the final submission CSV for the model identified by model_hash.

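A hedged sketch of a typical call, assuming a preprocessed test DataFrame and the DatasetEncoder fitted during training (the hash below is made up):

from recsys.utils import get_final_submission

get_final_submission(submission_set=submission_set,   # preprocessed test set
                     model_hash='a1b2c3d4',            # hypothetical 8-char hash
                     dataset_encoder=dataset_encoder)  # encoder fitted at train time
# writes a timestamped CSV under the submissions directory and runs sanity checks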
def get_ground_truth_from_dataset(df: pandas.core.frame.DataFrame, booking_dataset: BookingDataset, dataset_encoder: DatasetEncoder) -> pandas.core.frame.DataFrame

Extract the last city of each trip as ground truth. Assumes df is sorted by checkin ascending.

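A hedged sketch of the evaluation flow, assuming an encoded validation DataFrame val_df (sorted by checkin ascending) and a fitted DatasetEncoder:

from recsys.dataset_loader import get_dataset_and_dataloader
from recsys.utils import get_ground_truth_from_dataset
from recsys import config

dataset, data_loader = get_dataset_and_dataloader(df=val_df,
                                                  features=config.FEATURES_EMBEDDING)
ground_truth = get_ground_truth_from_dataset(val_df, dataset, dataset_encoder)
# ground_truth is indexed by utrip_id, ordered like the dataset's batches,
# and can be passed straight to accuracy_at_k together with a submission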
def get_submission(dataset: BookingDataset, data_loader: torch.utils.data.dataloader.DataLoader, model: BookingNet, checkpoint_path_list: List[str], dataset_encoder: DatasetEncoder) -> pandas.core.frame.DataFrame

Build a submission DataFrame with the top-10 predicted cities per trip, ensembling predictions over the given checkpoints.

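Summing per-batch probabilities over checkpoints is a monotone transform of their mean, so the resulting top-10 ranking is that of an averaged ensemble. A hedged sketch, assuming dataset, data_loader, model and dataset_encoder come from the project's own loaders (the hash is made up):

from recsys.paths import get_model_ckpt_paths
from recsys.utils import get_submission

ckpt_list = get_model_ckpt_paths(model_hash='a1b2c3d4',  # hypothetical hash
                                 checkpoint_type='accuracy_at_k')
submission = get_submission(dataset, data_loader, model, ckpt_list, dataset_encoder)
submission.head()  # utrip_id plus city_id_1 ... city_id_10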
def get_trained_models() -> Dict

Get a dictionary mapping each trained model's hash to its architecture parameters.

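For instance, to list every trained architecture and its parameters:

from recsys.utils import get_trained_models

for model_hash, params in get_trained_models().items():
    print(model_hash, params)  # params are the kwargs saved for BookingNet(**params)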
def print_gpu_usage(gpu_id: int = 0)

Log general RAM usage and memory stats for the GPU at index gpu_id.

def submission_sanity_checks(submission: pandas.core.frame.DataFrame) -> None

Run submission sanity checks to make sure our dataframe is healthy.
