EasyTCPServer

EasyTCPServer 和 EasyTCPClientSocket 提供了一种简单的方法来创建 TCP 服务器并以事件驱动的方式管理客户端连接。

UIFLOW2 应用示例

简易服务器

在 UiFlow2 中打开 cores3_simple_server_example.m5f2 项目。

此示例创建一个 TCP 服务器,监听端口 8000 并在屏幕上显示接收到的数据。

UiFlow2 代码块:

cores3_simple_server_example.png

示例输出:

None

MicroPython 应用示例

简易服务器

此示例创建一个 TCP 服务器,监听端口 8000 并在屏幕上显示接收到的数据。

MicroPython 代码块:

  1# SPDX-FileCopyrightText: 2025 M5Stack Technology CO LTD
  2#
  3# SPDX-License-Identifier: MIT
  4
  5import os, sys, io
  6import M5
  7from M5 import *
  8import m5ui
  9import lvgl as lv
 10from easysocket import EasyTCPServer
 11import network
 12
 13
 14page0 = None
 15button0 = None
 16label0 = None
 17textarea0 = None
 18label1 = None
 19textarea1 = None
 20textarea2 = None
 21label3 = None
 22wlan_sta = None
 23tcps_0 = None
 24
 25
 26client = None
 27received_data = None
 28state = None
 29
 30
 31def tcps_0_connect_event(client_sock):
 32    global \
 33        page0, \
 34        button0, \
 35        label0, \
 36        textarea0, \
 37        label1, \
 38        textarea1, \
 39        textarea2, \
 40        label3, \
 41        wlan_sta, \
 42        tcps_0, \
 43        client, \
 44        received_data, \
 45        state
 46    client = client_sock
 47    textarea2.add_text(str("new clinet: "))
 48    textarea2.add_text(str(client.getpeername()[0]))
 49    textarea2.add_text(str("\n"))
 50
 51
 52def tcps_0_received_event(args):
 53    global \
 54        page0, \
 55        button0, \
 56        label0, \
 57        textarea0, \
 58        label1, \
 59        textarea1, \
 60        textarea2, \
 61        label3, \
 62        wlan_sta, \
 63        tcps_0, \
 64        client, \
 65        received_data, \
 66        state
 67    client, received_data = args
 68    textarea2.add_text(str(client.getpeername()[0]))
 69    textarea2.add_text(str(":"))
 70    textarea2.add_text(str(received_data))
 71    textarea2.add_text(str("\n"))
 72
 73
 74def tcps_0_disconnect_event(client_sock):
 75    global \
 76        page0, \
 77        button0, \
 78        label0, \
 79        textarea0, \
 80        label1, \
 81        textarea1, \
 82        textarea2, \
 83        label3, \
 84        wlan_sta, \
 85        tcps_0, \
 86        client, \
 87        received_data, \
 88        state
 89    client = client_sock
 90    textarea2.add_text(str(client.getpeername()[0]))
 91    textarea2.add_text(str(" disconnected\n"))
 92
 93
 94def button0_short_clicked_event(event_struct):
 95    global \
 96        page0, \
 97        button0, \
 98        label0, \
 99        textarea0, \
