Azure連携(6):クラウドからのMQTT通信を受信(Python)

<< 前の記事:「Azure連携(5):Azure FunctionsからデバイスへのMQTT送信(C#)」
>> 次の記事:「Azure連携(7):IoT Hub受信データをPower BIで可視化」 … <準備中>

この記事は PLCnext Control に限らず一般的なパソコン等にも適用可能です。

前回の記事で、エッジデバイスからクラウド送信したデータをクラウド上で処理し、その結果をクラウドからエッジデバイスに向けて送信するところまでを実現しました。

今回は、エッジデバイスである PLCnext Control 上の Python コードでクラウドからの MQTT 通信を受信する方法について説明します。

目次

基本編:単純な文字列の受信

まず、ごく基本的なデータ受信のコードを紹介します。これさえ出来れば、あとは自分で応用範囲を広げることが可能です。

エッジデバイス側にデータ受信用コードを用意して実行

STEP
「受信用デバイス」の接続文字列を取得

前回記事の「デバイスの(追加)登録」で Azure IoT Hub へ登録した「受信用デバイス」の接続文字列を、以下の手順で取得してください。Python のコード内で使用します。

STEP
デバイス側の Python 環境の準備

前回までの記事では、PLCnext Control (エッジデバイス) はクラウドへデータを送信するための 1 台しか登場しませんでしたが、今回はクラウドからのデータ受信用に別の個体の PLCnext Contorl を利用することを想定しています。(同じ個体で送信用、受信用両方のコードを実行しても構いません)

もし新しい PLCnext Control の Python 環境が整っていない場合は、「Azure連携(2):デバイスからIoT HubへのMQTT送信(Python)」の「デバイス側の環境準備」の項目を参考に azure.iot.device モジュールのインストール等を行ってください。

STEP
データ受信のシンプルな Python コードを作成

以下のプログラムを「sample_receicve_str.py」のファイル名で作成してください。

CONNECTION_STRING 変数の部分は、先ほど「受信用デバイス」について実際に取得したデバイスの接続文字列を入力してください。

ファイルをパソコン上で作成した場合は、「Azure連携(2):デバイスからIoT HubへのMQTT送信(Python)」記事の「プログラムファイルを PLCnext Control 上へ配置」を参考にして、PLCnext Control のディレクトリへ配置※してください。
(※ターミナルソフトから SSH で PLCnext Control にログインして nano エディターなどで直接書いても構いません。)

from time             import sleep
from datetime         import datetime, timedelta, timezone
from azure.iot.device import IoTHubDeviceClient

# デバイスの接続文字列
CONNECTION_STRING = "****************************************************************************************************************"

# ========= メッセージ受信ハンドラ =========
def message_handler(message):
    # タイムスタンプ取得
    timestamp_utc     = datetime.now(timezone.utc)                      # 現在のUTCのタイムスタンプを取得
    timestamp_jst     = timestamp_utc + timedelta(hours)                # 9時間を加算
    readable_time_jst = timestamp_jst.strftime('%Y-%m-%d %H:%M:%S JST') # 人間が読みやすい形式に整形

    # 受信したデータを表示
    recv_str = message.data.decode()
    print(f'[{readable_time_jst}] Received message: {recv_str}', flush = True)

# ========= メインループ =========
while True:
    try:
        # IoTHubDeviceClient インスタンスを作成します
        client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
        # メッセージハンドラを設定します
        client.on_message_received = message_handler

        # 非同期操作を待つために、無限ループを開始します
        while True:
            sleep(1)

    except Exception as e:
        print(f"Caught exception: {e}. Restarting...", flush = True)

なおこのコードは、「Azure連携(2):デバイスからIoT HubへのMQTT送信(Python)」でご紹介したエッジデバイスからクラウドへ MQTT でデータを送信するためのものと基本的な構造は同じで、データ送信のロジックを省き、代わりにデータ受信時のコールバック関数を追加しています。

データ受信プログラムを実行

PLCnext Control へターミナルソフトから SSH ログインし、以下のコマンドを入力して Python のプログラムを実行してください。

python3 sample_receicve_str.py

成功すると、データ受信を待機します。

これで、クラウドからデータを受信する準備が整いました。

今回の記事はエッジデバイス用のデータ受信コードの作成を目的としているので Python のコードを実行していますが、単に通信状態を確認したいだけであれば、前回記事の「CLI でクラウドから送信されたデータを受信」を参考に PC で受信確認を行うことも可能です。

Azure ポータルから文字列を送信

Azure ポータルからデバイスを選択し、以下の様に任意の文字列を送信してください。

データ受信の確認

正しく受信できれば、ターミナルに以下の様に表示されます。

応用編:JSONデータを受信して処理

次に、クラウドからの受信データが JSON である場合を考えてみましょう。

といっても、Python コードとしては先ほどのコードへ単純に JSON 形式の文字列から各キーの値を取り出すだけです。このような処理は標準ライブラリの json モジュールでシンプルに実現できます。

前回までの記事で Azure 上の CosmodDB トリガー関数からエッジデバイスへ向けて JSON 文字列を送信できるようになっていますので、今回はそのデータを受信して処理することとします。

受信データの構成を確認

前回記事「Azure連携(5):Azure FunctionsからデバイスへのMQTT送信(C#)」の「送信するデータの種類」の項目で定義した、Azure からエッジデバイスへ送信する JSON 文字列の内容を再確認すると以下の通りです。

{"TEMP_WARN_LVL": 0, "HUM_WARN_LVL": 0, "PWR_WARN_LVL": 2}

Python コードの修正

先ほどの Python コードのファイルをコピーを作り、ファイル名を sample_receicve_json.py として、以下の赤でハイライトされた内容を追加してください。

json モジュールの追加

ファイル先頭の import に json モジュールを追加してください。

from time             import sleep
from datetime         import datetime, timedelta, timezone
from azure.iot.device import IoTHubDeviceClient
import json
JSON 文字列の処理を追加

message_handler 関数に、JSON文字列から特定のキーを値を取り出すコードを追加してください。

# ========= メッセージ受信ハンドラ =========
def message_handler(message):
    # タイムスタンプ取得
    timestamp_utc     = datetime.now(timezone.utc)                      # 現在のUTCのタイムスタンプを取得
    timestamp_jst     = timestamp_utc + timedelta(hours                 # 9時間を加算
    readable_time_jst = timestamp_jst.strftime('%Y-%m-%d %H:%M:%S JST') # 人間が読みやすい形式に整形

    # 受信したデータを表示
    recv_str = message.data.decode()
    print(f'[{readable_time_jst}] Received message: {recv_str}', flush = True)

    # 受信したデータを JSON として各要素に分解
    received_json = json.loads(recv_str)
    temp_warn_lvl_val = received_json["TEMP_WARN_LVL"]
    hum_warn_lvl_val  = received_json["HUM_WARN_LVL"]
    pwr_warn_lvl_val  = received_json["PWR_WARN_LVL"]
    print(f'    temp_warn_lvl_val = {temp_warn_lvl_val}', flush = True)
    print(f'    hum_warn_lvl_val  = {hum_warn_lvl_val}', flush = True)
    print(f'    pwr_warn_lvl_val  = {pwr_warn_lvl_val}', flush = True)

動作確認

データ受信用 PLCnext Control 上でデータ受信プログラムを実行

データ受信用 PLCnext Control へターミナルソフトから SSH ログインし、以下のコマンドを入力して Python のプログラムを実行してください。

python3 sample_receicve_json.py

成功すると、データ受信を待機します。

Azure クラウド側の CosmosDB トリガ関数を確認

前回記事の「応用編:エッジデバイスからの受信データ処理結果をエッジデバイスへ送信」で作成した Function が Azure 上にデプロイされていることを確認し、されていなければデプロイし直してください。

Azure ポータル上の関数アプリ画面

データ送信用 PLCnext Control 上でデータ送信プログラムを実行

記事「Azure連携(2):デバイスからIoT HubへのMQTT送信(Python)」の「プログラムの実行」の項目と同様に、データ送信用 PLCnext Control へターミナルソフトから SSH ログインし、以下のコマンドを入力して Python のプログラムを実行してください。

python3 sample_to_send.py

動作結果の確認

正常に動作すると、以下の様に各 PLCnext Control のターミナルで、片側の PLCnext Control から送信された情報がAzure 上で処理され、Azure から送信された処理結果を別の PLCnext Control で受信出来ていることを確認することが出来ます。

送信用 Python コードと受信用 Python コードを同一個体の PLCnext Control で実行することも可能です。
その場合は、ターミナルの画面を送信用・受信用に別々に開いて使用してください。

更なる応用案:実際のセンサ値の取込みと外部機器制御

この項目は、PLCnext Control 特有の機能についての解説です。

この Azure 記事シリーズの第 1 回で紹介した「クラウドを使って実現したい内容」の図をもう一度見てみましょう。

これまでのサンプルプログラムをからわかるように、PLCnext Control がクラウドへ送信するデータやクラウドから受信するデータは、Python コード内の変数に格納されています。

一方、センサ計測値や外部の IO 制御情報は、Python コードではなく PLCnext Runtime プロセスの中で取り扱われています。従って、Python コード側から PLCnext Runtime が管理している記憶域に対して読み書きを行うことで上記の図のような実用的な動作をさせることができます。

PLCnext Control のソフトウエア構造
https://www.plcnext.help/te/About/Home.htm より引用、加筆

方法はいくつかありますが、RESTAPI を使うシンプルな方法を下記のリンクの記事でご紹介していますので、ご興味のある方は是非ご参照ください。

[リンク] PythonとREST APIによるPLCnext Runtime変数の操作

以上で、現実世界で得た計測値をクラウドへ送り、その処理結果を現実世界のデバイスの制御へ反映することができるようになりました。

<< 前の記事:「Azure連携(5):Azure FunctionsからデバイスへのMQTT送信(C#)」
>> 次の記事:「Azure連携(7):IoT Hub受信データをPower BIで可視化」 … <準備中>

  • URLをコピーしました!
目次