prettyprint

2021年9月26日 星期日

使用NODE-RED Dashboard 以MQTT訊息驅動來控制MCU上的感應器或設備(Controlling MCU/Sensor/Device with Node-RED Dashboard via MQTT)

 本實驗使用Node-RED透過MQTT訊息傳送來控制遠端的MCU以便收集溫溼度與控制電燈開關,MQTT使用publish/subscribe方式來傳送訊息,因此本實驗的設計邏輯就以訊息驅動思維來設計。

一、使用元件:

  1. Raspberry Pi 3B+,安裝mosquitto broker and NODE-RED
  2. ESP8266(WeMos D1 Mini)
  3. DHT11
  4. LED x 1, NPN電晶體 x 1,LM393 x 1, 200歐姆 x 2, 4.7K歐姆 x 1

二、設計思維:

MQTT使用publish/subscribe方式來傳送訊息,設計邏輯就以訊息驅動思維來設計。本實驗最後目標是以Node-RED Dashboard來收集並展示遠端溫濕度資料與遠端控制電燈開關,首先定義完成目標所需要的Topics,再定義Publish/Subscribe Topics message flow以及兩端點須完成的動作,最後就能輕易使用Node-RED完成設計的需求。

  • 定義系所需的Topics:
    遠端有兩個元件: DHT11(Sensor)與電燈(LED Device)需要控制,另需要知道網路的狀態,因此定義出下列六個Topics與每個Topic的Payloads:
    遠端設備狀態資料的更新可以用interrupt或polling方式,本實驗以polling方式來更新,因此ESP8266/Sensor/RefreshRate即為更新頻率的秒數。

  • Node-RED Dashboard所需UI Nodes
  • 定義端點間Topics Message flow與需完成的動作






根據上面的Flow以Node-RED來實現,最後完成的Flow如下圖。


手機上Node-RED Dashboard畫面


  • Node-RED詳細步驟如下影片所展示:


三、遠端設備線路圖:



LM393 OPA當成電位比較器,用來檢測LED的狀態,當LED在ON時,LM393 pin 1輸出應為高電位,若低電位時,則LED是處於斷路狀態,因此publish Light/Status為FAILURE。



四、ESP8266端程式碼:

使用EspMQTTClient and DHTesp library。

#include "EspMQTTClient.h"

#include "DHTesp.h" // Click here to get the library: http://librarymanager/All#DHTesp

#ifdef ESP32
#pragma message(THIS EXAMPLE IS FOR ESP8266 ONLY!)
#error Select ESP8266 board.
#endif

DHTesp dht;

#define LIGHTPIN D5
#define DHTPIN 12                     //DHT11, D6 pin
#define LIGHT_STATUS_PIN  D7
#define LIGHT_ON      1
#define LIGHT_OFF     2
#define LIGHT_FAILURE 3

byte bLightStatus=LIGHT_OFF;  
byte bSwitchStatus=LIGHT_OFF;
int publishDelaySeconds=2;
 
///////////////////

EspMQTTClient client(
  "your-ssid",
  "your-pwd",
  "192.168.1.1",  // MQTT Broker server ip
  "MQTTUsername",   // Can be omitted if not needed
  "MQTTPassword",   // Can be omitted if not needed
  "ESP8266Client",     // Client name that uniquely identify your device
  1883              // The MQTT port, default to 1883. this line can be omitted
);


byte checkLightStatus() {
  digitalWrite(LIGHTPIN, HIGH);
  delay(1000);
  if (digitalRead(LIGHT_STATUS_PIN) == HIGH) {
      bLightStatus=LIGHT_ON;
   }  else {
      bLightStatus = LIGHT_FAILURE;
   }
   return bLightStatus;
}
void setup() {
  //Serial.begin(115200);
  pinMode(LIGHTPIN, OUTPUT);

  pinMode(LIGHT_STATUS_PIN, INPUT_PULLUP);
  checkLightStatus();

  dht.setup(12, DHTesp::DHT11); // Connect DHT sensor to GPIO 17

  client.enableLastWillMessage("ESP8266/Network/Status", "OFFLINE");
  client.setMaxPacketSize(5120);
  client.setKeepAlive(20);
  client.enableMQTTPersistence();

  digitalWrite(LIGHTPIN, LOW);
  bLightStatus = LIGHT_OFF;
  

}

