umqtt.default

umqtt.default 重写了 subscribe() 方法并支持 ca 文件。

UiFlow2 应用示例

MQTT 基础

在 UiFlow2 中打开 cores3_mqtt_basic_example.m5f2 项目。

此示例演示如何连接到 MQTT 代理服务器。

UiFlow2 代码块:

在 UiFlow2 中打开 |cores3_mqtt_basic_example.m5f2| 项目。

示例输出:

None

MQTT SSL 连接

在 UiFlow2 中打开 cores3_mqtt_over_ssl_example.m5f2 项目。

此示例演示如何通过 SSL 连接到 MQTT 代理服务器。

UiFlow2 代码块:

cores3_mqtt_over_ssl_example.png

示例输出:

None

MicroPython 应用示例

MQTT 基础

此示例演示如何连接到 MQTT 代理服务器。

MicroPython 代码块:

# SPDX-FileCopyrightText: 2025 M5Stack Technology CO LTD
#
# SPDX-License-Identifier: MIT

import os, sys, io
import M5
from M5 import *
import m5ui
import lvgl as lv
from umqtt import MQTTClient


page0 = None
label0 = None
button0 = None
textarea0 = None
label1 = None
textarea1 = None
label2 = None
keyboard0 = None
mqtt_client = None


def button0_short_clicked_event(event_struct):
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    mqtt_client.publish("testtopic/test", textarea0.get_text(), qos=0)


def mqtt_testtopic_test_event(data):
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    textarea1.set_text(str(data[1]))


def textarea0_focused_event(event_struct):
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    keyboard0.set_flag(lv.obj.FLAG.HIDDEN, False)


def textarea0_defocused_event(event_struct):
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    keyboard0.set_flag(lv.obj.FLAG.HIDDEN, True)


def button0_event_handler(event_struct):
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    event = event_struct.code
    if event == lv.EVENT.SHORT_CLICKED and True:
        button0_short_clicked_event(event_struct)
    return


def textarea0_event_handler(event_struct):
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    event = event_struct.code
    if event == lv.EVENT.FOCUSED and True:
        textarea0_focused_event(event_struct)
    if event == lv.EVENT.DEFOCUSED and True:
        textarea0_defocused_event(event_struct)
    return


def setup():
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client

    M5.begin()
    Widgets.setRotation(1)
    m5ui.init()
    page0 = m5ui.M5Page(bg_c=0xFFFFFF)
    textarea0 = m5ui.M5TextArea(
        text="textarea0",
        placeholder="Placeholder...",
        x=52,
        y=32,
        w=150,
        h=70,
        font=lv.font_montserrat_14,
        bg_c=0xFFFFFF,
        border_c=0xE0E0E0,
        text_c=0x212121,
        parent=page0,
    )
    textarea1 = m5ui.M5TextArea(
        text="textarea1",
        placeholder="Placeholder...",
        x=10,
        y=132,
        w=150,
        h=70,
        font=lv.font_montserrat_14,
        bg_c=0xFFFFFF,
        border_c=0xE0E0E0,
        text_c=0x212121,
        parent=page0,
    )
    label0 = m5ui.M5Label(
        "pubish topic: testtopic/test",
        x=10,
        y=10,
        text_c=0x000000,
        bg_c=0xFFFFFF,
        bg_opa=0,
        font=lv.font_montserrat_14,
        parent=page0,
    )
    button0 = m5ui.M5Button(
        text="publish",
        x=217,
        y=32,
        bg_c=0x2196F3,
        text_c=0xFFFFFF,
        font=lv.font_montserrat_14,
        parent=page0,
    )
    label1 = m5ui.M5Label(
        "subscribe topic: testtopic/test",
        x=11,
        y=110,
        text_c=0x000000,
        bg_c=0xFFFFFF,
        bg_opa=0,
        font=lv.font_montserrat_14,
        parent=page0,
    )
    label2 = m5ui.M5Label(
        "msg:",
        x=10,
        y=32,
        text_c=0x000000,
        bg_c=0xFFFFFF,
        bg_opa=0,
        font=lv.font_montserrat_14,
        parent=page0,
    )
    keyboard0 = m5ui.M5Keyboard(
        x=0,
        y=120,
        w=320,
        h=120,
        mode=lv.keyboard.MODE.TEXT_LOWER,
        target_textarea=textarea0,
        parent=page0,
    )

    button0.add_event_cb(button0_event_handler, lv.EVENT.ALL, None)
    textarea0.add_event_cb(textarea0_event_handler, lv.EVENT.ALL, None)

    page0.set_flag(lv.obj.FLAG.SCROLLABLE, True)
    keyboard0.set_flag(lv.obj.FLAG.HIDDEN, True)
    page0.screen_load()
    mqtt_client = MQTTClient(
        "uiflow", "broker.emqx.io", port=1883, user="test", password="test", keepalive=0
    )
    mqtt_client.connect(clean_session=True)
    mqtt_client.subscribe("testtopic/test", mqtt_testtopic_test_event, qos=0)


