This repository contains the source code for a MySQL loadable function library (previously called UDF - User Defined Functions), which provides some additonal SQL functions to interact with your MQTT server for publish and subscribe MQTT topics.
If you like lib_mysqludf_mqtt give it a star or fork it:
Ensure the Eclipse Paho C Client Library for the MQTT Protocol is installed.
Also install libjsonparser:
sudo apt install libjsonparser-dev
From the base directory run:
make sudo make install
This will build and install the library file.
To active the loadable function within your MySQL server run the follwoing SQL queries:
CREATE FUNCTION mqtt_info RETURNS STRING SONAME 'lib_mysqludf_mqtt.so'; CREATE FUNCTION mqtt_lasterror RETURNS STRING SONAME 'lib_mysqludf_mqtt.so'; CREATE FUNCTION mqtt_connect RETURNS INTEGER SONAME 'lib_mysqludf_mqtt.so'; CREATE FUNCTION mqtt_disconnect RETURNS INTEGER SONAME 'lib_mysqludf_mqtt.so'; CREATE FUNCTION mqtt_publish RETURNS INTEGER SONAME 'lib_mysqludf_mqtt.so'; CREATE FUNCTION mqtt_subscribe RETURNS STRING SONAME 'lib_mysqludf_mqtt.so';
To uninstall first deactive the loadable function within your MySQL server running the SQL queries:
DROP FUNCTION IF EXISTS mqtt_info; DROP FUNCTION IF EXISTS mqtt_lasterror; DROP FUNCTION IF EXISTS mqtt_connect; DROP FUNCTION IF EXISTS mqtt_disconnect; DROP FUNCTION IF EXISTS mqtt_publish; DROP FUNCTION IF EXISTS mqtt_subscribe;
Then uninstall the library file using command line:
sudo make uninstall
Connect to a mqtt server and returns a handle.
mqtt_connect(server {,[username]} {,[password] {,[options]}}})
Parameter in {}
are optional an can be omit.
Parameter in []
can be NULL
- in this case a default value is used.
To use optional parameters after omitting other optional parameters, use NULL
.
server
String- Specifying the server to which the client will connect. It takes the form
protocol://host:port
.
Currentlyprotocol
must betcp
orssl
.
Forhost
you can specify either an IP address or a hostname.
For instance, to connect to a server running on the local machines with the default MQTT port, specifytcp://localhost:1883
orssl://localhost:8883
for an SSL connection. username
String- Username for authentification. If
username
should remain unused, omit the parameter or set it toNULL
password
String- Password for authentification. If the
password
should remain unused, omit the parameter or set it toNULL
options
String- JSON string containing additonal options or NULL if unused. The following JSON objects are accept:
CApath
: String- Points to a directory containing CA certificates in PEM format
CAfile
: String- The file in PEM format containing the public digital certificates trusted by the client.
keyStore
: String- The file in PEM format containing the public certificate chain of the client. It may also include the client's private key.
privateKey
: String- If not included in the sslKeyStore, this setting points to the file in PEM format containing the client's private key.
privateKeyPassword
: String- The password to load the client's privateKey if encrypted.
enabledCipherSuites
: String- The list of cipher suites that the client will present to the server during the SSL handshake.
verify
: boolean- Whether to carry out post-connect checks, including that a certificate matches the given host name.
enableServerCertAuth
: boolean- True/False option to enable verification of the server certificate.
sslVersion
: integer- The SSL/TLS version to use. Specify one of:
0 = MQTT_SSL_VERSION_DEFAULT
1 = MQTT_SSL_VERSION_TLS_1_0
2 = MQTT_SSL_VERSION_TLS_1_1
3 = MQTT_SSL_VERSION_TLS_1_2 keepAliveInterval
: integer- The "keep alive" interval, measured in seconds, defines the maximum time that should pass without communication between the client and the server.
cleansession
: boolean- The cleansession setting controls the behaviour of both the client and the server at connection and disconnection time.
reliable
: boolean- This is a boolean value that controls how many messages can be in-flight simultaneously. Setting reliable to true means that a published message must be completed (acknowledgements received) before another can be sent.
MQTTVersion
: String- Sets the version of MQTT to be used on the connect:
0 = MQTTVERSION_DEFAULT start with 3.1.1, and if that fails, fall back to 3.1
3 = MQTTVERSION_3_1
4 = MQTTVERSION_3_1_1
5 = MQTTVERSION_5 maxInflightMessages
: integer- The maximum number of messages in flight
willTopic
: String- The LWT topic to which the LWT message will be published.
willMessage
: String- The LWT payload.
willRetained
: boolean- The retained flag for the LWT message.
willQos
: String- The quality of service setting for the LWT message
Returns a valid handle or 0 on error
Examples:
SET @client = (SELECT mqtt_connect('tcp://localhost:1883', NULL, NULL)); SET @client = (SELECT mqtt_connect('tcp://localhost:1883', 'myuser', 'mypasswd')); SET @client = (SELECT mqtt_connect('ssl://mqtt.eclipseprojects.io:8883', NULL, NULL, '{"verify":true,"CApath":"/etc/ssl/certs"}')); SET @client = (SELECT mqtt_connect('ssl://mqtt.eclipseprojects.io:8883', 'myuser', 'mypasswd', '{"verify":true,"CAfile":"/etc/ssl/certs/ISRG_Root_X1.pem"}'));
Disconnect from a mqtt server using previous requested handle by mqtt_connect()
.
mqtt_disconnect(handle, {timeout})
Parameter in {}
are optional an can be omit.
handle
BIGINT- Handle previously got from
mqtt_connect
. timeout
INT- Optional timeout in ms
Returns 0 if successful.
Example:
SELECT mqtt_disconnect(@client);
Publish a mqtt payload and returns its status.
Parameter in {}
are optional an can be omit.
Parameter in []
can be NULL
- in this case a default value is used.
To use optional parameters after omitting other optional parameters, use NULL
.
Possible call variants:
(1) mqtt_publish(server, [username], [password], topic, [payload] {,[qos] {,[retained] {,[timeout] {,[options]}}}})
(2) mqtt_publish(client, topic, [payload] {,[qos] {,[retained] {,[timeout] {,[options]}}}})
Variant (1) connects to MQTT, publish the payload and disconnnect after publish. This variant is provided for individual single mqtt_publish()
calls.
Because this variant may slow down when a lot of publishing should be done, you can do publish using variant (2) using a client handle from a previous mqtt_connect()
.
Variant (2) should be used for multiple mqtt_publish()
calls with a preceding mqtt_connect()
and a final mqtt_disconnect()
:
- Call
mqtt_connect()
to get a valid mqtt client connection handle - Call
mqtt_publish()
usinghandle
parameter - Repeat step 2. for your needs
- Call
mqtt_disconnect()
usinghandle
to free the client connection handle
server
String- Specifying the server to which the client will connect. It takes the form
protocol://host:port
.
Currentlyprotocol
must betcp
orssl
.
Forhost
you can specify either an IP address or a hostname.
For instance, to connect to a server running on the local machines with the default MQTT port, specifytcp://localhost:1883
orssl://localhost:8883
for an SSL connection. username
String- Username for authentification or
NULL
if unused password
String- Password for authentification or
NULL
if unused client
BIGINT- A valid handle returned from mqtt_connect() call.
topic
String- The topic to be published
payload
String- The message published for the topic
qos
INT [0..2] (default 0)- The QOS (Quality Of Service) number
retained
INT [0,1] (default 0)- Flag if message should be retained (1) or not (0)
timeout
INT- Timeout value for connecting to MQTT server (in ms)
options
String- JSON string containing additonal options or NULL if unused.
For details seemqtt_connect()
Returns 0 for success, otherwise error code from MQTTClient_connect() (see also http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html).
You can also retrieve the error code and description using mqtt_lasterror()
.
Examples:
SELECT mqtt_publish('tcp://localhost:1883', 'myuser', 'mypasswd', 'mytopic/time', NOW());
SET @client = (SELECT mqtt_connect('ssl://localhost:8883', 'myuser', 'mypasswd', '{"verify":true}')); SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/LWT', "Online", NULL, 1), NULL); SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/time', NOW())); SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/hello', "world", 1, 1)); SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/LWT', "Offline", NULL, 1), NULL); SELECT IF(@client IS NOT NULL, mqtt_disconnect(@client), NULL);
Subsribe to a mqtt topic and returns the payload if any..
Parameter in {}
are optional an can be omit.
Parameter in []
can be NULL
- in this case a default value is used.
To use optional parameters after omitting other optional parameters, use NULL
.
Possible call variants:
(1) mqtt_subscribe(server, [username], [password], topic, {,[qos] {,[timeout] {,[options]}}})
(2) mqtt_subscribe(client, topic, [payload] {,[qos] {,[timeout] {,[options]}}})
Variant (1) connects to MQTT, subscribes to a topic and disconnnect after subscribe. This variant is provided for individual single mqtt_subscribe()
calls.
Because this variant may slow down when a lot of subscribes should be done, you can do subscribes using variant (2) using a client handle from a previous mqtt_connect()
.
Vvariant (2) should be used for multiple mqtt_subscribe()
calls with a preceding mqtt_connect()
and a final mqtt_disconnect()
:
- Call
mqtt_connect()
to get a valid mqtt client connection handle - Call
mqtt_subscribe()
usinghandle
parameter - Repeat step 2. for your needs
- Call
mqtt_disconnect()
usinghandle
to free the client connection handle
server
String- Specifying the server to which the client will connect. It takes the form
protocol://host:port
.
Currentlyprotocol
must betcp
orssl
.
Forhost
you can specify either an IP address or a hostname.
For instance, to connect to a server running on the local machines with the default MQTT port, specifytcp://localhost:1883
orssl://localhost:8883
for an SSL connection. username
String- Username for authentification or
NULL
if unused password
String- Password for authentification or
NULL
if unused client
BIGINT- A valid handle returned from mqtt_connect() call.
topic
String- The topic to be published
payload
String- The message published for the topic
qos
INT [0..2] (default 0)- The QOS (Quality Of Service) number
retained
INT [0,1] (default 0)- Flag if message should be retained (1) or not (0)
timeout
INT- Timeout value for connecting to MQTT server (in ms)
options
String- JSON string containing additonal options or NULL if unused.
For details seemqtt_connect()
Returns 0 for success, otherwise error code from MQTTClient_connect() (see also http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html).
You can also retrieve the error code and description using mqtt_lasterror()
.
Examples:
SELECT mqtt_subscribe('tcp://localhost:1883', 'myuser', 'mypasswd', 'mytopic/time'); 2021-11-01 14:35:25
SET @client = (SELECT mqtt_connect('ssl://localhost:8883', 'myuser', 'mypasswd', '{"verify":true}')); SELECT IF(@client IS NOT NULL, mqtt_subscribe(@client, 'mytopic/time', NULL, 1000)); SELECT IF(@client IS NOT NULL, mqtt_subscribe(@client, 'mytopic/hello', 1, 5000)); SELECT IF(@client IS NOT NULL, mqtt_disconnect(@client), NULL);
Returns last error as JSON string
SELECT mqtt_lasterror();
Examples:
> SELECT mqtt_lasterror(); +----------------------------------------------------------------------+ | mqtt_lasterror() | +----------------------------------------------------------------------+ | {"func":"MQTTClient_connect","rc":5, "desc": "Unknown error code 5"} | +----------------------------------------------------------------------+
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_lasterror(),'$.rc')) AS rc, JSON_UNQUOTE(JSON_VALUE(mqtt_lasterror(),'$."func"')) AS 'func', JSON_UNQUOTE(JSON_VALUE(mqtt_lasterror(),'$."desc"')) AS 'desc'; +--------------------+------+----------------------+ | func | rc | desc | +--------------------+------+----------------------+ | MQTTClient_connect | 5 | Unknown error code 5 | +--------------------+------+----------------------+
Returns library info as JSON string
Examples:
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$.Name')) AS Name, JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Version"')) AS Version, JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Build"')) AS Build; +-------------------+---------+----------------------+ | Name | Version | Build | +-------------------+---------+----------------------+ | lib_mysqludf_mqtt | 1.0.0 | Nov 1 2021 09:31:40 | +-------------------+---------+----------------------+ > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Product name"')) AS Library, JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Version"')) AS `Library Version`, JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Build level"')) AS `Library Build`; +------------------------------------------------+-----------------+-------------------------------+ | Library | Library Version | Library Build | +------------------------------------------------+-----------------+-------------------------------+ | Eclipse Paho Synchronous MQTT C Client Library | 1.3.9 | Sa 23. Okt 11:09:35 CEST 2021 | +------------------------------------------------+-----------------+-------------------------------+ > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Product name"')) AS Library; +------------------------------------------------+ | Library | +------------------------------------------------+ | Eclipse Paho Synchronous MQTT C Client Library | +------------------------------------------------+ 1 row in set (0.001 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Version"')) AS `Library Version`; +-----------------+ | Library Version | +-----------------+ | 1.3.9 | +-----------------+ 1 row in set (0.001 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Build level"')) AS `Library Build`; +-------------------------------+ | Library Build | +-------------------------------+ | Sa 23. Okt 15:59:53 CEST 2021 | +-------------------------------+ 1 row in set (0.001 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL platform"')) AS `OpenSSL platform`; +------------------------+ | OpenSSL platform | +------------------------+ | platform: debian-amd64 | +------------------------+ 1 row in set (0.000 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL build timestamp"')) AS `OpenSSL build`; +----------------------------------------+ | OpenSSL build | +----------------------------------------+ | built on: Mon Aug 23 17:02:39 2021 UTC | +----------------------------------------+ 1 row in set (0.001 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL version"')) AS `OpenSSL version`; +-----------------------------+ | OpenSSL version | +-----------------------------+ | OpenSSL 1.1.1f 31 Mar 2020 | +-----------------------------+ 1 row in set (0.000 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL directory"')) AS `OpenSSL directory`; +----------------------------+ | OpenSSL directory | +----------------------------+ | OPENSSLDIR: '/usr/lib/ssl' | +----------------------------+ 1 row in set (0.000 sec) > SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL flags"')) AS `OpenSSL flags`; +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | OpenSSL flags | +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | compiler: gcc -fPIC -pthread -m64 -Wa,--noexecstack -Wall -Wa,--noexecstack -g -O2 -fdebug-prefix-map=/build/openssl-JWge0V/openssl-1.1.1f=. -fstack-protector-strong -Wformat -Werror=format-security -DOPENSSL_TLS_SECURITY_LEVEL=2 -DOPENSSL_USE_NODELETE -DL_ENDIAN -DOPENSSL_PIC -DOPENSSL_CPUID_OBJ -DOPENSSL_IA32_SSE2 -DOPENSSL_BN_ASM_MONT -DOPENSSL_BN_ASM_MONT5 -DOPENSSL_BN_ASM_GF2m -DSHA1_ASM -DSHA256_ASM -DSHA512_ASM -DKECCAK1600_ASM -DRC4_ASM -DMD5_ASM -DAESNI_ASM -DVPAES_ASM -DGHASH_ASM -DECP_NISTZ256_ASM -DX25519_ASM -DPOLY1305_ASM -DNDEBUG -Wdate-time -D_FORTIFY_SOURCE=2 | +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row in set (0.000 sec)
If you get some errors that system does not find the MQTT Paho library, make a copy from local lib dir to lib dir:
sudo cp /usr/local/lib/libpaho-mqtt3* /usr/lib/
Function like mqtt_lasterror()
returns a hex string instead of a JSON like 0x7B2266756E63223A224D5154544...
.
This is a client setting. To prevent this start your client with --binary-as-hex=0
If this is not possible, convert the hex-string into a readable one using CONVERT():
SELECT CONVERT(mqtt_info() USING utf8);