MQTT is a lightweight publish/subscribe messaging protocol designed for M2M (machine to machine) telemetry in low bandwidth environments. Fastapi-mqtt is the client for working with MQTT.
For more information about MQTT, please refer to: MQTT
Fastapi-mqtt wraps the gmqtt module, a Python async MQTT client implementation. The module supports the MQTT version 5.0 protocol.
Documentation: FastApi-MQTT
The key features are:
- MQTT specification available with the help of decorator methods using callbacks:
  - on_connect()
  - on_disconnect()
  - on_subscribe()
  - on_message()
  - subscribe(topic)
- MQTT Settings available with a pydantic class
- Authentication to broker with credentials
- Unsubscribe from certain topics and publish to certain topics
pip install fastapi-mqtt
from contextlib import asynccontextmanager from typing import Any from fastapi import FastAPI from gmqtt import Client as MQTTClient from fastapi_mqtt import FastMQTT, MQTTConfig mqtt_config = MQTTConfig() fast_mqtt = FastMQTT(config=mqtt_config) @asynccontextmanager async def _lifespan(_app: FastAPI): await fast_mqtt.mqtt_startup() yield await fast_mqtt.mqtt_shutdown() app = FastAPI(lifespan=_lifespan) @fast_mqtt.on_connect() def connect(client: MQTTClient, flags: int, rc: int, properties: Any): client.subscribe("/mqtt") # subscribing mqtt topic print("Connected: ", client, flags, rc, properties) @fast_mqtt.subscribe("mqtt/+/temperature", "mqtt/+/humidity", qos=1) async def home_message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any): print("temperature/humidity: ", topic, payload.decode(), qos, properties) @fast_mqtt.on_message() async def message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any): print("Received message: ", topic, payload.decode(), qos, properties) @fast_mqtt.subscribe("my/mqtt/topic/#", qos=2) async def message_to_topic_with_high_qos( client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any ): print( "Received message to specific topic and QoS=2: ", topic, payload.decode(), qos, properties ) @fast_mqtt.on_disconnect() def disconnect(client: MQTTClient, packet, exc=None): print("Disconnected") @fast_mqtt.on_subscribe() def subscribe(client: MQTTClient, mid: int, qos: int, properties: Any): print("subscribed", client, mid, qos, properties) @app.get("/test") async def func(): fast_mqtt.publish("/mqtt", "Hello from Fastapi") # publishing mqtt topic return {"result": True, "message": "Published"}
Publish method:
async def func(): fast_mqtt.publish("/mqtt", "Hello from Fastapi") # publishing mqtt topic return {"result": True, "message": "Published"}
Subscribe method:
@fast_mqtt.on_connect() def connect(client, flags, rc, properties): client.subscribe("/mqtt") # subscribing mqtt topic print("Connected: ", client, flags, rc, properties)
Changing connection params:
mqtt_config = MQTTConfig( host="mqtt.mosquito.org", port=1883, keepalive=60, username="username", password="strong_password", ) fast_mqtt = FastMQTT(config=mqtt_config)
- Clone the repository and install it with poetry.
- Run tests with pytest, using an external MQTT broker to connect (defaults to 'test.mosquitto.org').
- Explore the fastapi app examples and run them with uvicorn.
# (opc) Run a local mosquitto MQTT broker with docker docker run -d --name mosquitto -p 9001:9001 -p 1883:1883 eclipse-mosquitto:1.6.15 # Set host for test broker when running pytest TEST_BROKER_HOST=localhost pytest # Run the example apps against local broker, with uvicorn TEST_BROKER_HOST=localhost uvicorn examples.app:app --port 8000 --reload TEST_BROKER_HOST=localhost uvicorn examples.ws_app.app:application --port 8000 --reload
Feel free to open an issue and send a pull request.
Thanks To Contributors. Contributions of any kind are welcome!
Before you start, please read CONTRIBUTING