EasyUDPServer

EasyUDPServer 和 EasyUDPClientSocket 提供了一种简单的方法来创建 UDP 服务器并以事件驱动的方式管理客户端连接。

UiFlow2 示例

简单服务器

在 UiFlow2 中打开 udp_server_core2_example.m5f2 项目。

本示例创建一个 UDP 服务器,监听 8000 端口并将接收到的数据显示在屏幕上。

UiFlow2 代码块:

udp_server_core2_example.png

示例输出:

None

MicroPython 示例

简单服务器

本示例创建一个 UDP 服务器,监听 8000 端口并将接收到的数据显示在屏幕上。

MicroPython 代码块:

 1# SPDX-FileCopyrightText: 2026 M5Stack Technology CO LTD
 2#
 3# SPDX-License-Identifier: MIT
 4
 5import os, sys, io
 6import M5
 7from M5 import *
 8from easysocket import EasyUDPServer
 9import network
10import time
11
12
13title0 = None
14label2 = None
15label0 = None
16label1 = None
17wlan_sta = None
18udps_0 = None
19
20
21received_data = None
22client_address_port = None
23
24
25def udps_0_received_event(args):
26    global title0, label2, label0, label1, wlan_sta, udps_0, received_data, client_address_port
27    server, client_address_port, received_data = args
28    label1.setText(str("Receive Msg:"))
29    label2.setText(str((str(received_data) + str(client_address_port))))
30    print((str(received_data) + str(client_address_port)))
31    udps_0.sendto(received_data, client_address_port)
32
33
34def setup():
35    global title0, label2, label0, label1, wlan_sta, udps_0, received_data, client_address_port
36
37    M5.begin()
38    Widgets.setRotation(1)
39    Widgets.fillScreen(0x222222)
40    title0 = Widgets.Title(
41        "UDPServer Core2 Example", 3, 0xFFFFFF, 0x0000FF, Widgets.FONTS.DejaVu18
42    )
43    label2 = Widgets.Label("", 1, 146, 1.0, 0xFFFFFF, 0x222222, Widgets.FONTS.DejaVu18)
44    label0 = Widgets.Label("Local IP:", 2, 68, 1.0, 0xFFFFFF, 0x222222, Widgets.FONTS.DejaVu18)
45    label1 = Widgets.Label("Receive Msg:", 2, 109, 1.0, 0xFFFFFF, 0x222222, Widgets.FONTS.DejaVu18)
46
47    wlan_sta = network.WLAN(network.STA_IF)
48    print("wait network connecting")
49    while not (wlan_sta.isconnected()):
50        print(".")
51        time.sleep(1)
52    print("connect success")
53    print(wlan_sta.ifconfig()[0])
54    label0.setText(str((str("Local IP:") + str((wlan_sta.ifconfig()[0])))))
55    udps_0 = EasyUDPServer(
56        host="0.0.0.0",
57        port=8000,
58        mode=EasyUDPServer.MODE_UNICAST,
59        multicast_group=None,
60        verbose=False,
61    )
62    udps_0.on_data_received(udps_0_received_event)
63
64
65def loop():
66    global title0, label2, label0, label1, wlan_sta, udps_0, received_data, client_address_port
67    M5.update()
68    udps_0.check_event(timeout=-1)
69
70
71if __name__ == "__main__":
72    try:
73        setup()
74        while True:
75            loop()
76    except (Exception, KeyboardInterrupt) as e:
77        try:
78            from utility import print_error_msg
79
80            print_error_msg(e)
81        except ImportError:
82            print("please update to latest firmware")

示例输出:

None

API参考

class software.easysocket.udp_server.EasyUDPServer(host='0.0.0.0', port=8000, mode=0, multicast_group=None, verbose=False)

基类:object

创建一个 EasyUDPServer 对象。

参数:
  • host (str) – 要绑定的主机地址。

  • port (int) – 要绑定的端口号。

  • mode (int) – UDP模式(单播、广播、组播)。默认为单播。

  • multicast_group (str) – 多播组地址(如果模式为多播,则为必填项)。

  • verbose (bool) – 是否打印详细输出。

备注

初始化后自动启动服务。

备注

这个类是非阻塞式的,并且是事件驱动的。你需要定期调用 check_event() 来处理事件。

UiFlow2 代码块:

init.png

MicroPython 代码块:

from easysocket import EasyUDPServer

udp_server = EasyUDPServer(host="0.0.0.0", port=8080)
start()

启动服务器。

MicroPython 代码块:

udp_server.start()
stop()

停止服务器。

MicroPython 代码块:

udp_server.stop()
on_data_received(callback)

设置数据接收事件的回调函数。

参数:

callback – 回调函数。

UiFlow2 代码块:

received_callback.png

MicroPython 代码块:

def on_data_received_cb(args):
    client, address, data = args
    print("Received:", data, "from", address)

udp_server.on_data_received(on_data_received_cb)
check_event(timeout=-1)

检查事件。

参数:

timeout (int) – 超时时间,单位为毫秒。默认值为 -1(无超时)。

UiFlow2 代码块:

check_event.png

MicroPython 代码块:

udp_server.check_event()
sendto(data, address)

向远程服务器发送数据。

参数:
  • data (bytes) – 要发送的数据。

  • address (tuple) – 要发送数据的(主机,端口)元组。

返回:

已发送的字节数。

UiFlow2 代码块:

sendto.png

MicroPython 代码块:

udp_server.sendto(b"Hello", ("192.168.1.100", 8080))