def loop():
    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
    M5.update()
    mqtt_client.check_msg()


if __name__ == "__main__":
    try:
        setup()
        while True:
            loop()
    except (Exception, KeyboardInterrupt) as e:
        try:
            m5ui.deinit()
            from utility import print_error_msg

            print_error_msg(e)
        except ImportError:
            print("please update to latest firmware")

示例输出:

None

MQTT SSL 连接

此示例演示如何通过 SSL 连接到 MQTT 代理服务器。

MicroPython 代码块:

  1# SPDX-FileCopyrightText: 2024 M5Stack Technology CO LTD
  2#
  3# SPDX-License-Identifier: MIT
  4
  5import os, sys, io
  6import M5
  7from M5 import *
  8import m5ui
  9import lvgl as lv
 10from umqtt import MQTTClient
 11
 12
 13page0 = None
 14label0 = None
 15button0 = None
 16textarea0 = None
 17label1 = None
 18textarea1 = None
 19label2 = None
 20keyboard0 = None
 21mqtt_client = None
 22
 23
 24def button0_short_clicked_event(event_struct):
 25    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 26    mqtt_client.publish("testtopic/test", textarea0.get_text(), qos=0)
 27
 28
 29def mqtt_testtopic_test_event(data):
 30    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 31    textarea1.set_text(str(data[1]))
 32
 33
 34def textarea0_focused_event(event_struct):
 35    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 36    keyboard0.set_flag(lv.obj.FLAG.HIDDEN, False)
 37
 38
 39def textarea0_defocused_event(event_struct):
 40    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 41    keyboard0.set_flag(lv.obj.FLAG.HIDDEN, True)
 42
 43
 44def button0_event_handler(event_struct):
 45    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 46    event = event_struct.code
 47    if event == lv.EVENT.SHORT_CLICKED and True:
 48        button0_short_clicked_event(event_struct)
 49    return
 50
 51
 52def textarea0_event_handler(event_struct):
 53    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 54    event = event_struct.code
 55    if event == lv.EVENT.FOCUSED and True:
 56        textarea0_focused_event(event_struct)
 57    if event == lv.EVENT.DEFOCUSED and True:
 58        textarea0_defocused_event(event_struct)
 59    return
 60
 61
 62def setup():
 63    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
 64
 65    M5.begin()
 66    Widgets.setRotation(1)
 67    m5ui.init()
 68    page0 = m5ui.M5Page(bg_c=0xFFFFFF)
 69    textarea0 = m5ui.M5TextArea(
 70        text="textarea0",
 71        placeholder="Placeholder...",
 72        x=52,
 73        y=32,
 74        w=150,
 75        h=70,
 76        font=lv.font_montserrat_14,
 77        bg_c=0xFFFFFF,
 78        border_c=0xE0E0E0,
 79        text_c=0x212121,
 80        parent=page0,
 81    )
 82    textarea1 = m5ui.M5TextArea(
 83        text="textarea1",
 84        placeholder="Placeholder...",
 85        x=10,
 86        y=132,
 87        w=150,
 88        h=70,
 89        font=lv.font_montserrat_14,
 90        bg_c=0xFFFFFF,
 91        border_c=0xE0E0E0,
 92        text_c=0x212121,
 93        parent=page0,
 94    )
 95    label0 = m5ui.M5Label(
 96        "pubish topic: testtopic/test",
 97        x=10,
 98        y=10,
 99        text_c=0x000000,
100        bg_c=0xFFFFFF,
101        bg_opa=0,
102        font=lv.font_montserrat_14,
103        parent=page0,
104    )
105    button0 = m5ui.M5Button(
106        text="publish",
107        x=217,
108        y=32,
109        bg_c=0x2196F3,
110        text_c=0xFFFFFF,
111        font=lv.font_montserrat_14,
112        parent=page0,
113    )
114    label1 = m5ui.M5Label(
115        "subscribe topic: testtopic/test",
116        x=11,
117        y=110,
118        text_c=0x000000,
119        bg_c=0xFFFFFF,
120        bg_opa=0,
121        font=lv.font_montserrat_14,
122        parent=page0,
123    )
124    label2 = m5ui.M5Label(
125        "msg:",
126        x=10,
127        y=32,
128        text_c=0x000000,
129        bg_c=0xFFFFFF,
130        bg_opa=0,
131        font=lv.font_montserrat_14,
132        parent=page0,
133    )
134    keyboard0 = m5ui.M5Keyboard(
135        x=0,
136        y=120,
137        w=320,
138        h=120,
139        mode=lv.keyboard.MODE.TEXT_LOWER,
140        target_textarea=textarea0,
141        parent=page0,
142    )
143
144    button0.add_event_cb(button0_event_handler, lv.EVENT.ALL, None)
145    textarea0.add_event_cb(textarea0_event_handler, lv.EVENT.ALL, None)
146
147    page0.set_flag(lv.obj.FLAG.SCROLLABLE, True)
148    keyboard0.set_flag(lv.obj.FLAG.HIDDEN, True)
149    page0.screen_load()
150    mqtt_client = MQTTClient(
151        "uiflow",
152        "y90166f4.ala.cn-hangzhou.emqxsl.cn",
153        port=8883,
154        user="test",
155        password="test",
156        keepalive=0,
157        ssl=True,
158        ssl_params={
159            "cafile": "/flash/certificate/emqxsl-ca.crt",
160            "server_hostname": "y90166f4.ala.cn-hangzhou.emqxsl.cn",
161        },
162    )
163    mqtt_client.connect(clean_session=True)
164    mqtt_client.subscribe("testtopic/test", mqtt_testtopic_test_event, qos=0)
165
166
167def loop():
168    global page0, label0, button0, textarea0, label1, textarea1, label2, keyboard0, mqtt_client
169    M5.update()
170    mqtt_client.check_msg()
171
172
173if __name__ == "__main__":
174    try:
175        setup()
176        while True:
177            loop()
178    except (Exception, KeyboardInterrupt) as e:
179        try:
180            m5ui.deinit()
181            from utility import print_error_msg
182
183            print_error_msg(e)
184        except ImportError:
185            print("please update to latest firmware")

