Enrichment with Google Cloud Vertex AI Feature Store

In Apache Beam 2.55.0 and later versions, the enrichment transform includes a built-in enrichment handler for Vertex AI Feature Store. The following examples demonstrate how to create pipelines that use the enrichment transform with the VertexAIFeatureStoreEnrichmentHandler handler and the VertexAIFeatureStoreLegacyEnrichmentHandler handler.
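Because the handler is available only in Beam 2.55.0 and later, a pipeline can check the installed version before importing it. The following is a minimal sketch, not part of Beam: the helper names parse_version and has_vertex_ai_handler are hypothetical, and the version string is assumed to come from apache_beam.__version__.

```python
import re

# Minimum Beam release that ships the Vertex AI enrichment handler,
# as stated in the text above.
MIN_VERSION = (2, 55, 0)


def parse_version(version: str) -> tuple:
    """Return the leading (major, minor, patch) integers of a version string."""
    parts = []
    for piece in version.split(".")[:3]:
        # Keep only the leading digits so suffixes such as "0rc1" still parse.
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def has_vertex_ai_handler(version: str) -> bool:
    """Hypothetical helper: True if this Beam version is 2.55.0 or later."""
    return parse_version(version) >= MIN_VERSION
```

In a real pipeline the argument would typically be apache_beam.__version__, for example has_vertex_ai_handler(beam.__version__).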

Example 1: Enrichment with Vertex AI Feature Store

The precomputed feature values stored in Vertex AI Feature Store use the following format:

user_id  age  gender  state  country
21422    12   0       0      0
2963     12   1       1      1
20592    12   1       2      2
76538    12   1       3      0

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import (
    VertexAIFeatureStoreEnrichmentHandler)

project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"

# Input rows to enrich; user_id is the lookup key.
data = [
    beam.Row(user_id='2963', product_id=14235, sale_price=15.0),
    beam.Row(user_id='21422', product_id=11203, sale_price=12.0),
    beam.Row(user_id='20592', product_id=8579, sale_price=9.0),
]

# row_key names the input field used to look up features in the feature view.
vertex_ai_handler = VertexAIFeatureStoreEnrichmentHandler(
    project=project_id,
    location=location,
    api_endpoint=api_endpoint,
    feature_store_name="vertexai_enrichment_example",
    feature_view_name="users",
    row_key="user_id",
)

with beam.Pipeline() as p:
  _ = (
      p
      | "Create" >> beam.Create(data)
      | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
      | "Print" >> beam.Map(print))

Output:

Row(user_id='2963', product_id=14235, sale_price=15.0, age=12.0, state='1', gender='1', country='1')
Row(user_id='21422', product_id=11203, sale_price=12.0, age=12.0, state='0', gender='0', country='0')
Row(user_id='20592', product_id=8579, sale_price=9.0, age=12.0, state='2', gender='1', country='2')
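Conceptually, the output above is a key-based join: each input row keeps its own fields and gains the feature values stored under its user_id. The following sketch reproduces that result with plain dictionaries so the mapping is easy to follow. It is an illustration only and not how the Beam handler is implemented: FEATURES is a hypothetical in-memory stand-in for the feature view, its value types are copied from the printed output, and returning a row unchanged when its key is missing is an assumption of this sketch rather than documented handler behavior.

```python
# Hypothetical in-memory stand-in for the "users" feature view.
# Value types mirror the printed output (age as float, the rest as strings).
FEATURES = {
    "21422": {"age": 12.0, "gender": "0", "state": "0", "country": "0"},
    "2963": {"age": 12.0, "gender": "1", "state": "1", "country": "1"},
    "20592": {"age": 12.0, "gender": "1", "state": "2", "country": "2"},
    "76538": {"age": 12.0, "gender": "1", "state": "3", "country": "0"},
}


def enrich(request: dict, row_key: str = "user_id") -> dict:
    """Merge the stored features for request[row_key] into the request."""
    # Assumption of this sketch: a missing key leaves the row unchanged.
    features = FEATURES.get(request[row_key], {})
    return {**request, **features}


requests = [
    {"user_id": "2963", "product_id": 14235, "sale_price": 15.0},
    {"user_id": "21422", "product_id": 11203, "sale_price": 12.0},
    {"user_id": "20592", "product_id": 8579, "sale_price": 9.0},
]

enriched = [enrich(r) for r in requests]
for row in enriched:
    print(row)
```

User 76538 exists in the feature table but never appears in the input, so no enriched row is produced for it.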

Example 2: Enrichment with Vertex AI Feature Store (legacy)

The precomputed feature values stored in Vertex AI Feature Store (Legacy) use the following format:

entity_id  title                     genres
movie_01   The Shawshank Redemption  Drama
movie_02   The Shining               Horror
movie_04   The Dark Knight           Action

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import (
    VertexAIFeatureStoreLegacyEnrichmentHandler)

project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"

# Input rows to enrich; entity_id is the lookup key.
data = [
    beam.Row(entity_id="movie_01", title='The Shawshank Redemption'),
    beam.Row(entity_id="movie_02", title="The Shining"),
    beam.Row(entity_id="movie_04", title='The Dark Knight'),
]

# feature_ids lists the features to read for each entity.
vertex_ai_handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
    project=project_id,
    location=location,
    api_endpoint=api_endpoint,
    entity_type_id='movies',
    feature_store_id="movie_prediction_unique",
    feature_ids=["title", "genres"],
    row_key="entity_id",
)

with beam.Pipeline() as p:
  _ = (
      p
      | "Create" >> beam.Create(data)
      | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
      | "Print" >> beam.Map(print))

Output:

Row(entity_id='movie_01', title='The Shawshank Redemption', genres='Drama')
Row(entity_id='movie_02', title='The Shining', genres='Horror')
Row(entity_id='movie_04', title='The Dark Knight', genres='Action')
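In the legacy example, feature_ids selects which stored features are read for each entity. The sketch below illustrates that selection with plain dictionaries. It is a conceptual stand-in, not the handler's implementation: ENTITY_FEATURES and enrich_legacy are hypothetical names, and silently skipping feature IDs that are not stored is an assumption of this sketch.

```python
# Hypothetical in-memory stand-in for the "movies" entity type.
ENTITY_FEATURES = {
    "movie_01": {"title": "The Shawshank Redemption", "genres": "Drama"},
    "movie_02": {"title": "The Shining", "genres": "Horror"},
    "movie_04": {"title": "The Dark Knight", "genres": "Action"},
}


def enrich_legacy(request: dict, feature_ids: list,
                  row_key: str = "entity_id") -> dict:
    """Merge only the requested features for request[row_key] into the request."""
    stored = ENTITY_FEATURES.get(request[row_key], {})
    # Assumption of this sketch: feature IDs that are not stored are skipped.
    selected = {name: stored[name] for name in feature_ids if name in stored}
    return {**request, **selected}


full = enrich_legacy(
    {"entity_id": "movie_02", "title": "The Shining"}, ["title", "genres"])
genres_only = enrich_legacy({"entity_id": "movie_04"}, ["genres"])
print(full)
print(genres_only)
```

With feature_ids limited to "genres", the second call returns only entity_id and genres, which shows that features outside the list are not added to the row.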