void onTopicMessageReceived(const String& topic, const String& msg)
{
  if (topic == "ESP8266/Sensor/Light/Switch") {  // switch on: turn on light and check light status
       if (msg == "ON") {
        bSwitchStatus=LIGHT_ON;
        digitalWrite(LIGHTPIN, HIGH);
        delay(10);
        if (digitalRead(LIGHT_STATUS_PIN) == LOW) {
          client.publish("ESP8266/Sensor/Light/Status", "FAILURE");
          bLightStatus=LIGHT_FAILURE;
        } else {
          client.publish("ESP8266/Sensor/Light/Status", "ON");
          bLightStatus= LIGHT_ON;
        }
      } else {
        bSwitchStatus=LIGHT_OFF;
        digitalWrite(LIGHTPIN, LOW);
        bLightStatus= LIGHT_OFF;
        client.publish("ESP8266/Sensor/Light/Status", "OFF");
      }
    }
    if (topic == "ESP8266/Sensor/RefreshRate") {
      publishDelaySeconds=msg.toInt();
    }
}

void publishTempHumLS() {
  char buff[100];
  String DHTStatus=dht.getStatusString();
  delay(dht.getMinimumSamplingPeriod());
  if (DHTStatus == "OK") {
    //float humidity = dht.getHumidity();
    //float temperature = dht.getTemperature();
    sprintf(buff, "{\"H\":%.1f,\"T\":%.1f}", dht.getHumidity(), dht.getTemperature());
    client.publish("ESP8266/Sensor/DHT/Data", buff);
  } else {
    client.publish("ESP8266/Sensor/DHT/Status",DHTStatus); //  DHT/Status不送出OK。
  }
 
  
  
  switch(bSwitchStatus) {
    case LIGHT_ON:
      if (digitalRead(LIGHT_STATUS_PIN) == LOW){
          client.publish("ESP8266/Sensor/Light/Status", "FAILURE");
          bLightStatus =LIGHT_FAILURE;
      } else {
        client.publish("ESP8266/Sensor/Light/Status", "ON");
        bLightStatus =LIGHT_ON;
      }
      break;
    case LIGHT_OFF:
      client.publish("ESP8266/Sensor/Light/Status", "OFF");
      break;
    
  } 
  
  client.executeDelayed(publishDelaySeconds*1000, publishTempHumLS);
}

void onConnectionEstablished() {
  client.subscribe("ESP8266/Sensor/Light/Switch", onTopicMessageReceived);
  client.subscribe("ESP8266/Sensor/RefreshRate", onTopicMessageReceived);
  client.publish("ESP8266/Network/Status", "CONNECTED");
  switch(bLightStatus) {
    case LIGHT_ON:
      client.publish("ESP8266/Sensor/Light/Status", "ON");
      break;
    case LIGHT_OFF:
      client.publish("ESP8266/Sensor/Light/Status", "OFF");
      break;
  case LIGHT_FAILURE:
      client.publish("ESP8266/Sensor/Light/Status", "FAILURE");
      break;
     
  } 

  client.executeDelayed(1000, publishTempHumLS);
}



void loop() {
  client.loop();

}

2021年9月12日 星期日

MQTT 簡介:使用Mosquitto Broker

