Source code for rankfm.rankfm

"""
rankfm main modeling class
"""

import numpy as np
import pandas as pd

from rankfm._rankfm import _fit, _predict, _recommend
from rankfm.utils import get_data

class RankFM():
    """Factorization Machines for Ranking Problems with Implicit Feedback Data"""
    def __init__(self, factors=10, loss='bpr', max_samples=10, alpha=0.01, beta=0.1, sigma=0.1,
                 learning_rate=0.1, learning_schedule='constant', learning_exponent=0.25):
        """store hyperparameters and initialize internal model state

        :param factors: latent factor rank
        :param loss: optimization/loss function to use for training: ['bpr', 'warp']
        :param max_samples: maximum number of negative samples to draw for WARP loss
        :param alpha: L2 regularization penalty on [user, item] model weights
        :param beta: L2 regularization penalty on [user-feature, item-feature] model weights
        :param sigma: standard deviation to use for random initialization of factor weights
        :param learning_rate: initial learning rate for gradient step updates
        :param learning_schedule: schedule for adjusting learning rates by training epoch: ['constant', 'invscaling']
        :param learning_exponent: exponent applied to epoch number to adjust learning rate: scaling = 1 / pow(epoch + 1, learning_exponent)
        :return: None
        """

        # validate user input
        assert isinstance(factors, int) and factors >= 1, "[factors] must be a positive integer"
        assert isinstance(loss, str) and loss in ('bpr', 'warp'), "[loss] must be in ('bpr', 'warp')"
        assert isinstance(max_samples, int) and max_samples > 0, "[max_samples] must be a positive integer"
        assert isinstance(alpha, float) and alpha > 0.0, "[alpha] must be a positive float"
        assert isinstance(beta, float) and beta > 0.0, "[beta] must be a positive float"
        assert isinstance(sigma, float) and sigma > 0.0, "[sigma] must be a positive float"
        assert isinstance(learning_rate, float) and learning_rate > 0.0, "[learning_rate] must be a positive float"
        assert isinstance(learning_schedule, str) and learning_schedule in ('constant', 'invscaling'), "[learning_schedule] must be in ('constant', 'invscaling')"
        assert isinstance(learning_exponent, float) and learning_exponent > 0.0, "[learning_exponent] must be a positive float"

        # store model hyperparameters
        self.factors = factors
        self.loss = loss
        self.max_samples = max_samples
        self.alpha = alpha
        self.beta = beta
        self.sigma = sigma
        self.learning_rate = learning_rate
        self.learning_schedule = learning_schedule
        self.learning_exponent = learning_exponent

        # set/clear initial model state
        self._reset_state()
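The constructor only validates and stores hyperparameters; no data is touched until fit(). A minimal construction sketch follows (the hyperparameter values are illustrative, not tuned recommendations). Note that the type asserts require genuine floats, so [alpha=1] fails while [alpha=1.0] passes::

    from rankfm.rankfm import RankFM

    # WARP loss draws up to [max_samples] negative candidates per observed interaction
    model = RankFM(
        factors=20,
        loss='warp',
        max_samples=20,
        alpha=0.01,
        beta=0.1,
        sigma=0.1,
        learning_rate=0.1,
        learning_schedule='invscaling',
        learning_exponent=0.25,
    )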
    # --------------------------------
    # begin private method definitions
    # --------------------------------

    def _reset_state(self):
        """initialize or reset internal model state"""

        # [ID, IDX] arrays
        self.user_id = None
        self.item_id = None
        self.user_idx = None
        self.item_idx = None

        # [ID <-> IDX] mappings
        self.index_to_user = None
        self.index_to_item = None
        self.user_to_index = None
        self.item_to_index = None

        # user/item interactions and importance weights
        self.interactions = None
        self.sample_weight = None

        # set of observed items for each user
        self.user_items = None

        # [user, item] features
        self.x_uf = None
        self.x_if = None

        # [item, item-feature] scalar weights
        self.w_i = None
        self.w_if = None

        # [user, item, user-feature, item-feature] latent factors
        self.v_u = None
        self.v_i = None
        self.v_uf = None
        self.v_if = None

        # internal model state indicator
        self.is_fit = False

    def _init_all(self, interactions, user_features=None, item_features=None, sample_weight=None):
        """index the interaction data and user/item features and initialize model weights

        :param interactions: dataframe of observed user/item interactions: [user_id, item_id]
        :param user_features: dataframe of user metadata features: [user_id, uf_1, ..., uf_n]
        :param item_features: dataframe of item metadata features: [item_id, if_1, ..., if_n]
        :param sample_weight: vector of importance weights for each observed interaction
        :return: None
        """

        assert isinstance(interactions, (np.ndarray, pd.DataFrame)), "[interactions] must be np.ndarray or pd.DataFrame"
        assert interactions.shape[1] == 2, "[interactions] should be: [user_id, item_id]"

        # save unique arrays of users/items in terms of original identifiers
        interactions_df = pd.DataFrame(get_data(interactions), columns=['user_id', 'item_id'])
        self.user_id = pd.Series(np.sort(np.unique(interactions_df['user_id'])))
        self.item_id = pd.Series(np.sort(np.unique(interactions_df['item_id'])))

        # create zero-based index to identifier mappings
        self.index_to_user = self.user_id
        self.index_to_item = self.item_id

        # create reverse mappings from identifiers to zero-based index positions
        self.user_to_index = pd.Series(data=self.index_to_user.index, index=self.index_to_user.values)
        self.item_to_index = pd.Series(data=self.index_to_item.index, index=self.index_to_item.values)

        # store unique values of user/item indexes and observed interactions for each user
        self.user_idx = np.arange(len(self.user_id), dtype=np.int32)
        self.item_idx = np.arange(len(self.item_id), dtype=np.int32)

        # map the interactions to internal index positions
        self._init_interactions(interactions, sample_weight)

        # map the user/item features to internal index positions
        self._init_features(user_features, item_features)

        # initialize the model weights after the user/item/feature dimensions have been established
        self._init_weights(user_features, item_features)

    def _init_interactions(self, interactions, sample_weight):
        """map new interaction data to existing internal user/item indexes

        :param interactions: dataframe of observed user/item interactions: [user_id, item_id]
        :param sample_weight: vector of importance weights for each observed interaction
        :return: None
        """

        assert isinstance(interactions, (np.ndarray, pd.DataFrame)), "[interactions] must be np.ndarray or pd.DataFrame"
        assert interactions.shape[1] == 2, "[interactions] should be: [user_id, item_id]"

        # map the raw user/item identifiers to internal zero-based index positions
        # NOTE: any user/item pairs not found in the existing indexes will be dropped
        # NOTE: unmatched identifiers map to NaN, so drop them before casting to an integer dtype
        self.interactions = pd.DataFrame(get_data(interactions).copy(), columns=['user_id', 'item_id'])
        self.interactions['user_id'] = self.interactions['user_id'].map(self.user_to_index)
        self.interactions['item_id'] = self.interactions['item_id'].map(self.item_to_index)
        self.interactions = self.interactions.rename({'user_id': 'user_idx', 'item_id': 'item_idx'}, axis=1).dropna().astype(np.int32)

        # store the sample weights internally or generate a vector of ones if not given
        if sample_weight is not None:
            assert isinstance(sample_weight, (np.ndarray, pd.Series)), "[sample_weight] must be np.ndarray or pd.Series"
            assert sample_weight.ndim == 1, "[sample_weight] must be a vector (ndim=1)"
            assert len(sample_weight) == len(interactions), "[sample_weight] must have the same length as [interactions]"
            self.sample_weight = np.ascontiguousarray(get_data(sample_weight), dtype=np.float32)
        else:
            self.sample_weight = np.ones(len(self.interactions), dtype=np.float32)

        # create a dictionary containing the set of observed items for each user
        # NOTE: if the model has been previously fit extend rather than replace the itemset for each user
        # NOTE: users with no new interactions keep their existing itemset unchanged
        if self.is_fit:
            new_user_items = self.interactions.groupby('user_idx')['item_idx'].apply(set).to_dict()
            self.user_items = {
                user: np.sort(np.array(list(set(self.user_items[user]) | new_user_items.get(user, set())), dtype=np.int32))
                for user in self.user_items.keys()
            }
        else:
            self.user_items = self.interactions.sort_values(['user_idx', 'item_idx']).groupby('user_idx')['item_idx'].apply(np.array, dtype=np.int32).to_dict()

        # format the interactions data as a c-contiguous integer array for cython use
        self.interactions = np.ascontiguousarray(self.interactions, dtype=np.int32)

    def _init_features(self, user_features=None, item_features=None):
        """initialize the user/item features given existing internal user/item indexes

        :param user_features: dataframe of user metadata features: [user_id, uf_1, ... , uf_n]
        :param item_features: dataframe of item metadata features: [item_id, if_1, ... , if_n]
        :return: None
        """

        # store the user features as an ndarray [UxP] row-ordered by user index position
        if user_features is not None:
            x_uf = pd.DataFrame(user_features.copy())
            x_uf = x_uf.set_index(x_uf.columns[0])
            x_uf.index = x_uf.index.map(self.user_to_index)
            if np.array_equal(sorted(x_uf.index.values), self.user_idx):
                self.x_uf = np.ascontiguousarray(x_uf.sort_index(), dtype=np.float32)
            else:
                raise KeyError('the users in [user_features] do not match the users in [interactions]')
        else:
            self.x_uf = np.zeros([len(self.user_idx), 1], dtype=np.float32)

        # store the item features as an ndarray [IxQ] row-ordered by item index position
        if item_features is not None:
            x_if = pd.DataFrame(item_features.copy())
            x_if = x_if.set_index(x_if.columns[0])
            x_if.index = x_if.index.map(self.item_to_index)
            if np.array_equal(sorted(x_if.index.values), self.item_idx):
                self.x_if = np.ascontiguousarray(x_if.sort_index(), dtype=np.float32)
            else:
                raise KeyError('the items in [item_features] do not match the items in [interactions]')
        else:
            self.x_if = np.zeros([len(self.item_idx), 1], dtype=np.float32)

    def _init_weights(self, user_features=None, item_features=None):
        """initialize model weights given user/item and user-feature/item-feature indexes/shapes

        :param user_features: dataframe of user metadata features: [user_id, uf_1, ... , uf_n]
        :param item_features: dataframe of item metadata features: [item_id, if_1, ... , if_n]
        :return: None
        """

        # initialize scalar weights as ndarrays of zeros
        self.w_i = np.zeros(len(self.item_idx)).astype(np.float32)
        self.w_if = np.zeros(self.x_if.shape[1]).astype(np.float32)

        # initialize latent factors by drawing random samples from a normal distribution
        self.v_u = np.random.normal(loc=0, scale=self.sigma, size=(len(self.user_idx), self.factors)).astype(np.float32)
        self.v_i = np.random.normal(loc=0, scale=self.sigma, size=(len(self.item_idx), self.factors)).astype(np.float32)

        # randomly initialize user feature factors if user features were supplied
        # NOTE: set all user feature factor weights to zero to prevent random scoring influence otherwise
        if user_features is not None:
            scale = (self.alpha / self.beta) * self.sigma
            self.v_uf = np.random.normal(loc=0, scale=scale, size=[self.x_uf.shape[1], self.factors]).astype(np.float32)
        else:
            self.v_uf = np.zeros([self.x_uf.shape[1], self.factors], dtype=np.float32)

        # randomly initialize item feature factors if item features were supplied
        # NOTE: set all item feature factor weights to zero to prevent random scoring influence otherwise
        if item_features is not None:
            scale = (self.alpha / self.beta) * self.sigma
            self.v_if = np.random.normal(loc=0, scale=scale, size=[self.x_if.shape[1], self.factors]).astype(np.float32)
        else:
            self.v_if = np.zeros([self.x_if.shape[1], self.factors], dtype=np.float32)
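The private initializers above reduce arbitrary user/item identifiers to dense zero-based index positions, which is what the cython routines operate on. A toy sketch of the forward/reverse mappings built in _init_all() (the identifiers are invented for illustration)::

    import numpy as np
    import pandas as pd

    # sorted unique identifiers double as the [index -> id] mapping
    item_id = pd.Series(np.sort(np.unique(['b', 'c', 'a', 'b'])))        # 0 -> 'a', 1 -> 'b', 2 -> 'c'

    # invert it to get the [id -> index] mapping used to encode interactions
    item_to_index = pd.Series(data=item_id.index, index=item_id.values)  # 'a' -> 0, 'b' -> 1, 'c' -> 2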
    # -------------------------------
    # begin public method definitions
    # -------------------------------

    def fit(self, interactions, user_features=None, item_features=None, sample_weight=None, epochs=1, verbose=False):
        """clear previous model state and learn new model weights using the input data

        :param interactions: dataframe of observed user/item interactions: [user_id, item_id]
        :param user_features: dataframe of user metadata features: [user_id, uf_1, ... , uf_n]
        :param item_features: dataframe of item metadata features: [item_id, if_1, ... , if_n]
        :param sample_weight: vector of importance weights for each observed interaction
        :param epochs: number of training epochs (full passes through observed interactions)
        :param verbose: whether to print epoch number and log-likelihood during training
        :return: self
        """

        self._reset_state()
        self.fit_partial(interactions, user_features, item_features, sample_weight, epochs, verbose)
        return self
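A minimal end-to-end training sketch, assuming an interactions dataframe with [user_id, item_id] columns (the toy data below is invented for illustration)::

    import pandas as pd
    from rankfm.rankfm import RankFM

    interactions = pd.DataFrame({
        'user_id': [1, 1, 2, 2, 3],
        'item_id': ['a', 'b', 'a', 'c', 'b'],
    })

    model = RankFM(factors=10, loss='bpr')
    model.fit(interactions, epochs=20, verbose=True)  # prints epoch number and log-likelihood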
    def fit_partial(self, interactions, user_features=None, item_features=None, sample_weight=None, epochs=1, verbose=False):
        """learn or update model weights using the input data, resuming from the current model state

        :param interactions: dataframe of observed user/item interactions: [user_id, item_id]
        :param user_features: dataframe of user metadata features: [user_id, uf_1, ... , uf_n]
        :param item_features: dataframe of item metadata features: [item_id, if_1, ... , if_n]
        :param sample_weight: vector of importance weights for each observed interaction
        :param epochs: number of training epochs (full passes through observed interactions)
        :param verbose: whether to print epoch number and log-likelihood during training
        :return: self
        """

        assert isinstance(epochs, int) and epochs >= 1, "[epochs] must be a positive integer"
        assert isinstance(verbose, bool), "[verbose] must be a boolean value"

        if self.is_fit:
            self._init_interactions(interactions, sample_weight)
            self._init_features(user_features, item_features)
        else:
            self._init_all(interactions, user_features, item_features, sample_weight)

        # determine the number of negative samples to draw depending on the loss function
        # NOTE: if [loss == 'bpr'] -> [max_samples == 1] and [multiplier ~= 1] for all updates
        # NOTE: the [multiplier] is scaled by total number of items so it's always [0, 1]
        if self.loss == 'bpr':
            max_samples = 1
        elif self.loss == 'warp':
            max_samples = self.max_samples
        else:
            raise ValueError('[loss] function not recognized')

        # NOTE: the cython private _fit() method updates the model weights in-place via typed memoryviews
        # NOTE: therefore there's nothing returned explicitly by either method
        _fit(
            self.interactions,
            self.sample_weight,
            self.user_items,
            self.x_uf,
            self.x_if,
            self.w_i,
            self.w_if,
            self.v_u,
            self.v_i,
            self.v_uf,
            self.v_if,
            self.alpha,
            self.beta,
            self.learning_rate,
            self.learning_schedule,
            self.learning_exponent,
            max_samples,
            epochs,
            verbose
        )

        self.is_fit = True
        return self
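Because fit() resets state while fit_partial() resumes from the current weights, incremental updates look like the sketch below (continuing the hypothetical model above). Note that _init_interactions() silently drops pairs whose user or item was unseen when the indexes were first built::

    # initial full training pass
    model.fit(interactions, epochs=20)

    # later: new observed interactions for already-indexed users/items
    new_interactions = pd.DataFrame({'user_id': [1, 3], 'item_id': ['c', 'a']})
    model.fit_partial(new_interactions, epochs=5)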
    def predict(self, pairs, cold_start='nan'):
        """calculate the predicted pointwise utilities for all (user, item) pairs

        :param pairs: dataframe of [user, item] pairs to score
        :param cold_start: whether to generate missing values ('nan') or drop ('drop') user/item pairs not found in training data
        :return: np.array of real-valued model scores
        """

        assert isinstance(pairs, (np.ndarray, pd.DataFrame)), "[pairs] must be np.ndarray or pd.DataFrame"
        assert pairs.shape[1] == 2, "[pairs] should be: [user_id, item_id]"
        assert self.is_fit, "you must fit the model prior to generating predictions"

        # map identifiers to index positions; unknown users/items become NaN and score as NaN
        pred_pairs = pd.DataFrame(get_data(pairs).copy(), columns=['user_id', 'item_id'])
        pred_pairs['user_id'] = pred_pairs['user_id'].map(self.user_to_index)
        pred_pairs['item_id'] = pred_pairs['item_id'].map(self.item_to_index)
        pred_pairs = np.ascontiguousarray(pred_pairs, dtype=np.float32)

        scores = _predict(
            pred_pairs,
            self.x_uf,
            self.x_if,
            self.w_i,
            self.w_if,
            self.v_u,
            self.v_i,
            self.v_uf,
            self.v_if
        )

        if cold_start == 'nan':
            return scores
        elif cold_start == 'drop':
            return scores[~np.isnan(scores)]
        else:
            raise ValueError("param [cold_start] must be set to either 'nan' or 'drop'")
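A scoring sketch continuing the toy data above: with cold_start='nan' pairs containing unseen users/items score as np.nan, and with cold_start='drop' those scores are removed from the returned array::

    pairs = pd.DataFrame({'user_id': [1, 2, 99], 'item_id': ['a', 'c', 'a']})

    scores = model.predict(pairs, cold_start='nan')   # third score is nan (user 99 unseen)
    kept = model.predict(pairs, cold_start='drop')    # only the two known pairs remain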
    def recommend(self, users, n_items=10, filter_previous=False, cold_start='nan'):
        """calculate the topN items for each user

        :param users: iterable of user identifiers for which to generate recommendations
        :param n_items: number of recommended items to generate for each user
        :param filter_previous: remove observed training items from generated recommendations
        :param cold_start: whether to generate missing values ('nan') or drop ('drop') users not found in training data
        :return: pandas dataframe where the index values are user identifiers and the columns are recommended items
        """

        assert getattr(users, '__iter__', False), "[users] must be an iterable (e.g. list, array, series)"
        assert self.is_fit, "you must fit the model prior to generating recommendations"

        # map user identifiers to index positions (float32 so unknown users can carry NaN)
        user_idx = np.ascontiguousarray(pd.Series(users).map(self.user_to_index), dtype=np.float32)
        rec_items = _recommend(
            user_idx,
            self.user_items,
            n_items,
            filter_previous,
            self.x_uf,
            self.x_if,
            self.w_i,
            self.w_if,
            self.v_u,
            self.v_i,
            self.v_uf,
            self.v_if
        )
        rec_items = pd.DataFrame(rec_items, index=users).apply(lambda c: c.map(self.index_to_item))

        if cold_start == 'nan':
            return rec_items
        elif cold_start == 'drop':
            return rec_items.dropna(how='any')
        else:
            raise ValueError("param [cold_start] must be set to either 'nan' or 'drop'")
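A topN recommendation sketch: filter_previous=True excludes each user's observed training items, and cold_start='drop' removes rows for unknown users::

    recs = model.recommend([1, 2, 3], n_items=2, filter_previous=True, cold_start='drop')
    # index: user identifiers; columns 0..n_items-1 hold the recommended item identifiers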
    def similar_items(self, item_id, n_items=10):
        """find the most similar items wrt latent factor space representation

        :param item_id: item to search
        :param n_items: number of similar items to return
        :return: np.array of topN most similar items wrt latent factor representations
        """

        # NOTE: check [is_fit] first: [self.item_id] is None before the model has been fit
        assert self.is_fit, "you must fit the model prior to generating similarities"
        assert item_id in self.item_id.values, "you must select an [item_id] present in the training data"

        try:
            item_idx = self.item_to_index.loc[item_id]
        except (KeyError, TypeError):
            raise KeyError("item_id={} not found in training data".format(item_id))

        # calculate item latent representations in F dimensional factor space
        lr_item = self.v_i[item_idx] + np.dot(self.v_if.T, self.x_if[item_idx])
        lr_all_items = self.v_i + np.dot(self.x_if, self.v_if)

        # calculate the most similar N items excluding the search item
        similarities = pd.Series(np.dot(lr_all_items, lr_item)).drop(item_idx).sort_values(ascending=False)[:n_items]
        most_similar = pd.Series(similarities.index).map(self.index_to_item).values
        return most_similar
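The similarity score here is a raw dot product between the search item's combined latent representation (item factors plus feature factors) and those of all other items, not a normalized cosine. A usage sketch continuing the toy data above::

    model.similar_items('a', n_items=2)  # np.array of the 2 most similar item identifiers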
    def similar_users(self, user_id, n_users=10):
        """find the most similar users wrt latent factor space representation

        :param user_id: user to search
        :param n_users: number of similar users to return
        :return: np.array of topN most similar users wrt latent factor representations
        """

        # NOTE: check [is_fit] first: [self.user_id] is None before the model has been fit
        assert self.is_fit, "you must fit the model prior to generating similarities"
        assert user_id in self.user_id.values, "you must select a [user_id] present in the training data"

        try:
            user_idx = self.user_to_index.loc[user_id]
        except (KeyError, TypeError):
            raise KeyError("user_id={} not found in training data".format(user_id))

        # calculate user latent representations in F dimensional factor space
        lr_user = self.v_u[user_idx] + np.dot(self.v_uf.T, self.x_uf[user_idx])
        lr_all_users = self.v_u + np.dot(self.x_uf, self.v_uf)

        # calculate the most similar N users excluding the search user
        similarities = pd.Series(np.dot(lr_all_users, lr_user)).drop(user_idx).sort_values(ascending=False)[:n_users]
        most_similar = pd.Series(similarities.index).map(self.index_to_user).values
        return most_similar
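similar_users() mirrors similar_items() on the user side of the factor space::

    model.similar_users(1, n_users=2)  # np.array of the 2 most similar user identifiers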