MQTT

MQTT pubsub组件的详细文档

组件格式

要配置MQTT pub/sub,您需要创建一个类型为pubsub.mqtt的组件。请参阅pub/sub broker组件文件以了解ConsumerID的自动生成方式。阅读操作指南:发布和订阅指南以了解如何创建和应用pub/sub配置。

apiVersion: dapr.io/v1alpha1 kind: Component metadata:  name: mqtt-pubsub spec:  type: pubsub.mqtt  version: v1  metadata:  - name: url  value: "tcp://[username][:password]@host.domain[:port]"  - name: qos  value: 1  - name: retain  value: "false"  - name: cleanSession  value: "false"  - name: consumerID  value: "channel1" 

规格元数据字段

字段必需详情示例
urlYMQTT broker的地址。可以使用secretKeyRef来引用密钥。
对于非TLS通信,使用**tcp://** URI方案。
对于TLS通信,使用**ssl://** URI方案。
"tcp://[username][:password]@host.domain[:port]"
consumerIDN用于连接到MQTT broker的消费者连接的客户端ID。默认为Dapr应用ID。
注意:如果未设置producerID,则在此值后附加-consumer用于消费者连接
可以设置为字符串值(如上例中的"channel1")或字符串格式值(如"{podName}"等)。查看可以在组件元数据中使用的所有模板标签。
producerIDN用于连接到MQTT broker的生产者连接的客户端ID。默认为{consumerID}-producer"myMqttProducerApp"
qosN表示消息的服务质量级别(QoS)(更多信息)。默认为10, 1, 2
retainN定义broker是否将消息保存为指定主题的最后已知良好值。默认为"false""true", "false"
cleanSessionN如果为"true",则在连接消息中设置clean_session标志到MQTT broker(更多信息)。默认为"false""true", "false"
caCert使用TLS时必需用于验证服务器TLS证书的证书颁发机构(CA)证书,格式为PEM。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert使用TLS时必需TLS客户端证书,格式为PEM。必须与clientKey一起使用。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey使用TLS时必需TLS客户端密钥,格式为PEM。必须与clientCert一起使用。可以使用secretKeyRef来引用密钥。"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"

启用消息传递重试

MQTT pub/sub组件不支持内置的重试策略。这意味着sidecar只会向服务发送一次消息。如果服务标记消息为未处理,则消息不会被确认回broker。只有当broker重新发送消息时,才会重试。

要使Dapr使用更复杂的重试策略,可以将重试弹性策略应用于MQTT pub/sub组件。

两种重试方式之间有一个关键区别:

  1. 未确认消息的重新传递完全依赖于broker。Dapr不保证这一点。一些broker如emqxvernemq等支持它,但它不是MQTT3规范的一部分。

  2. 使用重试弹性策略使得同一个Dapr sidecar重试重新传递消息。因此是同一个Dapr sidecar和同一个应用接收相同的消息。

使用TLS进行通信

要配置使用TLS进行通信,请确保MQTT broker(例如,mosquitto)配置为支持证书,并在组件配置中提供caCertclientCertclientKey元数据。例如:

apiVersion: dapr.io/v1alpha1 kind: Component metadata:  name: mqtt-pubsub spec:  type: pubsub.mqtt  version: v1  metadata:  - name: url  value: "ssl://host.domain[:port]"  - name: qos  value: 1  - name: retain  value: "false"  - name: cleanSession  value: "false"  - name: caCert  value: ${{ myLoadedCACert }}  - name: clientCert  value: ${{ myLoadedClientCert }}  - name: clientKey  secretKeyRef:  name: myMqttClientKey  key: myMqttClientKey auth:  secretStore: <SECRET_STORE_NAME> 

注意,虽然caCertclientCert值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。

消费共享主题

在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run时使用不同的应用ID即可使它们从同一个共享主题中消费。然而,在Kubernetes上,应用pod的多个实例将共享相同的应用ID,禁止所有实例消费同一个主题。为了解决这个问题,配置组件的consumerID元数据为{uuid}标签,这将在启动时为每个实例提供一个随机生成的consumerID值。例如:

apiVersion: dapr.io/v1alpha1 kind: Component metadata:  name: mqtt-pubsub spec:  type: pubsub.mqtt  version: v1  metadata:  - name: consumerID  value: "{uuid}"  - name: url  value: "tcp://admin:public@localhost:1883"  - name: qos  value: 1  - name: retain  value: "false"  - name: cleanSession  value: "true" 

注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此我们也将cleanSession设置为true。

创建MQTT broker

您可以使用Docker本地运行MQTT broker:

docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6 

然后您可以使用客户端端口与服务器交互:mqtt://localhost:1883

您可以在kubernetes中使用以下yaml运行MQTT broker:

apiVersion: apps/v1 kind: Deployment metadata:  name: mqtt-broker  labels:  app-name: mqtt-broker spec:  replicas: 1  selector:  matchLabels:  app-name: mqtt-broker  template:  metadata:  labels:  app-name: mqtt-broker  spec:  containers:  - name: mqtt  image: eclipse-mosquitto:1.6  imagePullPolicy: IfNotPresent  ports:  - name: default  containerPort: 1883  protocol: TCP  - name: websocket  containerPort: 9001  protocol: TCP --- apiVersion: v1 kind: Service metadata:  name: mqtt-broker  labels:  app-name: mqtt-broker spec:  type: ClusterIP  selector:  app-name: mqtt-broker  ports:  - port: 1883  targetPort: default  name: default  protocol: TCP  - port: 9001  targetPort: websocket  name: websocket  protocol: TCP 

然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883

相关链接