本篇文章內容主要介紹以下主題,使用mosquitto broker並搭配Wireshark擷取封包比對說明,讓有興趣的同好,很快對MQTT運作了解:

  1. MQTT Network Stack
  2. MQTT Control Packet格式解說
  3. 以Wireshark擷取封包驗證Control Packet
  4. 不同QoS下,Broker與Clients  Control Packet flow解說
  5. Clean Session, Retain, Will message等實例說明 
  6. 使用username and password
  7. 使用SSL/TLS

 MQTT(MQ Telemetry Transport) protocol 透過Broker(Server) 轉送用戶端(Client)的發佈者(publisher)送出的Message 到其他用戶端的訂閱者(Subscirber),每個用戶端設備可同時具有publisher與subscriber角色,發佈者發送不同主題(topic) 的訊息(message)到伺服器(Server)代理者(Broker),而每個Subscriber可向Broker訂閱不同topic,當有所訂閱的topic 送到Broker時則可立即收到所訂閱topic的message。如下圖。

如上圖所示Device A可發佈 topic A的 訊息,同時可訂閱 topic B的訊息。而每個client(device)並不需要知道彼此的位址,只需要向Broker(server)發佈主題訊息,或向Brorker訂閱要的主題而獲得想要的訊息。

而Broker一般並不會一直儲存publisher送來的某個topic的message,直到將該topic的messag送出到所有已上線訂閱該主題的的subscriber後,即將該topic的message刪除。若publisher發佈的topic沒有上線的subscriber訂閱,該主題即自動刪除。如下圖所示。
如上圖所示message 3, message 4 Subscriber 則無法收到。若需要在Subscriber斷線或離線,然後重新連線後仍會需要收到離線時的message,在下面章節有關retained message 與clean Session說明。

一、 Network stack

MQTT 屬於ISO/OSI的應用層,底層使用TCP/IP,因此可應用於區域或廣域網路,相對應的網路架構如下圖說明:

MQTT在Application layer所傳送的內容可傳送明文或透過SSL/TLS加密內容後再傳送。MQTT一般使用port 1883傳送未加密的MQTT Control Packet或使用port 8883傳送SSL/TLS加密的MQTT Control Packet。如下wireshark擷取畫面:




二、 MQTT Control Packet:

MQTT 屬於網路應用層,MQTT Control Packet分成三部分: 
  • Fixed Header: 包含Message type and Remaining Length,必須要有,最少2 bytes,最多5 bytes。
  • Variable Header: 依Message type而有不同內容,最小長度為0 byte。
  • Payload: 最小長度為0 byte。
Variable Header and Payload因不同Message type而有不同長度,總長度為0~256M bytes。

MQTT Control Packet Format

1. MQTT Message type Format:

只有一個byte分成前4個位元為message type共2^4=16種,後4個位元為QoS 與retain等flag。



Message Type 16種如下表所示:

2. Remaining Length:
說明後面兩個欄位(Variable Header and payload)總長度,為1~4 bytes,最高位元的bit為continuation bit,當為1時表示總長度數值還須包含下一個byte,每個byte僅有較低的7位元表示數值。因此Remaining Length 表示的長度最小為0,最大為2^28=256M。28(=7*4)因為每個byte僅以最低7個bits為表示值
例如:
  • DISCONNECT MESSAGE:為最小message,Fixed Header長度僅為2 bytes,而Remaining Length只需一個byte,值為0x00。
  • 若長度為509:509=125+128*3,所以Remaining Length為2 bytes,且第一個byte最高bit為1,2 bytes為 0b1111 1101  0b0000 0011(0xFD 0x03)
  • 若長度為49798:49798=6+128*5+128*128*3,所以Remaining Length為3 bytes,且第一、二個byte最高bit為1,3 bytes為 0b1000 0110  0b1000 0101 0b0000 0011(0x86 0x85 0x03)
  • Remaining Length最大可表示256M: 0xFF 0xFF 0xFF 0x7F。
  • Remaining Length可表示的長度與所需byte(s)數如下表所示

