パターン 6. IoT Coreを介してエッジ側にメッセージを送信したい

注意

パターン6のDynamoDBからイベントデータを受け取る処理は現在未対応です。対応につきましては、2022年以降を予定しております。

本パターンでは、イベントデータがDyanmoDBに登録されたタイミングでDynamoDBからイベントデータを受け取り、IoT Coreを介してエッジ側にメッセージを送信します。

クラウドアプリケーション内の構成は以下の通りです。

パターン06

No.

リソース名

概要

1

DynamoDB

イベントのトリガーになるデータベースです。

2

イベント制御

DynamoDBに登録されたイベントデータを、カスタムLambdaに通知して実行します。

3

カスタム処理

イベント制御Lambdaからリクエストを受け取り、制御指示Lambdaを実行します。

4

MQTT Publish

カスタムLambdaからメッセージを受け取り、MQTT Publishを実行します。

5

IoT Core

エッジ側にメッセージを送信します。

CloudFormationテンプレート例

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No3.カスタム処理Lambdaのテンプレート例です
#
  sipsampleprocessorderlm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 'arn:aws:iam::946501391975:role/sip-sample-lambda-role'
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 10
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-06-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # MQTT_LAMBDA:      No.4 MQTT PublishのLambdaを示しています
          # SUBSCRIPTION:     No.5 IoT Coreに送信するトピック名の形式です
          # 
          MQTT_LAMBDA: myiot-rel-publish-mqtt-lambda
          SUBSCRIPTION: 'ex/{}/alert'
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
# Lambda Function
#************************************************************

カスタム処理ソースコード例(Python)

import logging
import json
import os
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# トピック名。例では「ex/{}/alert」を想定しています。
SUBSCRIPTION = (os.environ.get("SUBSCRIPTION"))
# No6 IoT Coreへのメッセージ送信の関数名を入力してください。
MQTT_API_NAME = (os.environ.get("MQTT_API_NAME"))
# テナントID
TENANT_ID = (os.environ.get('TENANT_ID'))

def is_empty_str(val):
    """値が空文字列か、又はNoneかを調べます。

    Args:
        val (str/NonType): 調べる値

    Returns:
        bool: 値が空文字列か、又はNoneなら True を返す
    """

    return not val if isinstance(val, str) else val is None


def is_empty_check(val):
    """値が空か、またははNoneかを調べます。
       list,dict型に対応しています。

    Args:
        val : 調べる値

    Returns:
        bool: 値が空か、又はNoneなら True を返す
    """

    res = False
    
    if is_empty_str(val):
        res = True
    elif isinstance(val, (list,dict)):
        if not val:
            res = True
            
    return res


def mqtt_lambda_publish(event):
    """MQTT Publish APIを呼び出し、メッセージを送信します。

    Args:
        event(dict):メッセージ送信情報
    Returns:
        なし
    """
    try:
        client = boto3.client('lambda')
        # qosレベルを設定します。
        qos_level = 1

        #==============================================================================
        # MQTT Publishする際に必要な引数を設定しています。
        # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)
        # No6 IoT Coreへのメッセージ送信を参照してください。
        # ここでは送信先や、送信内容(メッセージ)などを設定しています。
        #==============================================================================
        payload = {
            "tenantId": TENANT_ID,
            "edgeId": event["detail"]["edgeId"],
            "payload":{
                "message":"アラート"
            },
            "qos":qos_level
        }

        #==============================================================================
        # No6 IoT Coreへのメッセージ送信
        # 環境変数で定義されているmyiot-rel-publish-mqtt-lambdaに
        # リクエストを実施します。
        #==============================================================================
        response = client.invoke(
            FunctionName = MQTT_API_NAME,
            InvocationType = "RequestResponse",
            Payload = json.dumps(payload)
        )
    except Exception as e:
        logger.error(e)


# Lambda関数の処理の入口
def lambda_handler(event, context):

    try:
        # 入力キーのチェックを行います。必須の引数がない場合は処理を終了します。
        if (event.get("event") is None):
            logger.error('Required tag does not exist.')
            return
        
        event = event["event"]
        
        if (event.get("detail") is None) or (event["detail"].get("edgeId") is None):
            logger.error('Required tag does not exist.')
            return

        if event.get("source") is None:
            logger.error('Required tag does not exist.')
            return

        # "source"が"light_sample"と一致しているかチェックします。
        if event["source"] != "light_sample":
            logger.error('Not match source name.')
            return

        # "edgeId"が文字列型であるかをチェックします。
        if is_empty_check(event["detail"]["edgeId"]):
            logger.error('No Value specified for edgeId.')
            return

        # MQTT送信を行います。        
        mqtt_lambda_publish(event)

    except Exception as e:
        logger.error(e)

エッジでのメッセージ受信

クラウドアプリケーションから送信したメッセージをエッジで受信するには、Node-REDフローでMy-IoT専用ノードを使用します。 My-IoT専用ノードの詳細は、My-IoT専用ノードを参照してください。