User-defined functions
User-defined functions enable you to extend ksqlDB's suite of built-in functions using Java hooks. This section is a reference for how they work. Use the how-to guide to learn how to use them.
Data type mapping¶
Because SQL has a type system that is independent from Java’s, user-defined functions (UDFs) need to use specific Java types so that ksqlDB can manage the correspondence from SQL to Java. Below is the mapping to use for all UDF parameters and return types. Use boxed types when you want to tolerate null values.
SQL Type | Java Type |
---|---|
INT | int , java.lang.Integer |
BOOLEAN | boolean , java.lang.Boolean |
BIGINT | long , java.lang.Long |
DOUBLE | double , java.lang.Double |
DECIMAL | java.math.BigDecimal |
VARCHAR | java.lang.String |
BYTES | java.nio.ByteBuffer |
TIME | java.sql.Time |
DATE | java.sql.Date |
TIMESTAMP | java.sql.Timestamp |
ARRAY | java.util.List |
MAP | java.util.Map |
STRUCT | org.apache.kafka.connect.data.Struct |
Note
Using Struct or BigDecimal in your functions requires specifying the schema by using paramSchema, returnSchema, aggregateSchema, or a schema provider.
Classloading¶
How does ksqlDB choose which classes to load as user-defined functions? At startup, ksqlDB scans the jars in its extensions directory for classes that carry the UDF annotations. Each function that is found is parsed and, if parsing succeeds, loaded into ksqlDB.
Each function instance has its own child-first ClassLoader that is isolated from other functions. If your functions need third-party libraries, package them in the same jar, which means that you should create an uberjar. The classes in your uberjar are loaded in preference to any classes on the ksqlDB classpath, except for classes vital to the running of ksqlDB, that is, classes that are part of org.apache.kafka and io.confluent.
Annotations¶
Annotations not only help ksqlDB figure out which classes will be used as UDFs, they also help commands like DESCRIBE FUNCTION display helpful metadata.
Scalar functions¶
When a class is annotated with @UdfDescription, it's scanned for any public methods that are annotated with @Udf. If any are found, the class is loaded as a scalar function. Each method's parameters may optionally be annotated with @UdfParameter. The fields that each of these annotations accepts are listed below.
@UdfDescription¶
The @UdfDescription annotation is applied at the class level.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
category | For grouping similar functions in the output of SHOW FUNCTIONS. | No |
author | The author of the UDF. | No |
version | The version of the UDF. | No |
@Udf¶
The @Udf annotation is applied to public methods of a class annotated with @UdfDescription. Each annotated method will become an invocable function in SQL.
Field | Description | Required |
---|---|---|
description | A string describing generally what a particular version of the UDF does. | No |
schema | The ksqlDB schema for the return type of this UDF. | For complex types such as STRUCT if schemaProvider is not passed in. |
schemaProvider | A reference to a method that computes the return schema of this UDF (e.g. dynamic return type). | For complex types, like STRUCT , if schema is not provided. |
@UdfParameter¶
The @UdfParameter annotation is applied to parameters of methods annotated with @Udf. ksqlDB uses the information in the @UdfParameter annotation to specify the parameter schema (if it can't be inferred from the Java type) and to convey metadata.
Field | Description | Required |
---|---|---|
value | The case-insensitive name of the parameter | Required if the UDF JAR was not compiled with the -parameters javac argument. |
description | A string describing generally what the parameter represents | No |
schema | The ksqlDB schema for the parameter. | For complex types, like STRUCT |
Note
If schema is supplied in the @UdfParameter annotation for a STRUCT, it is considered "strict": any inputs must match exactly, including order and names of the fields.
If your Java 8 class is compiled with the -parameters compiler flag, the name of the parameter will be inferred from the method declaration.
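As a rough sketch of how the three annotations fit together, the class below declares a hypothetical multiply function with two overloads. The annotation types are declared locally as minimal stand-ins, carrying only the fields this sketch uses, so that the snippet compiles without the ksqlDB jars. In a real project you would use the annotations shipped with ksqlDB instead. The class, method, and parameter names are illustrative assumptions.

```java
// Local stand-ins for the ksqlDB annotations (assumption: declared here only
// so the sketch compiles on its own; they model just the fields used below).
@interface UdfDescription {
  String name();
  String description();
}

@interface Udf {
  String description() default "";
}

@interface UdfParameter {
  String value() default "";
  String description() default "";
}

// Hypothetical scalar function: one class, one SQL name, two overloads.
@UdfDescription(name = "multiply", description = "Multiplies two numbers.")
class MultiplyUdf {

  // Primitive parameters: this overload is not meant to receive nulls.
  @Udf(description = "Multiplies two INT values.")
  public long multiply(
      @UdfParameter(value = "v1", description = "the first value") final int v1,
      @UdfParameter(value = "v2", description = "the second value") final int v2) {
    return (long) v1 * v2;
  }

  // Boxed parameters: this overload tolerates nulls and returns null for them.
  @Udf(description = "Multiplies two DOUBLE values, or returns null.")
  public Double multiply(
      @UdfParameter("v1") final Double v1,
      @UdfParameter("v2") final Double v2) {
    return (v1 == null || v2 == null) ? null : v1 * v2;
  }
}
```

Each annotated method corresponds to one invocable signature in SQL, so the two overloads here would both be reachable under the single name given in @UdfDescription.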
Tabular functions¶
When a class is annotated with @UdtfDescription, it's scanned for any public methods that are annotated with @Udtf. If any are found, the class is loaded as a tabular function. Each method's parameters may optionally be annotated with @UdfParameter. The fields that each of these annotations accepts are listed below.
@UdtfDescription¶
The @UdtfDescription annotation is applied at the class level.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDTF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDTF. | No |
version | The version of the UDTF. | No |
@Udtf¶
The @Udtf annotation is applied to public methods of a class annotated with @UdtfDescription. Each annotated method becomes an invocable function in SQL. This annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what a particular version of the UDTF does. | No |
schema | The ksqlDB schema for the return type of this UDTF. | For complex types such as STRUCT if schemaProvider is not passed in. |
schemaProvider | A reference to a method that computes the return schema of this UDTF (e.g. dynamic return type). | For complex types such as STRUCT if schema is not passed in. |
@UdfParameter¶
You can use the @UdfParameter annotation to provide extra information for UDTF parameters. This is the same annotation as used for UDFs. See the @UdfParameter section under Scalar functions for details.
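The tabular annotations can be sketched in the same way. The example below assumes that a tabular function returns a List whose elements each become one output row. The annotation types are again local stand-ins with only the fields used here, and the split_words function is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for the ksqlDB annotations (assumption: declared here only
// so the sketch compiles without the ksqlDB jars).
@interface UdtfDescription {
  String name();
  String description();
}

@interface Udtf {
  String description() default "";
}

// Hypothetical tabular function that emits one row per word.
@UdtfDescription(name = "split_words", description = "Emits one row per word.")
class SplitWordsUdtf {

  @Udtf(description = "Splits a sentence on whitespace.")
  public List<String> splitWords(final String sentence) {
    final List<String> words = new ArrayList<>();
    if (sentence == null) {
      return words; // no input, no output rows
    }
    for (final String word : sentence.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        words.add(word);
      }
    }
    return words;
  }
}
```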
Aggregation functions¶
When a class is annotated with @UdafDescription, it's scanned for any public static methods that are annotated with @UdafFactory and that return either Udaf or TableUdaf. If any are found, the class is loaded as an aggregation function. The factory methods represent a collection of UDAFs that share the same name but may have different arguments and return types. The fields that each of these annotations accepts are listed below.
Both Udaf and TableUdaf are parameterized by three generic types:
- I is the input type of the UDAF. I can be a tuple type, one of Pair, Triple, Quadruple, or Quintuple, when there are multiple column arguments. VariadicArgs can be nested inside a tuple to create a variadic column argument. A function can have at most one variadic argument anywhere in its signature (including the parameters of UdafFactory). A variadic column argument may have Object as its type parameter to accept any number of columns of any type, though a variadic Object factory argument is not supported. A variadic column argument outside a tuple is not supported.
- A is the data type of the intermediate storage used to keep track of the state of the UDAF.
- O is the data type of the return value.
Decoupling the data types of the state and return value enables you to define UDAFs like average, where the intermediate state (a running sum and count) differs from the value that is returned.
When you create a UDAF, you can use the map method to provide the logic that transforms an intermediate aggregate value to the returned value.
The merge method is called only when sessions are merged under session windowing.
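The average case can be sketched as follows. The Udaf interface and the two annotations are declared locally as stand-ins so that the snippet compiles on its own. The state is kept as a two-element list holding the running sum and count, which is a simplifying assumption of this sketch rather than a recommendation. The my_average name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Local stand-ins (assumption: simplified shapes, declared only for this sketch).
@interface UdafDescription {
  String name();
  String description();
}

@interface UdafFactory {
  String description();
}

interface Udaf<I, A, O> {
  A initialize();                    // starting state
  A aggregate(I current, A aggregate); // fold one input into the state
  A merge(A aggOne, A aggTwo);       // combine two states
  O map(A agg);                      // turn the state into the returned value
}

@UdafDescription(name = "my_average", description = "Computes the mean.")
class AverageUdaf {

  // I = Double (input), A = List<Double> holding [sum, count], O = Double.
  @UdafFactory(description = "Averages DOUBLE values.")
  public static Udaf<Double, List<Double>, Double> createUdaf() {
    return new Udaf<Double, List<Double>, Double>() {
      @Override
      public List<Double> initialize() {
        return Arrays.asList(0.0, 0.0);
      }

      @Override
      public List<Double> aggregate(final Double current, final List<Double> agg) {
        if (current == null) {
          return agg; // ignore null inputs
        }
        return Arrays.asList(agg.get(0) + current, agg.get(1) + 1.0);
      }

      @Override
      public List<Double> merge(final List<Double> a, final List<Double> b) {
        return Arrays.asList(a.get(0) + b.get(0), a.get(1) + b.get(1));
      }

      @Override
      public Double map(final List<Double> agg) {
        // Avoid dividing by zero when nothing has been aggregated yet.
        return agg.get(1) == 0.0 ? 0.0 : agg.get(0) / agg.get(1);
      }
    };
  }
}
```

Because the state type A is separate from the return type O, the sum and count can be carried through aggregate and merge and only collapsed to a single number in map.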
@UdafDescription¶
The @UdafDescription annotation is applied at the class level.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDAF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDAF. | No |
version | The version of the UDAF. | No |
@UdafFactory¶
The @UdafFactory annotation is applied to public static methods of a class annotated with @UdafDescription. The method must return either Udaf, or, if it supports table aggregations, TableUdaf. Each annotated method is a factory for an invocable aggregate function in SQL. The annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what the function(s) in this class do. | Yes |
paramSchema | The ksqlDB schema(s) for the input parameter(s). If you provide fewer schemas than there are parameters, the schemas for the remaining parameters default to being empty. If you provide more schemas than there are parameters, the extra schemas are ignored. | For complex types, like STRUCT |
aggregateSchema | The ksqlDB schema for the intermediate state. | For complex types, like STRUCT |
returnSchema | The ksqlDB schema for the return value. | For complex types, like STRUCT |
Note
If paramSchema, aggregateSchema, or returnSchema is supplied in the @UdafFactory annotation for a STRUCT, it's considered "strict": any inputs must match exactly, including order and names of the fields.
Null values¶
If a user-defined function uses primitive types in its signature, it is indicating that the parameter should never be null. Conversely, using boxed types indicates the function can accept null values for the parameter. It's up to the implementer of the UDF to choose which is more appropriate. A common pattern is to return null if the input is null, though generally this is only for parameters that are expected to be supplied from the source row being processed.
For example, a substring(String str, int pos) UDF might return null if str is null, but a null value for the pos parameter would be treated as an error, and so should be a primitive. (In fact, the built-in substring is more lenient and returns null if pos is null.)
The return type of a UDF can also be a primitive or boxed type. A primitive return type indicates the function will never return null, whereas a boxed type indicates that it may return null.
ksqlDB checks the value that's passed to each parameter and reports an error to the server log for any null values being passed to a primitive type. The associated column in the output row will be null.
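The substring case described above might look like the following minimal sketch. The ksqlDB annotations are left out to keep it short, and the 1-based, clamped position handling is an assumption made for illustration, not a description of the built-in function.

```java
// Hypothetical UDF body illustrating the primitive-versus-boxed choice.
class SubstringUdf {

  // str is a reference type, so a null coming from the source row is tolerated.
  // pos is a primitive int, signalling that null is never expected there.
  public String substring(final String str, final int pos) {
    if (str == null) {
      return null; // common pattern: null in, null out
    }
    // Assumption of this sketch: pos is 1-based and clamped to the string bounds.
    final int start = Math.min(Math.max(pos - 1, 0), str.length());
    return str.substring(start);
  }
}
```

The String return type is a reference type, which matches the fact that this method may return null.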
Dynamic types¶
UDFs support dynamic return types that are resolved at runtime. This is useful if you want to implement a UDF whose return type isn't fixed in advance, like DECIMAL or STRUCT. For example, a UDF that returns BigDecimal, which maps to the SQL DECIMAL type, may vary the precision and scale of the output based on the input schema.
To use this functionality, you need to specify a method with signature public SqlType <your-method-name>(final List<SqlArgument> params) and annotate it with @UdfSchemaProvider. Also, you need to link it to the corresponding UDF by using the schemaProvider=<your-method-name> parameter of the @Udf annotation.
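A heavily simplified sketch of this wiring is shown below. SqlType, SqlArgument, and both annotations are local stand-ins written for this example: the stand-in SqlType models only a DECIMAL precision and scale, and its getSqlType accessor is hypothetical, so none of this should be read as the real ksqlDB classes. The UDF doubles a decimal value, and its schema provider widens the precision by one digit to make room for the result.

```java
import java.math.BigDecimal;
import java.util.List;

// Local stand-ins (assumption: simplified shapes, declared only for this sketch).
@interface Udf {
  String schemaProvider() default "";
}

@interface UdfSchemaProvider {
}

// Stand-in that models only DECIMAL(precision, scale).
final class SqlType {
  final int precision;
  final int scale;

  SqlType(final int precision, final int scale) {
    this.precision = precision;
    this.scale = scale;
  }
}

// Stand-in wrapper around the SQL type of one argument.
final class SqlArgument {
  private final SqlType type;

  SqlArgument(final SqlType type) {
    this.type = type;
  }

  SqlType getSqlType() { // hypothetical accessor
    return type;
  }
}

class DoubleDecimalUdf {

  // Linked to the provider method below by name.
  @Udf(schemaProvider = "provideSchema")
  public BigDecimal doubleIt(final BigDecimal value) {
    return value == null ? null : value.multiply(BigDecimal.valueOf(2));
  }

  // Computes the return type from the argument types: same scale, one more digit.
  @UdfSchemaProvider
  public SqlType provideSchema(final List<SqlArgument> params) {
    final SqlType input = params.get(0).getSqlType();
    return new SqlType(input.precision + 1, input.scale);
  }
}
```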
When implementing dynamic returns for a UDTF function, if your method returns a value of type List<T>, the type referred to by the schema provider method is the type T, not the type List<T>.
For dynamic UDAFs, the aggregate or map methods may depend on the input SQL type, so implementations of the Udaf interface override some of the following three methods: initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType(), and getReturnSqlType().
Generics¶
A UDF declaration can use generics if they meet the following conditions:
- Any generic in the return value of a method must appear in at least one of the method parameters.
- The generic must not be bounded by any interface or class. For example, <T extends Number> is not valid.
- The generic does not support type coercion or inheritance. For example, add(T a, T b) will accept BIGINT, BIGINT but not INT, BIGINT.
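The method shapes that satisfy these conditions can be sketched in plain Java. The class and method names below are hypothetical, and the ksqlDB annotations are omitted for brevity.

```java
import java.util.List;

// Hypothetical generic UDF methods that follow the conditions listed above.
class GenericUdfs {

  // T appears in the return type and in a parameter, and it has no bound.
  public <T> T firstElement(final List<T> items) {
    return (items == null || items.isEmpty()) ? null : items.get(0);
  }

  // Both parameters share T, so both arguments must resolve to the same SQL
  // type: with no coercion, BIGINT, BIGINT would fit but INT, BIGINT would not.
  public <T> T ifNull(final T value, final T fallback) {
    return value != null ? value : fallback;
  }

  // Shapes that would NOT satisfy the conditions (shown as comments only):
  //   public <T> T make()                            // T is absent from the parameters
  //   public <T extends Number> T pick(T a, T b)     // T is bounded
}
```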
External parameters¶
If the UDF class needs access to the ksqlDB Server configuration, it can implement org.apache.kafka.common.Configurable. configure() will be invoked with the map of server parameters. This can be useful for parameterizing a function on a per-deployment basis.
For security reasons, only settings whose name is prefixed with ksql.functions.<lowercase-udfname>. or ksql.functions._global_. are propagated to the UDF.
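A minimal sketch of a configurable UDF follows. The Configurable interface is declared locally as a stand-in for org.apache.kafka.common.Configurable so that the snippet compiles without the Kafka jars. The scale function and its ksql.functions.scale.factor setting are hypothetical names chosen to match the prefix rule above.

```java
import java.util.Map;

// Local stand-in for org.apache.kafka.common.Configurable (assumption:
// declared here only so the sketch compiles on its own).
interface Configurable {
  void configure(Map<String, ?> configs);
}

// Hypothetical UDF named "scale" that multiplies its input by a configured factor.
class ScaleUdf implements Configurable {

  // Hypothetical setting, following the ksql.functions.<lowercase-udfname>. prefix.
  static final String FACTOR_CONFIG = "ksql.functions.scale.factor";

  private double factor = 1.0; // used when the setting is absent

  @Override
  public void configure(final Map<String, ?> configs) {
    final Object raw = configs.get(FACTOR_CONFIG);
    if (raw != null) {
      factor = Double.parseDouble(raw.toString());
    }
  }

  public double scale(final double value) {
    return value * factor;
  }
}
```

Reading the value defensively, with a default when the key is missing, keeps the function usable on deployments that do not set it.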
Security¶
Blacklisting¶
In some deployment environments, it may be necessary to restrict the classes that UDFs have access to, as they may represent a security risk. To reduce the attack surface of ksqlDB user-defined functions, you can optionally blacklist classes and packages so that they can't be used from a UDF. An example blacklist is in a file named resource-blacklist.txt in the extensions directory. All of the entries in the default version of the file are commented out, but it shows how you can use the blacklist.
This file contains one entry per line, where each line is a class or package that should be blacklisted. Names are matched using regular expressions, so an entry like this:
java.lang.Process
matches any paths that begin with java.lang.Process, like java.lang.Process, java.lang.ProcessBuilder, etc.
If you want to blacklist a single class, for example, java.lang.Compiler, then you would add:
java.lang.Compiler$
Any blank lines or lines beginning with # are ignored. If the file is not present, or is empty, then no classes are blacklisted.
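The matching rules described above can be approximated with the standard java.util.regex classes. The sketch below is an illustration of prefix matching and of skipping blank and comment lines, not ksqlDB's actual implementation. The class and method names are hypothetical, and the trailing $ in the last usage example is the ordinary regex end anchor, used here to stop an entry from matching longer names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper that mimics the documented blacklist behavior.
class BlacklistMatchSketch {

  // Keeps only real entries: blank lines and lines starting with # are skipped.
  static List<String> activeEntries(final List<String> lines) {
    final List<String> entries = new ArrayList<>();
    for (final String line : lines) {
      final String trimmed = line.trim();
      if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
        entries.add(trimmed);
      }
    }
    return entries;
  }

  // True when the entry, read as a regular expression, matches the start of
  // the class name. Note that an unescaped dot matches any character.
  static boolean isBlocked(final String entry, final String className) {
    return Pattern.compile(entry).matcher(className).lookingAt();
  }
}
```

Under these assumptions, the entry java.lang.Process blocks both java.lang.Process and java.lang.ProcessBuilder, while an entry ending in $ blocks only the exact class name.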
Security Manager¶
By default, ksqlDB installs a simple Java security manager for executing user-defined functions. The security manager blocks attempts by any functions to fork processes from the ksqlDB Server. It also prevents them from calling System.exit(..).
You can disable the security manager by setting ksql.udf.enable.security.manager to false.
Disabling ksqlDB Custom Functions¶
You can disable the loading of all UDFs in the extensions directory by setting ksql.udfs.enabled to false. By default, they are enabled.
Metrics¶
Metric collection can be enabled by setting the config ksql.udf.collect.metrics to true. This defaults to false and is generally not recommended for production usage, as metrics are collected on each invocation and introduce some overhead to processing time. See more details in the UDF metrics reference section.