100        label1, \
101        textarea1, \
102        textarea2, \
103        label3, \
104        wlan_sta, \
105        tcps_0, \
106        client, \
107        received_data, \
108        state
109    state = not state
110    if state:
111        textarea2.set_text("")
112        tcps_0.start()
113        button0.set_btn_text(str("stop"))
114        button0.set_bg_color(0xFF0000, 255, lv.PART.MAIN | lv.STATE.DEFAULT)
115    else:
116        tcps_0.stop()
117        button0.set_btn_text(str("start"))
118        button0.set_bg_color(0x3366FF, 255, lv.PART.MAIN | lv.STATE.DEFAULT)
119
120
121def button0_event_handler(event_struct):
122    global \
123        page0, \
124        button0, \
125        label0, \
126        textarea0, \
127        label1, \
128        textarea1, \
129        textarea2, \
130        label3, \
131        wlan_sta, \
132        tcps_0, \
133        client, \
134        received_data, \
135        state
136    event = event_struct.code
137    if event == lv.EVENT.SHORT_CLICKED and True:
138        button0_short_clicked_event(event_struct)
139    return
140
141
142def setup():
143    global \
144        page0, \
145        button0, \
146        label0, \
147        textarea0, \
148        label1, \
149        textarea1, \
150        textarea2, \
151        label3, \
152        wlan_sta, \
153        tcps_0, \
154        client, \
155        received_data, \
156        state
157
158    M5.begin()
159    Widgets.setRotation(1)
160    m5ui.init()
161    page0 = m5ui.M5Page(bg_c=0xFFFFFF)
162    textarea0 = m5ui.M5TextArea(
163        text="textarea0",
164        placeholder="Placeholder...",
165        x=40,
166        y=6,
167        w=70,
168        h=36,
169        font=lv.font_montserrat_14,
170        bg_c=0xFFFFFF,
171        border_c=0xE0E0E0,
172        text_c=0x212121,
173        parent=page0,
174    )
175    textarea1 = m5ui.M5TextArea(
176        text="textarea0",
177        placeholder="Placeholder...",
178        x=178,
179        y=6,
180        w=56,
181        h=36,
182        font=lv.font_montserrat_14,
183        bg_c=0xFFFFFF,
184        border_c=0xE0E0E0,
185        text_c=0x212121,
186        parent=page0,
187    )
188    textarea2 = m5ui.M5TextArea(
189        text="Text",
190        placeholder="Placeholder...",
191        x=6,
192        y=76,
193        w=308,
194        h=158,
195        font=lv.font_montserrat_14,
196        bg_c=0xFFFFFF,
197        border_c=0xE0E0E0,
198        text_c=0x212121,
199        parent=page0,
200    )
201    button0 = m5ui.M5Button(
202        text="stop",
203        x=239,
204        y=6,
205        bg_c=0xF32121,
206        text_c=0xFFFFFF,
207        font=lv.font_montserrat_14,
208        parent=page0,
209    )
210    label0 = m5ui.M5Label(
211        "IP:",
212        x=6,
213        y=10,
214        text_c=0x000000,
215        bg_c=0xFFFFFF,
216        bg_opa=0,
217        font=lv.font_montserrat_24,
218        parent=page0,
219    )
220    label1 = m5ui.M5Label(
221        "Port:",
222        x=116,
223        y=10,
224        text_c=0x000000,
225        bg_c=0xFFFFFF,
226        bg_opa=0,
227        font=lv.font_montserrat_24,
228        parent=page0,
229    )
230    label3 = m5ui.M5Label(
231        "Log:",
232        x=6,
233        y=50,
234        text_c=0x000000,
235        bg_c=0xFFFFFF,
236        bg_opa=0,
237        font=lv.font_montserrat_14,
238        parent=page0,
239    )
240
241    button0.add_event_cb(button0_event_handler, lv.EVENT.ALL, None)
242
243    page0.screen_load()
244    wlan_sta = network.WLAN(network.STA_IF)
245    wlan_sta.active(True)
246    tcps_0 = EasyTCPServer(host="0.0.0.0", port=8000, listen=0, verbose=True)
247    tcps_0.on_client_connect(tcps_0_connect_event)
248    tcps_0.on_data_received(tcps_0_received_event)
249    tcps_0.on_client_disconnect(tcps_0_disconnect_event)
250    button0.set_size(74, 36)
251    textarea0.set_one_line(True)
252    textarea1.set_one_line(True)
253    textarea0.set_text(str(wlan_sta.ifconfig()[0]))
254    textarea1.set_text(str("8000"))
255    state = True
256
257
258def loop():
259    global \
260        page0, \
261        button0, \
262        label0, \
263        textarea0, \
264        label1, \
265        textarea1, \
266        textarea2, \
267        label3, \
268        wlan_sta, \
269        tcps_0, \
270        client, \
271        received_data, \
272        state
273    M5.update()
274    tcps_0.check_event(timeout=10)
275
276
277if __name__ == "__main__":
278    try:
279        setup()
280        while True:
281            loop()
282    except (Exception, KeyboardInterrupt) as e:
283        try:
284            m5ui.deinit()
285            from utility import print_error_msg
286
287            print_error_msg(e)
288        except ImportError:
289            print("please update to latest firmware")

示例输出:

None

API参考

