# Source code for featuretools.entityset.deserialize

import json
import os
import tarfile
import tempfile
from pathlib import Path

import pandas as pd
from dask import dataframe as dd

from featuretools.entityset.relationship import Relationship
from featuretools.entityset.serialize import FORMATS
from featuretools.utils.gen_utils import check_schema_version
from featuretools.utils.s3_utils import get_transport_params, use_smartopen_es
from featuretools.utils.wrangle import _is_s3, _is_url
from featuretools.variable_types.variable import LatLong, find_variable_types


def description_to_variable(description, entity=None):
    '''Deserialize variable from variable description.

    The description is not modified, so the same description can be
    deserialized more than once.

    Args:
        description (dict) : Description of :class:`.Variable`. Its 'type' entry is either a type string or a dict holding the type string under 'value' plus extra keyword arguments for the variable class.
        entity (Entity) : Instance of :class:`.Entity` to add :class:`.Variable`. If entity is None, :class:`.Variable` will not be instantiated.

    Returns:
        variable (Variable) : Returns :class:`.Variable`. If entity is None, the type string read from the description is returned instead.
    '''
    type_info = description['type']
    if isinstance(type_info, str):
        variable, kwargs = type_info, {}
    else:
        # Copy before removing 'value' so the caller's description stays
        # intact; everything left over is passed to the variable class.
        kwargs = dict(type_info)
        variable = kwargs.pop('value')
    if entity is not None:
        variable_types = find_variable_types()
        # Unrecognized type strings fall back to the 'unknown' variable class.
        variable_class = variable_types.get(variable, variable_types.get('unknown'))
        variable = variable_class(description['id'], entity, **kwargs)
        interesting_values = pd.read_json(description['properties']['interesting_values'], typ='series')
        variable.interesting_values = interesting_values
    return variable


def description_to_entity(description, entityset, path=None):
    '''Build an entity from its description and register it on the entityset.

    Args:
        description (dict) : Description of :class:`.Entity`.
        entityset (EntitySet) : Instance of :class:`.EntitySet` that receives the :class:`.Entity`.
        path (str) : Root directory of the serialized entityset. When falsy, an empty dataframe is used instead of reading data.
    '''
    # The data is loaded before the variable descriptions are parsed.
    frame = read_entity_data(description, path=path) if path else empty_dataframe(description)

    parsed_types = {}
    descriptions_by_id = {}
    for var_description in description['variables']:
        var_id = var_description['id']
        parsed_types[var_id] = description_to_variable(var_description)
        descriptions_by_id[var_id] = var_description

    entity_id = description['id']
    updated = entityset.entity_from_dataframe(
        entity_id,
        frame,
        index=description.get('index'),
        time_index=description.get('time_index'),
        secondary_time_index=description['properties'].get('secondary_time_index'),
        variable_types=parsed_types)

    # Restore the serialized interesting values on every variable of the new entity.
    for variable in updated[entity_id].variables:
        serialized = descriptions_by_id[variable.id]['properties']['interesting_values']
        variable.interesting_values = pd.read_json(serialized, typ="series")


def description_to_entityset(description, **kwargs):
    '''Rebuild an entityset from its data description.

    Args:
        description (dict) : Description of an :class:`.EntitySet`. Likely generated using :meth:`.serialize.entityset_to_description`
        kwargs (keywords): Additional keyword arguments merged into each entity's loading parameters before its data is loaded.

    Returns:
        entityset (EntitySet) : Instance of :class:`.EntitySet`.
    '''
    check_schema_version(description, 'entityset')

    from featuretools.entityset import EntitySet
    # A description that was not read from disk has no 'path'; entities are
    # then created from empty dataframes.
    root = description.get('path')
    entityset = EntitySet(description['id'])

    entities_with_last_time_index = []
    for entity_description in description['entities'].values():
        entity_description['loading_info']['params'].update(kwargs)
        description_to_entity(entity_description, entityset, path=root)
        if entity_description['properties']['last_time_index']:
            entities_with_last_time_index.append(entity_description['id'])

    for serialized in description['relationships']:
        entityset.add_relationship(Relationship.from_dictionary(serialized, entityset))

    # Last time indexes are only added for entities flagged in the description.
    if entities_with_last_time_index:
        entityset.add_last_time_indexes(updated_entities=entities_with_last_time_index)

    return entityset


def empty_dataframe(description):
    '''Create a zero-row dataframe shaped like the described entity.

    Args:
        description (dict) : Description of :class:`.Entity`.

    Returns:
        df (DataFrame) : Empty dataframe with one column per described variable, cast to the serialized dtypes.
    '''
    column_names = [var_description['id'] for var_description in description['variables']]
    column_dtypes = description['loading_info']['properties']['dtypes']
    frame = pd.DataFrame(columns=column_names)
    return frame.astype(column_dtypes)


def read_entity_data(description, path):
    '''Read description data from disk.

    Args:
        description (dict) : Description of :class:`.Entity`.
        path (str): Location on disk to read entity data.

    Returns:
        df (DataFrame) : Instance of dataframe.

    Raises:
        ValueError: If the loading format is not csv, parquet, or pickle.
    '''
    loading_info = description['loading_info']
    file = os.path.join(path, loading_info['location'])
    kwargs = loading_info.get('params', {})
    load_format = loading_info['type']
    entity_type = loading_info.get('entity_type', 'pandas')
    lib = dd if entity_type == 'dask' else pd
    if load_format == 'csv':
        # Forward only the reader options that were actually serialized, so a
        # description without 'params' falls back to the reader's defaults
        # instead of raising KeyError.
        options = {key: kwargs[key]
                   for key in ('engine', 'compression', 'encoding')
                   if key in kwargs}
        dataframe = lib.read_csv(file, **options)
    elif load_format == 'parquet':
        options = {'engine': kwargs['engine']} if 'engine' in kwargs else {}
        dataframe = lib.read_parquet(file, **options)
    elif load_format == 'pickle':
        # Pickle data is always read with pandas, whatever the entity type.
        # NOTE(review): unpickling can execute arbitrary code; only load
        # entitysets from trusted sources.
        dataframe = pd.read_pickle(file, **kwargs)
    else:
        error = 'must be one of the following formats: {}'
        raise ValueError(error.format(', '.join(FORMATS)))
    dtypes = loading_info['properties']['dtypes']
    dataframe = dataframe.astype(dtypes)

    if load_format in ['parquet', 'csv']:
        # LatLong columns are parsed back from "(x, y)" strings into float tuples.
        latlongs = []
        for var_description in description['variables']:
            type_info = var_description['type']
            # The type is either a plain string or a dict holding the type
            # string under 'value'; accept both forms.
            type_string = type_info if isinstance(type_info, str) else type_info['value']
            if type_string == LatLong.type_string:
                latlongs.append(var_description["id"])

        def parse_latlong(x):
            # Drop the first and last characters, then split on commas.
            return tuple(float(y) for y in x[1:-1].split(","))

        for column in latlongs:
            if entity_type == 'dask':
                meta = (column, tuple([float, float]))
                dataframe[column] = dataframe[column].apply(parse_latlong,
                                                            meta=meta)
            else:
                dataframe[column] = dataframe[column].apply(parse_latlong)

    return dataframe


def read_data_description(path):
    '''Read data description from a directory on disk.

    Args:
        path (str): Directory on disk containing `data_description.json`.

    Returns:
        description (dict) : Description of :class:`.EntitySet`, with the absolute directory stored under the 'path' key.

    Raises:
        AssertionError: If `path` does not exist.
    '''
    path = os.path.abspath(path)
    if not os.path.exists(path):
        # Raised explicitly (rather than via an assert statement) so the check
        # is not stripped under `python -O`; the exception type is kept for
        # callers that already catch AssertionError.
        raise AssertionError('"{}" does not exist'.format(path))
    description_file = os.path.join(path, 'data_description.json')
    with open(description_file, 'r') as stream:
        description = json.load(stream)
    # Record where the description came from so entity data can be located later.
    description['path'] = path
    return description


def read_entityset(path, profile_name=None, **kwargs):
    '''Read entityset from disk, S3 path, or URL.

    Args:
        path (str): Directory on disk, S3 path, or URL to read `data_description.json`.
        profile_name (str, bool): The AWS profile used when reading from S3.
            Will default to None and search for AWS credentials.
            Set to False to use an anonymous profile.
        kwargs (keywords): Additional keyword arguments to pass as keyword arguments to the
            underlying deserialization method.

    Returns:
        entityset (EntitySet) : The deserialized :class:`.EntitySet`.
    '''
    if _is_url(path) or _is_s3(path):
        # Remote entitysets are fetched as an archive into a temporary
        # directory, extracted there, and deserialized from the extracted files.
        # NOTE(review): the directory is removed when this block exits; confirm
        # that lazily loaded (dask) entities do not need the files afterwards.
        with tempfile.TemporaryDirectory() as tmpdir:
            file_name = Path(path).name
            file_path = os.path.join(tmpdir, file_name)
            transport_params = None

            if _is_s3(path):
                transport_params = get_transport_params(profile_name)

            # Presumably copies the remote archive to file_path -- verify
            # against featuretools.utils.s3_utils.
            use_smartopen_es(file_path, path, transport_params)
            with tarfile.open(str(file_path)) as tar:
                if hasattr(tarfile, 'data_filter'):
                    # The archive comes from a remote location; the 'data'
                    # filter rejects members that would escape tmpdir.
                    tar.extractall(path=tmpdir, filter='data')
                else:
                    # NOTE(review): extraction filters are unavailable on this
                    # Python version, so members are extracted unchecked.
                    tar.extractall(path=tmpdir)

            data_description = read_data_description(tmpdir)
            return description_to_entityset(data_description, **kwargs)

    data_description = read_data_description(path)
    return description_to_entityset(data_description, **kwargs)