MQTT
组件格式
要配置MQTT pub/sub,您需要创建一个类型为pubsub.mqtt的组件。请参阅pub/sub broker组件文件以了解ConsumerID的自动生成方式。阅读操作指南:发布和订阅指南以了解如何创建和应用pub/sub配置。
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: mqtt-pubsub spec: type: pubsub.mqtt version: v1 metadata: - name: url value: "tcp://[username][:password]@host.domain[:port]" - name: qos value: 1 - name: retain value: "false" - name: cleanSession value: "false" - name: consumerID value: "channel1" 警告
上述示例中使用了明文字符串作为密钥。建议使用密钥存储来保护密钥,详情请参阅这里。规格元数据字段
| 字段 | 必需 | 详情 | 示例 |
|---|---|---|---|
| url | Y | MQTT broker的地址。可以使用secretKeyRef来引用密钥。对于非TLS通信,使用** tcp://** URI方案。对于TLS通信,使用** ssl://** URI方案。 | "tcp://[username][:password]@host.domain[:port]" |
| consumerID | N | 用于连接到MQTT broker的消费者连接的客户端ID。默认为Dapr应用ID。 注意:如果未设置 producerID,则在此值后附加-consumer用于消费者连接 | 可以设置为字符串值(如上例中的"channel1")或字符串格式值(如"{podName}"等)。查看可以在组件元数据中使用的所有模板标签。 |
| producerID | N | 用于连接到MQTT broker的生产者连接的客户端ID。默认为{consumerID}-producer。 | "myMqttProducerApp" |
| qos | N | 表示消息的服务质量级别(QoS)(更多信息)。默认为1。 | 0, 1, 2 |
| retain | N | 定义broker是否将消息保存为指定主题的最后已知良好值。默认为"false"。 | "true", "false" |
| cleanSession | N | 如果为"true",则在连接消息中设置clean_session标志到MQTT broker(更多信息)。默认为"false"。 | "true", "false" |
| caCert | 使用TLS时必需 | 用于验证服务器TLS证书的证书颁发机构(CA)证书,格式为PEM。 | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientCert | 使用TLS时必需 | TLS客户端证书,格式为PEM。必须与clientKey一起使用。 | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientKey | 使用TLS时必需 | TLS客户端密钥,格式为PEM。必须与clientCert一起使用。可以使用secretKeyRef来引用密钥。 | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
启用消息传递重试
MQTT pub/sub组件不支持内置的重试策略。这意味着sidecar只会向服务发送一次消息。如果服务标记消息为未处理,则消息不会被确认回broker。只有当broker重新发送消息时,才会重试。
要使Dapr使用更复杂的重试策略,可以将重试弹性策略应用于MQTT pub/sub组件。
两种重试方式之间有一个关键区别:
未确认消息的重新传递完全依赖于broker。Dapr不保证这一点。一些broker如emqx、vernemq等支持它,但它不是MQTT3规范的一部分。
使用重试弹性策略使得同一个Dapr sidecar重试重新传递消息。因此是同一个Dapr sidecar和同一个应用接收相同的消息。
使用TLS进行通信
要配置使用TLS进行通信,请确保MQTT broker(例如,mosquitto)配置为支持证书,并在组件配置中提供caCert、clientCert、clientKey元数据。例如:
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: mqtt-pubsub spec: type: pubsub.mqtt version: v1 metadata: - name: url value: "ssl://host.domain[:port]" - name: qos value: 1 - name: retain value: "false" - name: cleanSession value: "false" - name: caCert value: ${{ myLoadedCACert }} - name: clientCert value: ${{ myLoadedClientCert }} - name: clientKey secretKeyRef: name: myMqttClientKey key: myMqttClientKey auth: secretStore: <SECRET_STORE_NAME> 注意,虽然caCert和clientCert值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。
消费共享主题
在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run时使用不同的应用ID即可使它们从同一个共享主题中消费。然而,在Kubernetes上,应用pod的多个实例将共享相同的应用ID,禁止所有实例消费同一个主题。为了解决这个问题,配置组件的consumerID元数据为{uuid}标签,这将在启动时为每个实例提供一个随机生成的consumerID值。例如:
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: mqtt-pubsub spec: type: pubsub.mqtt version: v1 metadata: - name: consumerID value: "{uuid}" - name: url value: "tcp://admin:public@localhost:1883" - name: qos value: 1 - name: retain value: "false" - name: cleanSession value: "true" 警告
上述示例中使用了明文字符串作为密钥。建议使用密钥存储来保护密钥,详情请参阅这里。注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此我们也将cleanSession设置为true。
创建MQTT broker
您可以使用Docker本地运行MQTT broker:
docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6 然后您可以使用客户端端口与服务器交互:mqtt://localhost:1883
您可以在kubernetes中使用以下yaml运行MQTT broker:
apiVersion: apps/v1 kind: Deployment metadata: name: mqtt-broker labels: app-name: mqtt-broker spec: replicas: 1 selector: matchLabels: app-name: mqtt-broker template: metadata: labels: app-name: mqtt-broker spec: containers: - name: mqtt image: eclipse-mosquitto:1.6 imagePullPolicy: IfNotPresent ports: - name: default containerPort: 1883 protocol: TCP - name: websocket containerPort: 9001 protocol: TCP --- apiVersion: v1 kind: Service metadata: name: mqtt-broker labels: app-name: mqtt-broker spec: type: ClusterIP selector: app-name: mqtt-broker ports: - port: 1883 targetPort: default name: default protocol: TCP - port: 9001 targetPort: websocket name: websocket protocol: TCP 然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883
相关链接
- Dapr组件的基本架构
- 阅读本指南以获取配置pub/sub组件的说明
- Pub/Sub构建块