class software.easysocket.tcp_server.EasyTCPServer(host='0.0.0.0', port=8000, listen=3, verbose=False)

基类:object

创建一个 EasyTCPServer 对象。

参数:
  • host (str) – 要绑定的主机地址。默认为 “0.0.0.0”。

  • port (int) – 要绑定的端口号。默认为 8000。

  • listen (int) – 系统在拒绝新连接之前允许的未接受连接数。默认为 3。

  • verbose (bool) – 是否打印详细输出。默认为 False。

备注

初始化时自动启动服务。

备注

此类是非阻塞和事件驱动的。您需要定期调用 check_event() 来处理事件。

UiFlow2 代码块:

init.png

MicroPython 代码块:

from easysocket import EasyTCPServer

server = EasyTCPServer(host="0.0.0.0", port=8080)
start()

启动服务器。

UiFlow2 代码块:

start.png

MicroPython 代码块:

server.start()
stop()

停止服务器。

UiFlow2 代码块:

stop.png

MicroPython 代码块:

server.stop()
返回类型:

None

get_sessions()

获取所有已连接的客户端套接字。

返回:

已连接客户端套接字的元组。

返回类型:

tuple[EasyTCPClientSocket]

UiFlow2 代码块:

get_sessions.png

MicroPython 代码块:

sessions = server.get_sessions()
on_client_connect(callback)

设置客户端连接事件的回调函数。

参数:

callback – 回调函数。回调函数必须接受一个参数,即已连接的 EasyTCPClientSocket 客户端套接字实例。

UiFlow2 代码块:

on_client_connect.png

MicroPython 代码块:

def on_client_connect_cb(client_socket):
    print("Client connected")

server.on_client_connect(on_client_connect_cb)
on_client_disconnect(callback)

设置客户端断开连接事件的回调函数。

参数:

callback – 回调函数。回调函数必须接受一个参数,即已断开连接的 EasyTCPClientSocket 客户端套接字实例。

UiFlow2 代码块:

on_client_disconnect.png

MicroPython 代码块:

def on_client_disconnect_cb(client_socket):
    print("Client disconnected")

server.on_client_disconnect(on_client_disconnect_cb)
on_data_received(callback)

设置数据接收事件的回调函数。

参数:

callback – 回调函数。回调函数必须接受一个参数,即包含 EasyTCPClientSocket 客户端套接字实例和接收到的数据(字节)的元组。

UiFlow2 代码块:

on_data_received.png

MicroPython 代码块:

def on_data_received_cb(args):
    client_socket, data = args
    print("Received:", data)

server.on_data_received(on_data_received_cb)
check_event(timeout=-1)

检查事件。

参数:

timeout (int) – 超时时间(毫秒)。默认为 -1(无超时)。

UiFlow2 代码块:

check_event.png

MicroPython 代码块:

server.check_event()
class software.easysocket.tcp_server.EasyTCPClientSocket(sock)

基类:object

创建一个 EasyTCPClientSocket 对象。

备注

这是 EasyTCPServer 中使用的 socket 对象的包装类。

参数:

sock (socket) – socket 对象。

send(*args, **kwargs)

向客户端发送数据。

参数:

data (bytes) – 要发送的数据。

返回:

已发送的字节数。

返回类型:

int

UiFlow2 代码块:

send.png

MicroPython 代码块:

client_socket.send(data)
sendall(*args, **kwargs)

向客户端发送所有数据。

参数:

data (bytes) – 要发送的数据。

UiFlow2 代码块:

sendall.png

MicroPython 代码块:

client_socket.sendall(data)
close(*args, **kwargs)

关闭连接。

UiFlow2 代码块:

close.png

MicroPython 代码块:

client_socket.close()
getsockname(*args, **kwargs)

返回套接字自己的地址。

返回:

套接字自己的地址。格式为 (host, port)。

返回类型:

tuple

UiFlow2 代码块:

getsockname.png

MicroPython 代码块:

# get local ip address
client_socket.getsockname()[0]
getpeername(*args, **kwargs)

返回套接字连接到的远程地址。

返回:

远程地址。格式为 (host, port)。

返回类型:

tuple

UiFlow2 代码块:

getpeername.png

MicroPython 代码块:

# get remote ip address
client_socket.getpeername()[0]