本篇測評(píng)由電子工程世界的優(yōu)秀測評(píng)者“JerryZhen”提供。 本文將介紹基于米爾電子MYD-LT527開發(fā)板的網(wǎng)關(guān)方案測試。
一、系統(tǒng)概述 基于米爾-全志 T527設(shè)計(jì)一個(gè)簡易的物聯(lián)網(wǎng)網(wǎng)關(guān),該網(wǎng)關(guān)能夠管理多臺(tái)MQTT設(shè)備,通過MQTT協(xié)議對設(shè)備進(jìn)行讀寫操作,同時(shí)提供HTTP接口,允許用戶通過HTTP協(xié)議與網(wǎng)關(guān)進(jìn)行交互,并對設(shè)備進(jìn)行讀寫操作。 二、系統(tǒng)架構(gòu) 三、組件設(shè)計(jì) MQTT組件:
設(shè)備管理組件:
- 維護(hù)一個(gè)設(shè)備列表,記錄設(shè)備的唯一標(biāo)識(shí)符(如設(shè)備ID)、MQTT主題、連接狀態(tài)等信息。
提供設(shè)備增刪改查的方法。
HTTP組件:
- 基于FastAPI定義HTTP接口。
- 接收用戶請求,調(diào)用MQTT組件和設(shè)備管理組件進(jìn)行相應(yīng)操作。
返回操作結(jié)果給用戶。
四、接口設(shè)計(jì) 設(shè)備列表:
設(shè)備詳情:
設(shè)備數(shù)據(jù):
設(shè)備控制:
五、數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì) 設(shè)備信息:
- 設(shè)備ID (device_id):唯一標(biāo)識(shí)設(shè)備的字符串。
- MQTT主題 (mqtt_topic):設(shè)備在MQTT broker上的主題。
- 連接狀態(tài) (connection_status):表示設(shè)備是否在線的布爾值。
其他設(shè)備屬性(如名稱、描述等)。
設(shè)備數(shù)據(jù):
六、安全性考慮 七、部署與擴(kuò)展 八、實(shí)現(xiàn)步驟 該設(shè)計(jì)方案僅僅是概述,具體實(shí)現(xiàn)細(xì)節(jié)可能需要根據(jù)實(shí)際需求和項(xiàng)目環(huán)境進(jìn)行調(diào)整和優(yōu)化。在實(shí)際開發(fā)中,還需要考慮異常處理、日志記錄、性能優(yōu)化等方面的問題。基于上述設(shè)計(jì)方案,以下是一個(gè)簡化版的參考代碼,展示了如何使用FastAPI和paho-mqtt庫來創(chuàng)建一個(gè)物聯(lián)網(wǎng)網(wǎng)關(guān)。需要注意,示例中不包含完整的錯(cuò)誤處理、用戶認(rèn)證和授權(quán)機(jī)制,這些在實(shí)際生產(chǎn)環(huán)境中都是必不可少的。依賴的主要庫版本: fastapi==0.108.0 paho-mqtt==1.6.1 網(wǎng)關(guān)模擬代碼gateway.py: - from fastapi import FastAPI, HTTPException, Body, status
- from paho.mqtt.client import Client as MQTTClient
- from typing import List, Dict, Any
- import asyncio
- import json
- app = FastAPI()
- mqtt_client = None
- device_data = {}
- subtopic="gateway/device/#"
- # MQTT回調(diào)函數(shù)
- def on_message(client, userdata, msg):
- payload = msg.payload.decode()
- topic = msg.topic
- device_id = topic.split('/')[-1]
- device_data[device_id] = payload
- print(f"Received message from {device_id}: {payload}")
-
- # MQTT連接和訂閱
- def mqtt_connect_and_subscribe(broker_url, broker_port):
- global mqtt_client
- mqtt_client = MQTTClient()
- mqtt_client.on_message = on_message
- mqtt_client.connect(broker_url, broker_port, 60)
- mqtt_client.subscribe(subtopic)
- mqtt_client.loop_start()
-
- # MQTT發(fā)布消息
- async def mqtt_publish(topic: str, message: str):
- if mqtt_client is not None and mqtt_client.is_connected():
- mqtt_client.publish(topic, message)
- else:
- print("MQTT client is not connected!")
-
- # 設(shè)備管理:添加設(shè)備
- @app.post("/devices/", status_code=status.HTTP_201_CREATED)
- async def add_device(device_id: str):
- device_data[device_id] = None
- return {"message": f"Device {device_id} added"}
-
- # 設(shè)備管理:獲取設(shè)備列表
- @app.get("/devices/")
- async def get_devices():
- return list(device_data.keys())
-
- # 設(shè)備管理:獲取設(shè)備數(shù)據(jù)
- @app.get("/devices/{device_id}/data")
- async def get_device_data(device_id: str):
- if device_id not in device_data:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Device {device_id} not found")
- return device_data.get(device_id)
-
- # 設(shè)備管理:發(fā)送數(shù)據(jù)到設(shè)備
- @app.post("/devices/{device_id}/data")
- async def send_data_to_device(device_id: str, data: Dict[str, Any] = Body(...)):
- topic = f"devices/{device_id}"
- message = json.dumps(data)
- await mqtt_publish(topic, message)
- return {"message": f"Data sent to {device_id}"}
-
- # 設(shè)備控制:發(fā)送控制命令到設(shè)備
- @app.post("/devices/{device_id}/control")
- async def control_device(device_id: str, command: str):
- topic = f"devices/device/{device_id}"
- await mqtt_publish(topic, command)
- return {"message": f"Control command sent to {device_id}"}
-
- # FastAPI啟動(dòng)事件
- @app.on_event("startup")
- async def startup_event():
- mqtt_connect_and_subscribe("127.0.0.1", 1883)
-
- # FastAPI關(guān)閉事件
- @app.on_event("shutdown")
- async def shutdown_event():
- if mqtt_client is not None:
- mqtt_client.loop_stop()
- mqtt_client.disconnect()
-
- # 運(yùn)行FastAPI應(yīng)用
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="127.0.0.1", port=8000)
復(fù)制代碼
設(shè)備1模擬代碼 dev1.py: - import paho.mqtt.client as mqtt
- # 連接成功回調(diào)
- def on_connect(client, userdata, flags, rc):
- print('Connected with result code '+str(rc))
- client.subscribe('devices/1')
- # 消息接收回調(diào)
- def on_message(client, userdata, msg):
- print(msg.topic+" "+str(msg.payload))
- client.publish('gateway/device/1',payload=f'echo {msg.payload}',qos=0)
-
- client = mqtt.Client()
- # 指定回調(diào)函數(shù)
- client.on_connect = on_connect
- client.on_message = on_message
- # 建立連接
- client.connect('127.0.0.1', 1883)
- # 發(fā)布消息
- client.publish('gateway/device/1',payload='Hello, I am device',qos=0)
- client.loop_forever()
復(fù)制代碼
設(shè)備2模擬代碼 dev2.py - import paho.mqtt.client as mqtt
- # 連接成功回調(diào)
- def on_connect(client, userdata, flags, rc):
- print('Connected with result code '+str(rc))
- client.subscribe('devices/2')
- # 消息接收回調(diào)
- def on_message(client, userdata, msg):
- print(msg.topic+" "+str(msg.payload))
- client.publish('gateway/device/2',payload=f'echo {msg.payload}',qos=0)
- client = mqtt.Client()
- # 指定回調(diào)函數(shù)
- client.on_connect = on_connect
- client.on_message = on_message
- # 建立連接
- client.connect('127.0.0.1', 1883)
- # 發(fā)布消息
- client.publish('gateway/device/2',payload='Hello, I am device',qos=0)
- client.loop_forever()
復(fù)制代碼
運(yùn)行網(wǎng)關(guān)代碼,打開網(wǎng)頁得到api接口:
通過api分別添加設(shè)備1和設(shè)備2,
在另外兩個(gè)控制臺(tái)中分別運(yùn)行模擬設(shè)備1和模擬設(shè)備2的代碼通過網(wǎng)頁API向設(shè)備1發(fā)送數(shù)據(jù)
通過網(wǎng)頁API獲得設(shè)備回復(fù)的數(shù)據(jù),設(shè)備代碼中只是簡單的把網(wǎng)關(guān)發(fā)過來的數(shù)據(jù)進(jìn)行回傳
我們在網(wǎng)關(guān)的后臺(tái)可以看到完整的數(shù)據(jù)流
至此一個(gè)簡易的網(wǎng)關(guān)已經(jīng)實(shí)現(xiàn)了,接下來將會(huì)嘗試實(shí)現(xiàn)樓宇里的最常見的bacnet設(shè)備進(jìn)行通訊管理。
|