Source code for featuretools.entityset.entity

import logging
import warnings

import dask.dataframe as dd
import numpy as np
import pandas as pd
import pandas.api.types as pdtypes

from featuretools import variable_types as vtypes
from featuretools.utils.entity_utils import (
    col_is_datetime,
    convert_all_variable_data,
    convert_variable_data,
    get_linked_vars,
    infer_variable_types
)
from featuretools.utils.wrangle import (
    _check_time_type,
    _check_timedelta,
    _dataframes_equal
)
from featuretools.variable_types import find_variable_types

logger = logging.getLogger('featuretools.entityset')

_numeric_types = vtypes.PandasTypes._pandas_numerics
_categorical_types = [vtypes.PandasTypes._categorical]
_datetime_types = vtypes.PandasTypes._pandas_datetimes


[docs]class Entity(object): """Represents an entity in a Entityset, and stores relevant metadata and data An Entity is analogous to a table in a relational database See Also: :class:`.Relationship`, :class:`.Variable`, :class:`.EntitySet` """
[docs] def __init__(self, id, df, entityset, variable_types=None, index=None, time_index=None, secondary_time_index=None, last_time_index=None, already_sorted=False, make_index=False, verbose=False): """ Create Entity Args: id (str): Id of Entity. df (pd.DataFrame): Dataframe providing the data for the entity. entityset (EntitySet): Entityset for this Entity. variable_types (dict[str -> type/str/dict[str -> type]]) : An entity's variable_types dict maps string variable ids to types (:class:`.Variable`) or type_string (str) or (type, kwargs) to pass keyword arguments to the Variable. index (str): Name of id column in the dataframe. time_index (str): Name of time column in the dataframe. secondary_time_index (dict[str -> str]): Dictionary mapping columns in the dataframe to the time index column they are associated with. last_time_index (pd.Series): Time index of the last event for each instance across all child entities. make_index (bool, optional) : If True, assume index does not exist as a column in dataframe, and create a new column of that name using integers the (0, len(dataframe)). Otherwise, assume index exists in dataframe. """ _validate_entity_params(id, df, time_index) created_index, index, df = _create_index(index, make_index, df) self.id = id self.entityset = entityset self.data = {'df': df, 'last_time_index': last_time_index} self.created_index = created_index self._verbose = verbose secondary_time_index = secondary_time_index or {} self._create_variables(variable_types, index, time_index, secondary_time_index) self.df = df[[v.id for v in self.variables]] self.set_index(index) self.time_index = None if time_index: self.set_time_index(time_index, already_sorted=already_sorted) self.set_secondary_time_index(secondary_time_index)
def __repr__(self): repr_out = u"Entity: {}\n".format(self.id) repr_out += u" Variables:" for v in self.variables: repr_out += u"\n {} (dtype: {})".format(v.id, v.type_string) shape = self.shape repr_out += u"\n Shape:\n (Rows: {}, Columns: {})".format( shape[0], shape[1]) return repr_out @property def shape(self): '''Shape of the entity's dataframe''' return self.df.shape def __eq__(self, other, deep=False): if self.index != other.index: return False if self.time_index != other.time_index: return False if self.secondary_time_index != other.secondary_time_index: return False if len(self.variables) != len(other.variables): return False if set(self.variables) != set(other.variables): return False if deep: if self.last_time_index is None and other.last_time_index is not None: return False elif self.last_time_index is not None and other.last_time_index is None: return False elif self.last_time_index is not None and other.last_time_index is not None: if not self.last_time_index.equals(other.last_time_index): return False if not _dataframes_equal(self.df, other.df): return False variables = {variable: (variable, ) for variable in self.variables} for variable in other.variables: variables[variable] += (variable, ) for self_var, other_var in variables.values(): if not self_var.__eq__(other_var, deep=True): return False return True def __sizeof__(self): return sum([value.__sizeof__() for value in self.data.values()]) @property def df(self): '''Dataframe providing the data for the entity.''' return self.data["df"] @df.setter def df(self, _df): self.data["df"] = _df @property def last_time_index(self): ''' Time index of the last event for each instance across all child entities. ''' return self.data["last_time_index"] @last_time_index.setter def last_time_index(self, lti): self.data["last_time_index"] = lti def __hash__(self): return id(self.id) def __getitem__(self, variable_id): return self._get_variable(variable_id) def _get_variable(self, variable_id): """Get variable instance Args: variable_id (str) : Id of variable to get. Returns: :class:`.Variable` : Instance of variable. Raises: RuntimeError : if no variable exist with provided id """ for v in self.variables: if v.id == variable_id: return v raise KeyError("Variable: %s not found in entity" % (variable_id)) @property def variable_types(self): '''Dictionary mapping variable id's to variable types''' return {v.id: type(v) for v in self.variables}
[docs] def convert_variable_type(self, variable_id, new_type, convert_data=True, **kwargs): """Convert variable in dataframe to different type Args: variable_id (str) : Id of variable to convert. new_type (subclass of `Variable`) : Type of variable to convert to. entityset (:class:`.BaseEntitySet`) : EntitySet associated with this entity. convert_data (bool) : If True, convert underlying data in the EntitySet. Raises: RuntimeError : Raises if it cannot convert the underlying data Examples: >>> from featuretools.tests.testing_utils import make_ecommerce_entityset >>> es = make_ecommerce_entityset() >>> es["customers"].convert_variable_type("engagement_level", vtypes.Categorical) """ if convert_data: # first, convert the underlying data (or at least try to) self.df = convert_variable_data(df=self.df, column_id=variable_id, new_type=new_type, **kwargs) # replace the old variable with the new one, maintaining order variable = self._get_variable(variable_id) new_variable = new_type.create_from(variable) self.variables[self.variables.index(variable)] = new_variable
def query_by_values(self, instance_vals, variable_id=None, columns=None, time_last=None, training_window=None, include_cutoff_time=True): """Query instances that have variable with given value Args: instance_vals (pd.Dataframe, pd.Series, list[str] or str) : Instance(s) to match. variable_id (str) : Variable to query on. If None, query on index. columns (list[str]) : Columns to return. Return all columns if None. time_last (pd.TimeStamp) : Query data up to and including this time. Only applies if entity has a time index. training_window (Timedelta, optional): Window defining how much time before the cutoff time data can be used when calculating features. If None, all data before cutoff time is used. include_cutoff_time (bool): If True, data at cutoff time are included in calculating features Returns: pd.DataFrame : instances that match constraints with ids in order of underlying dataframe """ if not variable_id: variable_id = self.index instance_vals = self._vals_to_series(instance_vals, variable_id) training_window = _check_timedelta(training_window) if training_window is not None: assert training_window.has_no_observations(), "Training window cannot be in observations" if instance_vals is None: df = self.df.copy() elif isinstance(instance_vals, pd.Series) and instance_vals.empty: df = self.df.head(0) else: if isinstance(instance_vals, dd.Series): df = self.df.merge(instance_vals.to_frame(), how="inner", on=variable_id) else: df = self.df[self.df[variable_id].isin(instance_vals)] if isinstance(self.df, pd.DataFrame): df = df.set_index(self.index, drop=False) # ensure filtered df has same categories as original # workaround for issue below # github.com/pandas-dev/pandas/issues/22501#issuecomment-415982538 if pdtypes.is_categorical_dtype(self.df[variable_id]): categories = pd.api.types.CategoricalDtype(categories=self.df[variable_id].cat.categories) df[variable_id] = df[variable_id].astype(categories) df = self._handle_time(df=df, time_last=time_last, training_window=training_window, include_cutoff_time=include_cutoff_time) if columns is not None: df = df[columns] return df def _create_variables(self, variable_types, index, time_index, secondary_time_index): """Extracts the variables from a dataframe Args: variable_types (dict[str -> types/str/dict[str -> type]]) : An entity's variable_types dict maps string variable ids to types (:class:`.Variable`) or type_strings (str) or (type, kwargs) to pass keyword arguments to the Variable. index (str): Name of index column time_index (str or None): Name of time_index column secondary_time_index (dict[str: [str]]): Dictionary of secondary time columns that each map to a list of columns that depend on that secondary time """ variables = [] variable_types = variable_types.copy() or {} string_to_class_map = find_variable_types() for vid in variable_types.copy(): vtype = variable_types[vid] if isinstance(vtype, str): if vtype in string_to_class_map: variable_types[vid] = string_to_class_map[vtype] else: variable_types[vid] = string_to_class_map['unknown'] warnings.warn("Variable type {} was unrecognized, Unknown variable type was used instead".format(vtype)) if index not in variable_types: variable_types[index] = vtypes.Index link_vars = get_linked_vars(self) inferred_variable_types = infer_variable_types(self.df, link_vars, variable_types, time_index, secondary_time_index) inferred_variable_types.update(variable_types) for v in inferred_variable_types: # TODO document how vtype can be tuple vtype = inferred_variable_types[v] if isinstance(vtype, tuple): # vtype is (ft.Variable, dict_of_kwargs) _v = vtype[0](v, self, **vtype[1]) else: _v = inferred_variable_types[v](v, self) variables += [_v] # convert data once we've inferred self.df = convert_all_variable_data(df=self.df, variable_types=inferred_variable_types) # make sure index is at the beginning index_variable = [v for v in variables if v.id == index][0] self.variables = [index_variable] + [v for v in variables if v.id != index] def update_data(self, df, already_sorted=False, recalculate_last_time_indexes=True): '''Update entity's internal dataframe, optionaly making sure data is sorted, reference indexes to other entities are consistent, and last_time_indexes are consistent. ''' if len(df.columns) != len(self.variables): raise ValueError("Updated dataframe contains {} columns, expecting {}".format(len(df.columns), len(self.variables))) for v in self.variables: if v.id not in df.columns: raise ValueError("Updated dataframe is missing new {} column".format(v.id)) # Make sure column ordering matches variable ordering self.df = df[[v.id for v in self.variables]] self.set_index(self.index) if self.time_index is not None: self.set_time_index(self.time_index, already_sorted=already_sorted) self.set_secondary_time_index(self.secondary_time_index) if recalculate_last_time_indexes and self.last_time_index is not None: self.entityset.add_last_time_indexes(updated_entities=[self.id]) self.entityset.reset_data_description()
[docs] def add_interesting_values(self, max_values=5, verbose=False): """ Find interesting values for categorical variables, to be used to generate "where" clauses Args: max_values (int) : Maximum number of values per variable to add. verbose (bool) : If True, print summary of interesting values found. Returns: None """ for variable in self.variables: # some heuristics to find basic 'where'-able variables if isinstance(variable, vtypes.Discrete): variable.interesting_values = pd.Series(dtype=variable.entity.df[variable.id].dtype) # TODO - consider removing this constraints # don't add interesting values for entities in relationships skip = False for r in self.entityset.relationships: if variable in [r.child_variable, r.parent_variable]: skip = True break if skip: continue counts = self.df[variable.id].value_counts() # find how many of each unique value there are; sort by count, # and add interesting values to each variable total_count = np.sum(counts) counts[:] = counts.sort_values()[::-1] for i in range(min(max_values, len(counts.index))): idx = counts.index[i] # add the value to interesting_values if it represents more than # 25% of the values we have not seen so far if len(counts.index) < 25: if verbose: msg = "Variable {}: Marking {} as an " msg += "interesting value" logger.info(msg.format(variable.id, idx)) variable.interesting_values = variable.interesting_values.append(pd.Series([idx])) else: fraction = counts[idx] / total_count if fraction > 0.05 and fraction < 0.95: if verbose: msg = "Variable {}: Marking {} as an " msg += "interesting value" logger.info(msg.format(variable.id, idx)) variable.interesting_values = variable.interesting_values.append(pd.Series([idx])) # total_count -= counts[idx] else: break self.entityset.reset_data_description()
def delete_variables(self, variable_ids): """ Remove variables from entity's dataframe and from self.variables """ self.df = self.df.drop(variable_ids, axis=1) for v_id in variable_ids: v = self._get_variable(v_id) self.variables.remove(v) def set_time_index(self, variable_id, already_sorted=False): # check time type if isinstance(self.df, dd.DataFrame) or self.df.empty: time_to_check = vtypes.DEFAULT_DTYPE_VALUES[self[variable_id]._default_pandas_dtype] else: time_to_check = self.df[variable_id].iloc[0] time_type = _check_time_type(time_to_check) if time_type is None: raise TypeError("%s time index not recognized as numeric or" " datetime" % (self.id)) if self.entityset.time_type is None: self.entityset.time_type = time_type elif self.entityset.time_type != time_type: raise TypeError("%s time index is %s type which differs from" " other entityset time indexes" % (self.id, time_type)) if isinstance(self.df, dd.DataFrame): t = time_type # skip checking values already_sorted = True # skip sorting else: t = vtypes.NumericTimeIndex if col_is_datetime(self.df[variable_id]): t = vtypes.DatetimeTimeIndex # use stable sort if not already_sorted: # sort by time variable, then by index self.df = self.df.sort_values([variable_id, self.index]) self.convert_variable_type(variable_id, t, convert_data=False) self.time_index = variable_id def set_index(self, variable_id, unique=True): """ Args: variable_id (string) : Name of an existing variable to set as index. unique (bool) : Whether to assert that the index is unique. """ if isinstance(self.df, pd.DataFrame): self.df = self.df.set_index(self.df[variable_id], drop=False) self.df.index.name = None if unique: assert self.df.index.is_unique, "Index is not unique on dataframe " \ "(Entity {})".format(self.id) self.convert_variable_type(variable_id, vtypes.Index, convert_data=False) self.index = variable_id def set_secondary_time_index(self, secondary_time_index): for time_index, columns in secondary_time_index.items(): if len(self.df) == 0: time_to_check = vtypes.DEFAULT_DTYPE_VALUES[self[time_index]._default_pandas_dtype] else: time_to_check = self.df[time_index].head(1).iloc[0] time_type = _check_time_type(time_to_check) if time_type is None: raise TypeError("%s time index not recognized as numeric or" " datetime" % (self.id)) if self.entityset.time_type != time_type: raise TypeError("%s time index is %s type which differs from" " other entityset time indexes" % (self.id, time_type)) if time_index not in columns: columns.append(time_index) self.secondary_time_index = secondary_time_index def _vals_to_series(self, instance_vals, variable_id): """ instance_vals may be a pd.Dataframe, a pd.Series, a list, a single value, or None. This function always returns a Series or None. """ if instance_vals is None: return None # If this is a single value, make it a list if not hasattr(instance_vals, '__iter__'): instance_vals = [instance_vals] # convert iterable to pd.Series if type(instance_vals) == pd.DataFrame: out_vals = instance_vals[variable_id] elif type(instance_vals) == pd.Series or type(instance_vals) == dd.Series: out_vals = instance_vals.rename(variable_id) else: out_vals = pd.Series(instance_vals) # no duplicates or NaN values out_vals = out_vals.drop_duplicates().dropna() # want index to have no name for the merge in query_by_values out_vals.index.name = None return out_vals def _handle_time(self, df, time_last=None, training_window=None, include_cutoff_time=True): """ Filter a dataframe for all instances before time_last. If this entity does not have a time index, return the original dataframe. """ if self.time_index: df_empty = df.empty if isinstance(df, pd.DataFrame) else False if time_last is not None and not df_empty: if include_cutoff_time: df = df[df[self.time_index] <= time_last] else: df = df[df[self.time_index] < time_last] if training_window is not None: training_window = _check_timedelta(training_window) if include_cutoff_time: mask = df[self.time_index] > time_last - training_window else: mask = df[self.time_index] >= time_last - training_window if self.last_time_index is not None: lti_slice = self.last_time_index.reindex(df.index) if include_cutoff_time: lti_mask = lti_slice > time_last - training_window else: lti_mask = lti_slice >= time_last - training_window mask = mask | lti_mask else: logger.warning( "Using training_window but last_time_index is " "not set on entity %s" % (self.id) ) df = df[mask] for secondary_time_index, columns in self.secondary_time_index.items(): # should we use ignore time last here? df_empty = df.empty if isinstance(df, pd.DataFrame) else False if time_last is not None and not df_empty: mask = df[secondary_time_index] >= time_last if isinstance(df, dd.DataFrame): for col in columns: df[col] = df[col].mask(mask, np.nan) else: df.loc[mask, columns] = np.nan return df
def _create_index(index, make_index, df): '''Handles index creation logic base on user input''' created_index = None if index is None: # Case 1: user wanted to make index but did not specify column name assert not make_index, "Must specify an index name if make_index is True" # Case 2: make_index not specified but no index supplied, use first column logger.warning(("Using first column as index. ", "To change this, specify the index parameter")) index = df.columns[0] elif make_index and index in df.columns: # Case 3: user wanted to make index but column already exists raise RuntimeError("Cannot make index: index variable already present") elif index not in df.columns: if not make_index: # Case 4: user names index, it is not in df. does not specify # make_index. Make new index column and warn logger.warning("index %s not found in dataframe, creating new " "integer column", index) # Case 5: make_index with no errors or warnings # (Case 4 also uses this code path) if isinstance(df, dd.DataFrame): df[index] = 1 df[index] = df[index].cumsum() - 1 else: df.insert(0, index, range(len(df))) created_index = index # Case 6: user specified index, which is already in df. No action needed. return created_index, index, df def _validate_entity_params(id, df, time_index): '''Validation checks for Entity inputs''' assert isinstance(id, str), "Entity id must be a string" assert len(df.columns) == len(set(df.columns)), "Duplicate column names" for c in df.columns: if not isinstance(c, str): raise ValueError("All column names must be strings (Column {} " "is not a string)".format(c)) if time_index is not None and time_index not in df.columns: raise LookupError('Time index not found in dataframe')