Integrate with Flink

Overview #

Flink Agents is an Agentic AI framework based on Apache Flink. By integrating agents with Flink DataStream/Table, Flink Agents can leverage Flink's powerful data processing capabilities.

Integrate with DataStream #

First, get the Flink StreamExecutionEnvironment and the Flink Agents AgentsExecutionEnvironment.

```python
# Set up the Flink streaming environment and the Agents execution environment.
env = StreamExecutionEnvironment.get_execution_environment()
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
```
```java
// Set up the Flink streaming environment and the Agents execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AgentsExecutionEnvironment agentsEnv =
    AgentsExecutionEnvironment.getExecutionEnvironment(env);
```

Integrate the agent with the input DataStream; this returns an output DataStream that can be consumed by downstream operators.

```python
# create input datastream
input_stream = env.from_source(...)

# integrate agent with input datastream, and return output datastream
output_stream = (
    agents_env.from_datastream(
        input=input_stream, key_selector=lambda x: x.id
    )
    .apply(your_agent)
    .to_datastream()
)

# consume agent output datastream
output_stream.print()
```
```java
// create input datastream
DataStream<String> inputStream = env.fromSource(...);

// integrate agent with input datastream, and return output datastream
DataStream<Object> outputStream =
    agentsEnv
        .fromDataStream(inputStream, (KeySelector<YourPojo, String>) YourPojo::getId)
        .apply(yourAgent)
        .toDataStream();

// consume agent output datastream
outputStream.print();
```

The input DataStream must be a KeyedStream; otherwise, the user should provide a KeySelector that tells Flink Agents how to convert the input DataStream into a KeyedStream.
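For illustration, suppose each input record is a small object with an `id` field (the record type below is a hypothetical stand-in, not part of Flink Agents). The key selector is then just a function from a record to its key, and records with the same key are routed to the same agent instance:

```python
from dataclasses import dataclass

# Hypothetical record type for illustration only; the stream's element
# type can be any POJO/Row from which a key can be extracted.
@dataclass
class ChatMessage:
    id: str
    text: str

# The same selector shown above in from_datastream(): it maps a record
# to the key used to partition the stream.
def key_selector(x):
    return x.id

print(key_selector(ChatMessage(id="user-1", text="hello")))  # prints "user-1"
```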

Integrate with Table #

First, get the Flink StreamExecutionEnvironment, StreamTableEnvironment, and the Flink Agents AgentsExecutionEnvironment.

```python
# Set up the Flink streaming environment and table environment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Set up the Flink Agents execution environment
agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env)
```
```java
// Set up the Flink streaming environment and table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Set up the Flink Agents execution environment
AgentsExecutionEnvironment agentsEnv =
    AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv);
```

Integrate the agent with the input Table; this returns an output Table that can be consumed by downstream operators.

```python
input_table = t_env.from_elements(...)

output_type = ExternalTypeInfo(
    RowTypeInfo(
        [BasicTypeInfo.INT_TYPE_INFO()],
        ["result"],
    )
)

schema = Schema.new_builder().column("result", DataTypes.INT()).build()

output_table = (
    agents_env.from_table(input=input_table, key_selector=MyKeySelector())
    .apply(agent)
    .to_table(schema=schema, output_type=output_type)
)
```
```java
Table inputTable = tableEnv.fromValues(...);

// Here the output schema should always be a nested row, of which
// the f0 column is the expected row.
Schema outputSchema =
    Schema.newBuilder()
        .column("f0", DataTypes.ROW(DataTypes.FIELD("result", DataTypes.DOUBLE())))
        .build();

Table outputTable =
    agentsEnv
        .fromTable(inputTable, myKeySelector)
        .apply(agent)
        .toTable(outputSchema);
```

The user should provide a KeySelector in from_table() to tell Flink Agents how to convert the input Table into a KeyedStream internally, and provide a Schema and TypeInformation in to_table() to declare the output Table schema.
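As a rough sketch of the key-selector contract, the `MyKeySelector` used above exposes a `get_key` method that extracts the key from each row. The class below is a plain-Python stand-in for illustration only; in a real job it would extend PyFlink's `KeySelector` base class, and keying by the first column is just an assumption for this sketch:

```python
# Plain-Python stand-in for the key selector passed to from_table().
# A real implementation would subclass pyflink.datastream.functions.KeySelector.
class MyKeySelector:
    def get_key(self, value):
        # Key each row by its first column (assumed to hold the key).
        return value[0]

print(MyKeySelector().get_key((42, "some payload")))  # prints 42
```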

Currently, the user must provide both the Schema and the TypeInformation when calling to_table(); support for providing only one of them is planned for the future.