Overview #
Flink Agents is an Agentic AI framework based on Apache Flink. By integrating agents with Flink DataStream/Table, Flink Agents can leverage the powerful data processing capabilities of Flink.
From/To Flink DataStream API #
First, get the Flink StreamExecutionEnvironment and the Flink Agents AgentsExecutionEnvironment.
```python
# Set up the Flink streaming environment and the Agents execution environment.
env = StreamExecutionEnvironment.get_execution_environment()
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
```

```java
// Set up the Flink streaming environment and the Agents execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AgentsExecutionEnvironment agentsEnv = AgentsExecutionEnvironment.getExecutionEnvironment(env);
```

Integrate the agent with the input DataStream; the returned output DataStream can be consumed by downstream operators.
```python
# Create the input datastream.
input_stream = env.from_source(...)

# Integrate the agent with the input datastream, and get back the output datastream.
output_stream = (
    agents_env.from_datastream(input=input_stream, key_selector=lambda x: x.id)
    .apply(your_agent)
    .to_datastream()
)

# Consume the agent output datastream.
output_stream.print()
```

```java
// Create the input datastream.
DataStream<YourPojo> inputStream = env.fromSource(...);

// Integrate the agent with the input datastream, and get back the output datastream.
DataStream<Object> outputStream = agentsEnv
        .fromDataStream(inputStream, (KeySelector<YourPojo, String>) x -> x.getId())
        .apply(yourAgent)
        .toDataStream();

// Consume the agent output datastream.
outputStream.print();
```

The input DataStream must be a KeyedStream; otherwise, the user should provide a KeySelector that tells Flink Agents how to convert the input DataStream into a KeyedStream.
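If the input DataStream is not already keyed, the key selector can also be supplied as a dedicated class rather than a lambda. The sketch below is a minimal Python illustration that reuses input_stream, your_agent, and agents_env from the example above; the IdKeySelector class and the id field are placeholders, while the KeySelector base class comes from PyFlink's DataStream API.

```python
from pyflink.datastream.functions import KeySelector


class IdKeySelector(KeySelector):
    """Extracts the partitioning key from each record (the id field is a placeholder)."""

    def get_key(self, value):
        return value.id


# Same integration as above, but with an explicit KeySelector instead of a lambda.
output_stream = (
    agents_env.from_datastream(input=input_stream, key_selector=IdKeySelector())
    .apply(your_agent)
    .to_datastream()
)
```

Either way, the selected key determines how records are partitioned before they reach the agent.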
From/To Flink Table API #
First, get the Flink StreamExecutionEnvironment and StreamTableEnvironment, and the Flink Agents AgentsExecutionEnvironment.
```python
# Set up the Flink streaming environment and table environment.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Set up the Flink Agents execution environment.
agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env)
```

```java
// Set up the Flink streaming environment and table environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Set up the Flink Agents execution environment.
AgentsExecutionEnvironment agentsEnv = AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv);
```

Integrate the agent with the input Table; the returned output Table can be consumed by downstream operators.
```python
# Create the input table.
input_table = t_env.from_elements(...)

# Describe the output row both as TypeInformation and as a table Schema.
output_type = ExternalTypeInfo(
    RowTypeInfo(
        [BasicTypeInfo.INT_TYPE_INFO()],
        ["result"],
    )
)
schema = Schema.new_builder().column("result", DataTypes.INT()).build()

# Integrate the agent with the input table, and get back the output table.
output_table = (
    agents_env.from_table(input=input_table, key_selector=MyKeySelector())
    .apply(agent)
    .to_table(schema=schema, output_type=output_type)
)
```

```java
// Create the input table.
Table inputTable = tableEnv.fromValues(...);

// Here the output schema should always be a nested row, of which
// the f0 column is the expected row.
Schema outputSchema = Schema.newBuilder()
        .column("f0", DataTypes.ROW(DataTypes.FIELD("result", DataTypes.DOUBLE())))
        .build();

// Integrate the agent with the input table, and get back the output table.
Table outputTable = agentsEnv
        .fromTable(inputTable, myKeySelector)
        .apply(agent)
        .toTable(outputSchema);
```

The user should provide a KeySelector in from_table() that tells Flink Agents how to convert the input Table into a KeyedStream internally, and provide a Schema and TypeInformation in to_table() to define the output Table schema.
Currently, the user should provide both Schema and TypeInformation when calling to_table(); supporting only one of the two will be added in the future.
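Until then, the Schema and the TypeInformation passed to to_table() should describe the same row layout. The following Python sketch shows one way to keep the two in sync for a two-column output; the column names and types are purely illustrative, it reuses input_table, agent, and MyKeySelector from the example above, and it assumes the flat row layout of the Python example generalizes to multiple columns.

```python
from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo
from pyflink.table import DataTypes, Schema

# Illustrative two-column output: keep field names and types identical in both
# the TypeInformation and the Schema.
output_type = ExternalTypeInfo(
    RowTypeInfo(
        [BasicTypeInfo.DOUBLE_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()],
        ["score", "label"],
    )
)
schema = (
    Schema.new_builder()
    .column("score", DataTypes.DOUBLE())
    .column("label", DataTypes.STRING())
    .build()
)

output_table = (
    agents_env.from_table(input=input_table, key_selector=MyKeySelector())
    .apply(agent)
    .to_table(schema=schema, output_type=output_type)
)
```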