RisingWave provides a Python SDK, risingwave-py, to help users develop event-driven applications. The SDK offers a simple way to run ad-hoc queries, subscribe to changes, and define event handlers for tables and materialized views, making it easier to integrate real-time data into applications.

Use risingwave-py to connect to RisingWave

risingwave-py is a RisingWave Python SDK that provides the following capabilities:
  • Interact with RisingWave via Pandas DataFrames.
  • Subscribe and process changes from RisingWave tables or materialized views.
  • Run SQL commands supported in RisingWave.

Run RisingWave

To learn how to run RisingWave, see Run RisingWave.

Connect to RisingWave

To connect to RisingWave via risingwave-py:
from risingwave import RisingWave, RisingWaveConnOptions

# Connect to a RisingWave instance on localhost with named parameters
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)

# Connect to a RisingWave instance on localhost with a connection string
rw = RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))

# You can create a new SQL connection and execute operations within a with statement.
# This is the recommended way to use the Python SDK.
with rw.getconn() as conn:
    conn.insert(...)
    conn.fetch(...)
    conn.execute(...)
    conn.mv(...)
    conn.on_change(...)

# You can also use the existing connection created by the RisingWave object to execute operations.
# This is used in later sections for simplicity.
rw.insert(...)
rw.fetch(...)
rw.execute(...)
rw.mv(...)
rw.on_change(...)
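The two connection styles above carry the same information. If your settings live in separate variables but you prefer the connection-string form, a small helper can assemble the URI. This is a sketch: `build_dsn` is a hypothetical helper, not part of risingwave-py, and it assumes `RisingWaveConnOptions` accepts a standard `postgresql://` URI, as in the example above. User name and password are percent-encoded so characters such as `@` or `:` cannot break the URI.

```python
from urllib.parse import quote


def build_dsn(host: str, port: int, user: str, password: str, database: str) -> str:
    """Assemble a postgresql:// URI from individual connection parameters.

    Hypothetical helper (not part of risingwave-py). User and password are
    percent-encoded so reserved characters do not corrupt the URI.
    """
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


dsn = build_dsn(host="localhost", port=4566, user="root", password="root", database="dev")
print(dsn)  # postgresql://root:root@localhost:4566/dev

# A password containing reserved characters is escaped instead of breaking the URI.
print(build_dsn("localhost", 4566, "root", "p@ss:word", "dev"))
# postgresql://root:p%40ss%3Aword@localhost:4566/dev
```

The resulting string can then be passed as `RisingWave(RisingWaveConnOptions(dsn))`.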

Ingestion into RisingWave

Load a Pandas DataFrame into RisingWave:
from datetime import datetime
import pandas as pd

df = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
        "ts": [datetime.strptime("2023-10-05 14:30:00", "%Y-%m-%d %H:%M:%S"),
               datetime.strptime("2023-10-05 14:31:20", "%Y-%m-%d %H:%M:%S")],
    }
)

# If the table "test" does not exist in RisingWave, it is created with a schema matching the DataFrame
rw.insert(table_name="test", data=df)

# You can pass the optional force_flush parameter and set it to True
# if you want the inserted data to be visible to fetch queries immediately.
# Otherwise, data is inserted in batches asynchronously for better performance.
# rw.insert(table_name="test", data=df, force_flush=True)
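Since `rw.insert` creates the `test` table from the DataFrame when the table does not exist, it can help to check the column dtypes before inserting. The sketch below builds the same DataFrame with `pd.to_datetime`, which parses all timestamp strings in one call, and checks the dtypes with pandas' own type helpers. Which RisingWave column type each pandas dtype maps to is decided by risingwave-py and is not shown here.

```python
import pandas as pd
from pandas.api import types as ptypes

# Same rows as the example above; pd.to_datetime parses every timestamp at once.
df = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
        "ts": pd.to_datetime(
            ["2023-10-05 14:30:00", "2023-10-05 14:31:20"], format="%Y-%m-%d %H:%M:%S"
        ),
    }
)

# Check the column types before handing the frame to rw.insert(...).
assert ptypes.is_string_dtype(df["product"]) or ptypes.is_object_dtype(df["product"])
assert ptypes.is_float_dtype(df["price"])
assert ptypes.is_datetime64_any_dtype(df["ts"])
print(df.dtypes)
```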
Load data into RisingWave from external systems:
# Create a table and load data from upstream Kafka
rw.execute("""
    CREATE TABLE IF NOT EXISTS source_abc
    WITH (
        connector='kafka',
        properties.bootstrap.server='localhost:9092',
        topic='test_topic'
    )
    FORMAT UPSERT ENCODE AVRO (
        schema.registry = 'http://127.0.0.1:8081',
        schema.registry.username='your_schema_registry_username',
        schema.registry.password='your_schema_registry_password'
    )""")
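The statement above embeds the schema registry credentials directly in the SQL. One way to keep real credentials out of source code is to read them from environment variables and splice them into the statement as SQL string literals, doubling any embedded single quote as standard SQL literal syntax requires. This is a sketch: the helper names `sql_literal` and `kafka_table_ddl` and the environment variable names are hypothetical, and the connector options are copied unchanged from the example above.

```python
import os


def sql_literal(value: str) -> str:
    """Render a Python string as a SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def kafka_table_ddl(registry_user: str, registry_password: str) -> str:
    # Hypothetical helper: the statement from above with credentials injected.
    return f"""
    CREATE TABLE IF NOT EXISTS source_abc
    WITH (
        connector='kafka',
        properties.bootstrap.server='localhost:9092',
        topic='test_topic'
    )
    FORMAT UPSERT ENCODE AVRO (
        schema.registry = 'http://127.0.0.1:8081',
        schema.registry.username={sql_literal(registry_user)},
        schema.registry.password={sql_literal(registry_password)}
    )"""


# Hypothetical variable names; fall back to the placeholders used above.
ddl = kafka_table_ddl(
    os.environ.get("SCHEMA_REGISTRY_USERNAME", "your_schema_registry_username"),
    os.environ.get("SCHEMA_REGISTRY_PASSWORD", "your_schema_registry_password"),
)
# rw.execute(ddl)
```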
For supported sources and the SQL syntax, see this topic.

Query from RisingWave

from risingwave import OutputFormat

result: pd.DataFrame = rw.fetch("""
    SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
    FROM tumble(test, ts, interval '10 seconds')
    GROUP BY window_start, window_end, product""",
    format=OutputFormat.DATAFRAME)

print(result)
# Output:
#          window_start          window_end product  avg_price
# 0 2023-10-05 14:31:20 2023-10-05 14:31:30     bar      457.0
# 1 2023-10-05 14:30:00 2023-10-05 14:30:10     foo      123.0

# You can also use OutputFormat.RAW to get the query results back as a list of tuples
# rw.fetch("...", format=OutputFormat.RAW)
# [(datetime.datetime(2023, 10, 5, 14, 31, 20), datetime.datetime(2023, 10, 5, 14, 31, 30), 'bar', 457.0),
#  (datetime.datetime(2023, 10, 5, 14, 30), datetime.datetime(2023, 10, 5, 14, 30, 10), 'foo', 123.0)]
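`tumble(test, ts, interval '10 seconds')` assigns each row to the fixed, non-overlapping 10-second window that contains its `ts`. To make the output above easier to reason about, the sketch below reproduces the same aggregation locally with pandas on the two sample rows: `dt.floor("10s")` yields `window_start`, and adding 10 seconds yields `window_end`. It assumes windows are aligned to the Unix epoch, which is consistent with the output above, and it only illustrates the window semantics, not how RisingWave computes them. pandas' `round()` rounds exact .5 ties to even, which may differ from `ROUND` in RisingWave; the sample values contain no ties.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
        "ts": pd.to_datetime(["2023-10-05 14:30:00", "2023-10-05 14:31:20"]),
    }
)

# Tumbling window: every row falls into exactly one 10-second window.
window = pd.Timedelta(seconds=10)
df["window_start"] = df["ts"].dt.floor("10s")
df["window_end"] = df["window_start"] + window

# Equivalent of GROUP BY window_start, window_end, product with ROUND(avg(price)).
local = (
    df.groupby(["window_start", "window_end", "product"], as_index=False)["price"]
    .mean()
    .rename(columns={"price": "avg_price"})
)
local["avg_price"] = local["avg_price"].round()
print(local)
```

The rows come back sorted by window here, while the RisingWave query above has no `ORDER BY`, so its row order is not guaranteed.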

Event-driven processing with RisingWave

Event-driven applications depend on real-time data processing to react to events as they occur. With risingwave-py, you can define materialized views using SQL and run them in RisingWave. Behind the scenes, events are processed continuously, and the results are incrementally maintained. In the following example, test_mv is created to incrementally maintain the result of the defined SQL query as events are ingested into the test table.
mv = rw.mv(name="test_mv",
           stmt="""SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
                   FROM tumble(test, ts, interval '10 seconds')
                   GROUP BY window_start, window_end, product""")
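"Incrementally maintained" means the view is updated from each new event rather than recomputed from scratch. The toy class below illustrates the idea for this view's `avg(price)`: it keeps a running sum and count per (window, product) key, so each insert is a constant-time update and the current averages can be read at any moment. This is a conceptual sketch, not RisingWave's internal implementation; `IncrementalAvgView` and `window_start` are hypothetical names, and timestamps are assumed to be naive UTC values aligned to the Unix epoch.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    # Align to 10-second boundaries counted from the Unix epoch.
    return ts - (ts - EPOCH) % WINDOW


class IncrementalAvgView:
    """Hypothetical toy model of an incrementally maintained avg(price) view."""

    def __init__(self) -> None:
        # (window_start, product) -> [running sum, running count]
        self._state = defaultdict(lambda: [0.0, 0])

    def on_insert(self, product: str, price: float, ts: datetime) -> None:
        # Only the affected group's accumulator changes; nothing is rescanned.
        acc = self._state[(window_start(ts), product)]
        acc[0] += price
        acc[1] += 1

    def rows(self):
        # Read the current result: one row per window and product.
        # Python's round() rounds exact .5 ties to even.
        for (start, product), (total, count) in sorted(self._state.items(), key=lambda kv: kv[0]):
            yield start, start + WINDOW, product, round(total / count)


view = IncrementalAvgView()
view.on_insert("foo", 123.4, datetime(2023, 10, 5, 14, 30, 0))
view.on_insert("bar", 456.7, datetime(2023, 10, 5, 14, 31, 20))
for row in view.rows():
    print(row)
```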
In addition to running ad-hoc SQL queries on tables and materialized views, risingwave-py lets you subscribe to changes from a table or materialized view and define handlers for those change events in your applications.
# Write your own handler.
# Each event contains all fields of the subscribed MV/table plus two additional columns:
# - op: varchar. The change operation. Valid values: [Insert, UpdateInsert, Delete, UpdateDelete].
#   UpdateInsert and UpdateDelete exist because RisingWave treats an UPDATE
#   as a delete of the old value followed by an insert of the new value.
# - rw_timestamp: bigint. The Unix timestamp in milliseconds when the data was written.
def simple_event_handler(event: pd.DataFrame):
    for _, row in event.iterrows():
        # Trigger an action (e.g. place an order via REST API) when the avg_price is 300 or higher
        if row["op"] in ("Insert", "UpdateInsert") and row["avg_price"] >= 300:
            print(
                f"{row['window_start']} - {row['window_end']}: {row['product']} avg price {row['avg_price']} exceeds 300")
            # ...


import threading

# Subscribe to changes of a materialized view and feed them to your own handler.
# Run on_change in a separate thread so it does not block the main thread.
threading.Thread(
    target=lambda: mv.on_change(
        # Pass your handler here
        handler=simple_event_handler,
        # DATAFRAME and RAW (tuples) are supported here
        output_format=OutputFormat.DATAFRAME,
        # If True, subscription progress is saved and can be recovered if the Python application crashes
        persist_progress=True,
        # Maximum number of rows returned each time
        max_batch_size=10)
).start()

# Subscribe to changes of a table and print them to the console.
# Run on_change in a separate thread so it does not block the main thread.
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="test",
        handler=lambda data: print(data),
        output_format=OutputFormat.RAW,
        persist_progress=False,
        max_batch_size=5)
).start()

# Data inserted into the base table from now on will be reflected in the subscriptions.
# df = pd.DataFrame(
#     {
#         "product": ["foo", "bar"],
#         "price": [1000.2, 5000.4],
#         "ts": [datetime.strptime("2023-10-05 17:30:00", "%Y-%m-%d %H:%M:%S"),
#                datetime.strptime("2023-10-05 17:31:20", "%Y-%m-%d %H:%M:%S")],
#     }
# )
# rw.insert(table_name="test", data=df)
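Because an UPDATE arrives as an UpdateDelete row for the old value followed by an UpdateInsert row for the new value, a handler that mirrors a view locally needs only two rules: remove the row on Delete/UpdateDelete, and upsert it on Insert/UpdateInsert. The sketch below applies these rules for test_mv. It assumes (window_start, window_end, product) identifies a row of the view, which follows from its GROUP BY, and that rows within a batch arrive in stream order. `LocalReplica` is a hypothetical name. Its `handle` method has the same signature as `simple_event_handler`, so it can be exercised offline with a hand-built DataFrame before being passed as `handler`.

```python
import pandas as pd

KEY = ("window_start", "window_end", "product")


class LocalReplica:
    """Hypothetical in-memory mirror of test_mv built from change events."""

    def __init__(self) -> None:
        self.rows: dict = {}

    def handle(self, event: pd.DataFrame) -> None:
        # Same signature as simple_event_handler: one DataFrame per batch.
        for _, row in event.iterrows():
            key = tuple(row[c] for c in KEY)
            if row["op"] in ("Delete", "UpdateDelete"):
                self.rows.pop(key, None)
            elif row["op"] in ("Insert", "UpdateInsert"):
                self.rows[key] = row["avg_price"]


# Hand-built batch: an insert, then an update expressed as delete + insert.
ws, we = pd.Timestamp("2023-10-05 14:30:00"), pd.Timestamp("2023-10-05 14:30:10")
batch = pd.DataFrame(
    {
        "window_start": [ws, ws, ws],
        "window_end": [we, we, we],
        "product": ["foo", "foo", "foo"],
        "avg_price": [123.0, 123.0, 162.0],
        "op": ["Insert", "UpdateDelete", "UpdateInsert"],
        "rw_timestamp": [1696516200000, 1696516205000, 1696516205000],  # illustrative values
    }
)

replica = LocalReplica()
replica.handle(batch)
print(replica.rows)

# To attach it to the subscription (DATAFRAME output):
# mv.on_change(handler=replica.handle, output_format=OutputFormat.DATAFRAME)
```

Since on_change runs in a separate thread in the example above, reading `replica.rows` from the main thread while events are arriving would need a lock or a thread-safe copy.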
For more details, please refer to the risingwave-py GitHub repo.