示例输出:

None

API参考

MQTTClient

class umqtt.MQTTClient(client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={})

创建 MQTTClient 对象。

参数:
  • client_id (str) – 连接到 MQTT 代理服务器时使用的唯一客户端 ID 字符串。

  • server (str) – 远程 MQTT 代理服务器的主机名或 IP 地址。

  • port (int) – 要连接的服务器主机的网络端口。

  • user (str or None) – 用于代理服务器身份验证的用户名。

  • password (str or None) – 用于代理服务器身份验证的密码。

  • keepalive (int) – 与代理服务器通信之间允许的最长时间(以秒为单位)。如果没有交换其他消息,这将控制客户端向代理服务器发送 ping 消息的速率。

  • ssl (bool) – 是否使用 SSL。

  • ssl_params (dict) – 启动 SSL 连接所需的一些参数。

返回:

MQTTClient 对象

返回类型:

MQTTClient

UiFlow2 代码块:

init.png

init_ssl.png

MicroPython 代码块:

from umqtt import MQTTClient

mqtt_client = MQTTClient(
    'uf2',
    'y90166f4.ala.cn-hangzhou.emqxsl.cn',
    port=8883,
    user='test',
    password='test',
    keepalive=0,
    ssl=True,
    ssl_params={
        "server_hostname":'y90166f4.ala.cn-hangzhou.emqxsl.cn',
        "key": "/flash/certificate/emqxsl-ca.crt", # 私钥文件,双向认证的时候使用
        "cert": "/flash/certificate/emqxsl-ca.crt", # 客户端证书文件,双向认证的时候使用
        "cafile": "/flash/certificate/emqxsl-ca.crt", # CA证书,单向认证的时候使用
    }
)
connect(clean_session=True) bool