3. Variable Header & Payload:
每個message type有自己需要的Varialbe Header 與 Payload。下表列出14種message type是否需要Variable Header與Payload`;
Message TypeVariable HeaderPayload
CONNECTRequiredRequired
CONNACKNoneNone
PUBLISHRequiredOptional
PUBACKRequiredNone
PUBRECRequiredNone
PUBRELRequiredNone
PUBCOMPRequiredNone
SUBSCRIBERequiredRequired
SUBACKRequiredRequired
UNSUBSCRIBERequiredRequired
UNSUBACKRequiredRequired
PINGREQNoneNone
PINGRESPNoneNone
DISCONNECTNoneNone

詳細Varialble Header內容或Payload必要那些內容,可參考:MQTT Specifications 文件說明。本篇文章列出以PUBLISH 搭配Wireshark擷取封包內容說明如下。

PUBLISH Control Packet Format:

註:Packet Identifier的2 bytes只有在QoS level為1 or 2時才需要有。
以下所有範例皆使用Mosquitto Broker軟體測試。

使用指令:mosquitto_pub -t Room1/Light -m "Turn On" -i p1 -h 192.168.1.61 送出PUBLISH Control Packet,以Wireshark擷取封包後比對PUBLISH Control Packet Format各欄位說明如下:

  • byte 1:0x30 Control Packet Type為3 PUBLISH,  DUP=0, QoS=0, RETAIN=0
  • byte 2: 0x14(=20),Remaining Length 表示varialbe header + payload需要20 bytes,因此Remaining Length欄位只需一個byte即可。
  • byte 3~4:0x00 0x0b(=11),Topic name長度為11 bytes
  • 接下來11 bytes為Room1/Light (topic name的內容共11 bytes)
  • 因為QoS=0,所以不會有packet identifier
  • 最後7 bytes(=20-2-11)為Turn On (payload的內容共7 bytes)

三、各類QoS Server(Broker)與Client(Publisher, Subscriber) Message Flow:

QoS為2 bits,代表QoS Level如下表所示:

使用相同指令加入QoS項目,
mosquitto_pub -t Room1/Light -m "Turn On" -q 0 -i p1 -h 192.168.1.61 (QoS Default is 0)
mosquitto_pub -t Room1/Light -m "Turn On" -q 1 -i p1 -h 192.168.1.61 (QoS = 1)
mosquitto_pub -t Room1/Light -m "Turn On" -q 2 -i p1 -h 192.168.1.61 (QoS = 2)
以Wireshark 擷取封包以了解 Broker與Publisher如何達到QoS要求。

QoS=0

QoS=1

QoS=2
Control Package Flow
  • QoS = 0: 最多一次,不管是否被讀取,下一個PUBLISH MESSAGE會馬上再送出的狀況下使用。這種模式適用於傳送大量且即時的資訊,例如live video或溫濕度。
  • QoS =1: 至少一次,確定messages已送到,因此可能會重複送許多次。
  • QoS =2: 正確送達一次,適合如帳務系統等,確保不會重複送且沒有遺失messages。

四、Clean Session, Retain, Will message

1. Retained Messages

  • 一般情況下Broker轉送出publisher送來的topic給所有已連線且訂閱此topic的subscriber之後,此topic即被清除,並不會保留在broker中。
  • 每一個topic能指定保留一個retained message,舊的retained message會被新的取代。
  • 在publisher 發布一個 topic之後,後來才連線的subscriber將無法收到以前的 topic,但是依然能收到此topic 指定的retained message。
  • 要移除retained message只要送出一個null 的retained messages即可移除。
  • client device送出SUBSCRIBE request訂閱此topic時能立即收到此topic的第一個message 是retained message。
以一個publisher 與兩個subscriber來測試,順序如下,結果請參閱影片。
  • 先publish 一個 retained message,再publish a not retained message。
  • subscribe 此toptic 則仍可取得此retained message
  • 更改retained messages內容,後來再訂閱此topic的subscriber會收到更新的retained message
  • 送出null retained message 移除retained message。


2. Will Message

當client(publisher or subscriber)發生I/O錯誤或網路異常斷線(非正常diconnect或超過keepalive time broker未收到任何ping request)時,server(broker)則可代為發出此client事先指定Will Message。
測試一個publisher與一個subscriber均使定Will Message,另一個subscriber接收此Will Message,結果請看下列影片。
CONNECT variable header and payload filed



異常斷線下測試Will Message

3. Clean Session

當Subscriber CONNECT設定clean flag(=0) disable時,在subscriber重新連線時,可再收到離線期間所有未收到的messages。
  • QoS必須為1 or 2
  • 必須使定client ID。
connect flags的clean session = 0

離線後重新連線後,立即收到Broker送來離線時未收取的topic的message

mosquitto_pub與mosquitto_sub指令測試影片

五、使用Username & Password驗證

為了增加安全性,使用username/password驗證,讓只經過驗證的使用者才能Connect上Broker。
Mosquitto Broker configuration設定檔案在/etc/mosquitto/mosquitto.conf,增加
  • allow_anonymouse false #default is true
  • password_file  pwd_file_path #指定username/password存放位置
使用mosquitto_passwd 指令產生 password file:
  • touch pwd_file_path      <--建立password file
  • mosquitto_passwd -b pwd_file_path user1 passwd1    <--建立username/password 
  • 需要reload mosquitto service功能才會生效
由Wireshark擷取封包看看client CONNECT 到 Broker時增加那些內容。
MQTT Specifications說明CONNACK(connect ack) 的 variable header 第二個byte會回應connect結果

Byte 2: Connect Return code values
以指令 mosquitto_pub -t Room1/Light -m test  -i pub1  -h 192.168.1.61 不含username/password,驗證失敗,所以return code=0x5。
擷取wireshark如下:

以指令mosquitto_pub -t Room1/Light -m test  -u user1 -P user1test -h 192.168.1.61, 
username/password 驗證成功後,connect act return code為0

但是上圖顯示的username/password為明碼,因此使用在網際網路上仍有安全問題,因此需要將client與server之間的通訊透過加密完成。

六、使用SSL/TLS安全憑證

如上節所使用username/password認證client端,所傳送的內容均為明碼,實用時內容可能被截取,因此需使用加密與憑證,使用openssl 產生self-signed certificate,所有傳送MQTT加密,如下如wireshark所擷取
以openssl來製作server 與 client的primary key 與certification,說明如下步驟:

1.產生一組CA key:
 openssl genrsa -des3 -out ca.key 2048

2.製作一組CA 憑證:
 openssl req -new -x509 -days 3650 -key ca.key -out ca.crt

3.製作server key:
openssl genrsa -out server.key 2048
4. 使用server key製作server憑證需求(certificate request):
 openssl req -new -out server.csr -key server.key
Common name輸入server hostname
5.製作server憑證:
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 360


6. 將ca.crt複製到 /etc/mosquitto/ca_certificates,server.key與server.crt複製到/etc/mosquitto/certs
7.更改/etc/mosquitto/mosquitto.conf設定
在/etc/mosquitto/mosquitto.conf後面增加以下幾行:
port 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
keyfile /etc/mosquitto/certs/server.key
certfile /etc/mosquitto/certs/server.crt
require_certificate true
tls_version tlsv1.2

若要同時以port 1883接收明碼內容,以port 8883接收SSL/TLS加密內容,mosquitto.conf設定如下:
port 1883 
 
listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
keyfile /etc/mosquitto/certs/server.key
certfile /etc/mosquitto/certs/server.crt
require_certificate true
tls_version tlsv1.2

8.製作client private key:
openssl genrsa -out client.key 2048
9.使用client private key製作client certificate request:
openssl req -new -out client.csr -key client.key
10.製作client憑證:
 openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 360
11將ca.crt, client.key, client.crt複製到client端

七、整合測試

publisher 與 subscriber使用TLS(port 8883)連線,並且使用username/password驗證。publisher 設定will message,  subscriber 同時subscribe正常topic 與will topic,測試如下影片

測試MQTT Client Android APP 以TLS port 8883 連接mosquitto broker: