Improving Computational Performance#

Feature engineering is a computationally expensive task. While Featuretools comes with reasonable default settings for feature calculation, there are a number of built-in approaches to improve computational performance based on dataset and problem specific considerations.

Reduce number of unique cutoff times#

Each row in a feature matrix created by Featuretools is calculated at a specific cutoff time that represents the last point in time that data from any dataframe in an entityset can be used to calculate the feature. As a result, calculations incur an overhead in finding the subset of allowed data for each distinct time in the calculation.

Note

Featuretools is very precise in how it deals with time. For more information, see Handling Time.

If there are many unique cutoff times, it is often worthwhile to figure out how to have fewer. This can be done manually by figuring out which unique times are necessary for the prediction problem or automatically using approximate.

Parallel Feature Computation#

Computational performance can often be improved by parallelizing the feature calculation process. There are several different approaches that can be used to perform parallel feature computation with Featuretools. An overview of the most commonly used approaches is provided below.

Simple Parallel Feature Computation#

If using a pandas EntitySet, Featuretools can optionally compute features on multiple cores. The simplest way to control the amount of parallelism is to specify the n_jobs parameter:

fm = ft.calculate_feature_matrix(features=features,
                                 entityset=entityset,
                                 cutoff_time=cutoff_time,
                                 n_jobs=2,
                                 verbose=True)

The above command will start 2 processes to compute chunks of the feature matrix in parallel. Each process receives its own copy of the entityset, so memory use will be proportional to the number of parallel processes. Because the entityset has to be copied to each process, there is overhead to perform this operation before calculation can begin. To avoid this overhead on successive calls to calculate_feature_matrix, read the section below on using a persistent cluster.

Adjust chunk size#

By default, Featuretools calculates rows with the same cutoff time simultaneously. The chunk_size parameter limits the maximum number of rows that will be grouped and then calculated together. If calculation is done using parallel processing, the default chunk size is set to be 1 / n_jobs to ensure the computation can be spread across available workers. Normally, this behavior works well, but if there are only a few unique cutoff times it can lead to higher peak memory usage (due to more intermediate calculations stored in memory) or limited parallelism (if the number of chunks is less than n_jobs).

By setting chunk_size, we can limit the maximum number of rows in each group to specific number or a percentage of the overall data when calling ft.dfs or ft.calculate_feature_matrix:

# use maximum  100 rows per chunk
feature_matrix, features_list = ft.dfs(entityset=es,
                                       target_dataframe_name="customers",
                                       chunk_size=100)

We can also set chunk size to be a percentage of total rows:

# use maximum 5% of all rows per chunk
feature_matrix, features_list = ft.dfs(entityset=es,
                                       target_dataframe_name="customers",
                                       chunk_size=.05)

Using persistent cluster#

Behind the scenes, Featuretools uses Dask’s distributed scheduler to implement multiprocessing. When you only specify the n_jobs parameter, a cluster will be created for that specific feature matrix calculation and destroyed once calculations have finished. A drawback of this is that each time a feature matrix is calculated, the entityset has to be transmitted to the workers again. To avoid this, we would like to reuse the same cluster between calls. The way to do this is by creating a cluster first and telling featuretools to use it with the dask_kwargs parameter:

import featuretools as ft
from dask.distributed import LocalCluster

cluster = LocalCluster()
fm_1 = ft.calculate_feature_matrix(features=features_1,
                                   entityset=entityset,
                                   cutoff_time=cutoff_time,
                                   dask_kwargs={'cluster': cluster},
                                   verbose=True)

The ‘cluster’ value can either be the actual cluster object or a string of the address the cluster’s scheduler can be reached at. The call below would also work. This second feature matrix calculation will not need to resend the entityset data to the workers because it has already been saved on the cluster.

fm_2 = ft.calculate_feature_matrix(features=features_2,
                                   entityset=entityset,
                                   cutoff_time=cutoff_time,
                                   dask_kwargs={'cluster': cluster.scheduler.address},
                                   verbose=True)

Note

When using a persistent cluster, Featuretools publishes a copy of the EntitySet to the cluster the first time it calculates a feature matrix. Based on the EntitySet’s metadata the cluster will reuse it for successive computations. This means if two EntitySets have the same metadata but different row values (e.g. new data is added to the EntitySet), Featuretools won’t recopy the second EntitySet in later calls. A simple way to avoid this scenario is to use a unique EntitySet id.

Using the distributed dashboard#

Dask.distributed has a web-based diagnostics dashboard that can be used to analyze the state of the workers and tasks. It can also be useful for tracking memory use or visualizing task run-times. An in-depth description of the web interface can be found here.

Distributed dashboard image

The dashboard requires an additional python package, bokeh, to work. Once bokeh is installed, the web interface will be launched by default when a LocalCluster is created. The cluster created by featuretools when using n_jobs does not enable the web interface automatically. To do so, the port to launch the main web interface on must be specified in dask_kwargs:

fm = ft.calculate_feature_matrix(features=features,
                                 entityset=entityset,
                                 cutoff_time=cutoff_time,
                                 n_jobs=2,
                                 dask_kwargs={'diagnostics_port': 8787}
                                 verbose=True)

Parallel Computation by Partitioning Data#

As an alternative to Featuretools’ parallelization, the data can be partitioned and the feature calculations run on multiple cores or a cluster using Dask or Apache Spark with PySpark. This approach may be necessary with a large pandas EntitySet because the current parallel implementation sends the entire EntitySet to each worker which may exhaust the worker memory. Dask and Spark allow Featuretools to scale to multiple cores on a single machine or multiple machines on a cluster.

When an entire dataset is not required to calculate the features for a given set of instances, we can split the data into independent partitions and calculate on each partition. For example, imagine we are calculating features for customers and the features are “number of other customers in this zip code” or “average age of other customers in this zip code”. In this case, we can load in data partitioned by zip code. As long as we have all of the data for a zip code when calculating, we can calculate all features for a subset of customers.

An example of this approach can be seen in the Predict Next Purchase demo notebook. In this example, we partition data by customer and only load a fixed number of customers into memory at any given time. We implement this easily using Dask, which could also be used to scale the computation to a cluster of computers. A framework like Spark could be used similarly.

An additional example of partitioning data to distribute on multiple cores or a cluster using Dask can be seen in the Featuretools on Dask notebook. This approach is detailed in the Parallelizing Feature Engineering with Dask article on the Feature Labs engineering blog. Dask allows for simple scaling to multiple cores on a single computer or multiple machines on a cluster.

For a similar partition and distribute implementation using Apache Spark with PySpark, refer to the Feature Engineering on Spark notebook. This implementation shows how to carry out feature engineering on a cluster of EC2 instances using Spark as the distributed framework.