# FlinkSQL 中怎么自定义 UDF ## 一、UDF 概述 ### 1.1 什么是 UDF UDF(User Defined Function)即用户自定义函数,是数据库和数据处理系统中常见的扩展机制。在 FlinkSQL 中,UDF 允许用户通过编程方式扩展 SQL 的功能,实现内置函数无法完成的特殊计算逻辑。 ### 1.2 FlinkSQL 中 UDF 的类型 Flink 主要支持三种 UDF 类型: 1. **Scalar Function**:一对一转换,输入一行输出一个值 2. **Table Function**:一对多转换,输入一行输出多行(通过 `LATERAL TABLE` 调用) 3. **Aggregate Function**:多对一转换,聚合多行输出一个值 ## 二、开发环境准备 ### 2.1 项目依赖配置 在 Maven 项目中需要添加以下依赖: ```xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.15.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>1.15.0</version> </dependency>
推荐使用 IntelliJ IDEA 或 Eclipse 进行开发,确保安装: - Java 8+ SDK - Maven 3.2+ - Scala 插件(如需混合开发)
import org.apache.flink.table.functions.ScalarFunction; public class MyConcatFunction extends ScalarFunction { public String eval(String a, String b) { return a + "-" + b; } }
public class JsonParser extends ScalarFunction { private static final ObjectMapper mapper = new ObjectMapper(); public String eval(String json, String field) throws Exception { JsonNode node = mapper.readTree(json); return node.get(field).asText(); } }
// 在 TableEnvironment 中注册 tableEnv.createTemporarySystemFunction("my_concat", MyConcatFunction.class); // SQL 中使用 tableEnv.executeSql("SELECT my_concat(name, desc) FROM products");
import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; public class SplitFunction extends TableFunction<Row> { public void eval(String str, String delimiter) { for (String s : str.split(delimiter)) { collect(Row.of(s)); } } @Override public DataType getResultType(Object[] arguments, Class[] argTypes) { return DataTypes.ROW(DataTypes.FIELD("item", DataTypes.STRING())); } }
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public class AdvancedSplit extends TableFunction<Row> { public void eval(String str) { for (String s : str.split("\\s+")) { collect(Row.of(s, s.length())); } } }
SELECT user_id, t.word, t.length FROM comments, LATERAL TABLE(advanced_split(content)) AS t(word, length)
public class WeightedAvgAccum { public long sum = 0; public int count = 0; }
import org.apache.flink.table.functions.AggregateFunction; public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccum> { @Override public WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); } public void accumulate(WeightedAvgAccum acc, Integer value, Integer weight) { acc.sum += value * weight; acc.count += weight; } @Override public Double getValue(WeightedAvgAccum acc) { return acc.count == 0 ? null : (double)acc.sum / acc.count; } }
retract()
方法支持回撤merge()
方法提高分布式计算效率public class OverloadedFunc extends ScalarFunction { public Integer eval(Integer a, Integer b) { return a + b; } public String eval(String a, String b) { return a.concat(b); } }
public class ConcatWS extends ScalarFunction { public String eval(String delimiter, String... parts) { return String.join(delimiter, parts); } }
@FunctionHint( input = {@DataTypeHint("INT"), @DataTypeHint("INT")}, output = @DataTypeHint("INT") ) public class SafeDivide extends ScalarFunction { public Integer eval(Integer a, Integer b) { return b == 0 ? null : a / b; } }
public class UDFTest { @Test public void testConcat() { MyConcatFunction func = new MyConcatFunction(); assertEquals("a-b", func.eval("a", "b")); } }
@FunctionHint
提前声明类型避免运行时推断parse_json_v2
)public class MonitoredFunction extends ScalarFunction { @Override public void open(FunctionContext context) { // 初始化指标收集 } public String eval(String input) { long start = System.currentTimeMillis(); // ...处理逻辑 // 记录执行时间 return result; } }
通过本文的详细讲解,您应该已经掌握了在 FlinkSQL 中开发各类 UDF 的方法。实际开发中建议从简单场景入手,逐步扩展到复杂函数实现,同时注意性能优化和生产环境的最佳实践。 “`
这篇文章共计约2700字,采用Markdown格式编写,包含: 1. 完整的UDF实现分类说明 2. 详细的代码示例和最佳实践 3. 从基础到高级的渐进式讲解 4. 生产环境注意事项 5. 格式化的代码块和清晰的结构划分
可根据需要调整具体实现示例或补充特定场景的案例。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。