The simplest way to cache the results of tasks within a flow is to set persist_result=True on a task definition.
```python
from prefect import task, flow


@task(persist_result=True)
def add_one(x: int):
    return x + 1


@flow
def my_flow():
    add_one(1)  # will not be cached
    add_one(1)  # will be cached
    add_one(2)  # will not be cached


if __name__ == "__main__":
    my_flow()
```
```
16:28:51.230 | INFO | Flow run 'outgoing-cicada' - Beginning flow run 'outgoing-cicada' for flow 'my-flow'
16:28:51.257 | INFO | Task run 'add_one-d85' - Finished in state Completed()
16:28:51.276 | INFO | Task run 'add_one-eff' - Finished in state Cached(type=COMPLETED)
16:28:51.294 | INFO | Task run 'add_one-d16' - Finished in state Completed()
16:28:51.311 | INFO | Flow run 'outgoing-cicada' - Finished in state Completed()
```
This will implicitly use the DEFAULT cache policy, which is a composite cache policy defined as:
DEFAULT = INPUTS + TASK_SOURCE + RUN_ID 
This means subsequent calls of a task with identical inputs from within the same parent run will return cached results without executing the body of the function.
The TASK_SOURCE component of the DEFAULT cache policy helps avoid naming collisions between similar tasks that should not share a cache.
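To build intuition for why each of the three components matters, here is a conceptual sketch. It is not Prefect's actual key computation: the `toy_cache_key` function and its hashing scheme are hypothetical and exist only to illustrate how a key that combines inputs, task source, and run ID behaves.

```python
import hashlib
import json


def toy_cache_key(inputs: dict, task_source: str, run_id: str) -> str:
    """Hypothetical composite key; Prefect's real implementation may differ."""
    payload = json.dumps(
        {"inputs": inputs, "source": task_source, "run_id": run_id},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()


# Stand-in for the source code of a task (illustrative only)
source = "def add_one(x):\n    return x + 1\n"

same_call_a = toy_cache_key({"x": 1}, source, run_id="run-1")
same_call_b = toy_cache_key({"x": 1}, source, run_id="run-1")
other_input = toy_cache_key({"x": 2}, source, run_id="run-1")
other_run = toy_cache_key({"x": 1}, source, run_id="run-2")
other_source = toy_cache_key({"x": 1}, source + "# edited\n", run_id="run-1")

print(same_call_a == same_call_b)   # same inputs, source, and run: keys match
print(same_call_a == other_input)   # a different input changes the key
print(same_call_a == other_run)     # a different run changes the key
print(same_call_a == other_source)  # different task source changes the key
```

Under this model, a cached result is reused only when all three components are identical, which matches the behavior described above for repeated calls within the same parent run.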

Cache based on inputs

To cache the result of a task based only on task inputs, set cache_policy=INPUTS in the task decorator:
```python
from prefect import task
from prefect.cache_policies import INPUTS

import time


@task(cache_policy=INPUTS)
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1


my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1)  # does not sleep
my_stateful_task(x=2)  # sleeps
```
The above task will sleep the first time it is called with x=1, but will not sleep for any subsequent calls with the same input. Prefect ships with several cache policies that can be used to customize caching behavior.

Cache based on a subset of inputs

To cache based on a subset of inputs, you can subtract kwargs from the INPUTS cache policy.
```python
from prefect import task
from prefect.cache_policies import INPUTS

import time


@task(cache_policy=INPUTS - 'debug')
def my_stateful_task(x: int, debug: bool = False):
    print('sleeping')
    time.sleep(10)
    return x + 1


my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1, debug=True)  # does not sleep
my_stateful_task(x=1, debug=False)  # does not sleep
```
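One way to picture the subtraction, as a rough sketch rather than Prefect's implementation: the excluded keyword argument is dropped before the key is computed, so its value cannot affect whether a call hits the cache. The helper `toy_inputs_key` is hypothetical.

```python
import hashlib
import json


def toy_inputs_key(inputs: dict, exclude: frozenset = frozenset()) -> str:
    """Hypothetical key over task inputs, ignoring any excluded names."""
    kept = {name: value for name, value in inputs.items() if name not in exclude}
    return hashlib.sha256(json.dumps(kept, sort_keys=True).encode()).hexdigest()


ignored = frozenset({"debug"})

key_debug_on = toy_inputs_key({"x": 1, "debug": True}, exclude=ignored)
key_debug_off = toy_inputs_key({"x": 1, "debug": False}, exclude=ignored)
key_other_x = toy_inputs_key({"x": 2, "debug": False}, exclude=ignored)

print(key_debug_on == key_debug_off)  # debug is ignored, so the keys match
print(key_debug_on == key_other_x)    # x still contributes to the key
```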

Cache with an expiration

To cache with an expiration, set the cache_expiration parameter on the task decorator.
```python
from prefect import task
from prefect.cache_policies import INPUTS

import time
from datetime import timedelta


@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=10))
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1


my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1)  # does not sleep
# ... 10 seconds pass ...
my_stateful_task(x=1)  # sleeps again
```
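The expiration logic can be pictured with a small sketch. This is an assumption-laden model, not Prefect's code: `is_expired` is a hypothetical helper, and treating a record as expired exactly at the boundary is a choice made here for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(
    created_at: datetime,
    cache_expiration: Optional[timedelta],
    now: datetime,
) -> bool:
    """Hypothetical check: a record without an expiration never expires."""
    if cache_expiration is None:
        return False
    # Assumption: the record counts as expired once the full duration has elapsed
    return now >= created_at + cache_expiration


created = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
ttl = timedelta(seconds=10)

fresh = is_expired(created, ttl, now=created + timedelta(seconds=5))
stale = is_expired(created, ttl, now=created + timedelta(seconds=10))
never = is_expired(created, None, now=created + timedelta(days=365))

print(fresh, stale, never)
```

In this model, a call made while the record is still fresh reuses the cached value, and a call made after the duration has elapsed runs the task body again.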

Ignore the cache

To ignore the cache regardless of the cache policy, set refresh_cache=True on the task decorator. The task then runs every time and overwrites any cached value for its cache key.
```python
import random

from prefect import task
from prefect.cache_policies import TASK_SOURCE


@task(cache_policy=TASK_SOURCE, refresh_cache=True)
def never_caching_task():
    return random.random()
```
To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE setting. Setting PREFECT_TASKS_REFRESH_CACHE=true changes the default behavior of all tasks to refresh. If you have tasks that should not refresh when this setting is enabled, explicitly set refresh_cache to False on them. These tasks will never refresh the cache: if a cache key exists, it is read and not updated. If a cache key does not exist yet, these tasks can still write to the cache.
```python
import random

from prefect import task
from prefect.cache_policies import TASK_SOURCE


@task(cache_policy=TASK_SOURCE, refresh_cache=False)
def caching_task():
    return random.random()
```
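The interaction between the task-level refresh_cache parameter and the global setting can be summarized as a precedence rule. The sketch below is a simplified model of the behavior described above, not Prefect's source: `effective_refresh` is a hypothetical helper, and `None` stands for a task that leaves refresh_cache unset.

```python
from typing import Optional


def effective_refresh(task_refresh_cache: Optional[bool], global_refresh: bool) -> bool:
    """Hypothetical helper: an explicit task-level value wins over the global setting."""
    if task_refresh_cache is not None:
        return task_refresh_cache
    return global_refresh


# Global setting enabled (as with PREFECT_TASKS_REFRESH_CACHE=true)
unset_with_global_on = effective_refresh(None, global_refresh=True)
opted_out_with_global_on = effective_refresh(False, global_refresh=True)

# Global setting disabled
unset_with_global_off = effective_refresh(None, global_refresh=False)
forced_with_global_off = effective_refresh(True, global_refresh=False)

print(unset_with_global_on)      # task follows the global default and refreshes
print(opted_out_with_global_on)  # explicit False keeps reading the existing cache
print(unset_with_global_off)     # no refresh requested anywhere
print(forced_with_global_off)    # explicit True always refreshes
```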

Cache on multiple criteria

Cache policies can be combined using the + operator.
```python
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE


@task(cache_policy=INPUTS + TASK_SOURCE)
def my_task(x: int):
    return x + 1
```
The above task will use a cached result as long as the same inputs and task source are used.

Cache in a distributed environment

By default, Prefect stores results locally in ~/.prefect/storage/. To share the cache across tasks running on different machines, provide a storage block to the result_storage parameter on the task decorator. Here’s an example of a task that uses an S3 bucket to store cache records:
```python
from prefect import task
from prefect.cache_policies import INPUTS

from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)
# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")


@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42
```
When using a storage block from a Prefect integration package, the package the storage block is imported from must be installed in all environments where the task will run. For example, the prefect_aws package must be installed to use the S3Bucket storage block.

Further reading

For more advanced caching examples, see the advanced caching how-to guide. For more information on Prefect’s caching system, see the caching concepts page.