连接到服务器。如果此连接使用存储在服务器上的持久会话,则返回 True(如果使用 clean_session=True 参数(默认),则始终为 False)。

UiFlow2 代码块:

connect.png

MicroPython 代码块:

mqtt_client.connect(clean_session=True)
disconnect() None

断开与服务器的连接,释放资源。

UiFlow2 代码块:

disconnect.png

MicroPython 代码块:

mqtt_client.disconnect()
reconnect() None

断开与服务器的连接,释放资源。

UiFlow2 代码块:

reconnect.png

MicroPython 代码块:

mqtt_client.reconnect()
ping() None

Ping 服务器(响应由 wait_msg() 自动处理)。

MicroPython 代码块:

mqtt_client.ping()
publish(topic, msg, retain=False, qos=0) None

发布消息。

参数:
  • topic (str or bytes or bytearray) – 应发布消息的主题。

  • msg (str or bytes or bytearray) – 实际要发送的消息。

  • retain (bool) – 如果设置为 True ,则遗嘱消息将被设置为该主题的“最后遗嘱”/保留消息。

  • qos (int) – 使用的服务质量级别

UiFlow2 代码块:

publish.png

MicroPython 代码块:

mqtt_client.publish(topic, msg, retain=False, qos=0)
subscribe(topic, handler, qos=0) None

订阅主题。

参数:
  • topic (str or bytes or bytearray) – 指定要订阅的订阅主题的字符串。

  • handler (function) – 当收到有关客户端订阅的主题的消息并且该消息与现有主题过滤器回调匹配时调用。

  • qos (int) – 订阅所需的服务质量级别。 默认为 0。

UiFlow2 代码块:

subscribe.png

显示已收到消息的处理程序:

def on_sub_cb(data):
    print("topic:", data[0])
    print("msg:", data[1])

在 UiFlow2 上,可以通过 get_topic.pngget_msg.png 获取当前处理器的 主题消息

unsubscribe(topic) None

取消订阅主题。

参数:

topic (str or bytes or bytearray) – 指定要取消订阅的主题的字符串。

UiFlow2 代码块:

unsubscribe.png

MicroPython 代码块:

mqtt_client.unsubscribe(topic)
set_last_will(topic, msg, retain=False, qos=0) None

重要

应该在 connect() 之前调用。

设置 MQTT 遗嘱消息。

参数:
  • topic (str or bytes or bytearray) – 将发布意愿消息的主题。

  • msg (str or bytes or bytearray) – 作为遗嘱发送的消息。如果未提供或设置为 None,则将使用零长度消息作为遗嘱。

  • retain (bool) – 如果设置为 True ,则遗嘱消息将被设置为该主题的“最后遗嘱”/保留消息。

  • qos (int) – 用于意愿的服务质量水平。

UiFlow2 代码块:

set_last_will.png

MicroPython 代码块:

mqtt_client.set_last_will()
wait_msg() None

重要

wait_msg()check_msg() 是主循环迭代方法,有阻塞和非阻塞版本。如果您没有任何其他前台任务要执行(即您的应用程序仅对订阅的 MQTT 消息做出反应),则应在循环中定期调用 wait_msg();如果您还需要处理其他前台任务,则使用 check_msg()

请注意,如果您只发布消息而从不订阅消息,则不需要调用 wait_msg() / check_msg()

等待服务器消息。

MicroPython 代码块:

mqtt_client.wait_msg()
check_msg(attempts=2) None

检查是否有来自服务器的待处理消息。如果有,则与 wait_msg() 处理方式相同;如果没有,则立即返回。

UiFlow2 代码块:

wait_msg.png

MicroPython 代码块:

mqtt_client.check_msg()