パターン 5. 蓄積したデータを条件にイベント登録したい

注意

パターン5は現在未対応です。対応につきましては、2022年以降を予定しております。

本パターンでは、My-IoTデータストアに蓄積されたデータを定期的にチェック。条件を満たしたとき、イベント登録を行います。
(パターン6と組み合わせることで、エッジ側にイベントによるメッセージを送信することができます)

クラウドアプリケーション内の構成は以下の通りです。

パターン05

No.

リソース名

概要

1

定期実行

No2のカスタム処理を定期的に実行するLambda。

2

カスタム処理

My-IoTデータストアに蓄積したデータの検索、取得したデータの解析及び、イベント発行を行います。

3

Elasticsearchアクセス

myiot-rel-es-access-lambda
My-IoTデータストアに検索、登録等を行います。

4

Elasticsearch

My-IoTデータストアです。エッジアプリから送信されたデータが蓄積されています。

5

DynamoDBアクセス

イベントテーブルにイベントデータの登録を行います。

6

DynamoDB

イベントデータを蓄積するデータベースです。

CloudFormationテンプレート例

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No2.カスタム処理Lambdaのテンプレート例です
#
  sipSamplePattern05lm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 'arn:aws:iam::946501391975:role/sip-sample-lambda-role'
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 30
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-05-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # CTRL_LAMBDA:      No.2 ElasticsearchアクセスLambda関数名を指定しています
          # DB_ACCESS_LAMBDA: No.5 DynamoDBアクセスLambda関数名を指定しています
          # TABLE_NAME:       DynamoDB上のイベントテーブルのテーブル名を指定しています
          # 
          CTRL_LAMBDA: myiot-rel-es-access-lambda
          DB_ACCESS_LAMBDA: myiot-rel-dynamodb-access-lambda
          TABLE_NAME: sip-sample-event-dynamodb
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
          # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
          # 
          # INDEX_NAME:  イベント登録を判断するための解析対象のデータが登録されているインデックスを指定します
          # THRESHOLD:   イベント登録を判断するための閾値を設定してください
          # EVENT_NAME:  DynamoDBに通知するイベント名を記載してください
          #              指定した文字列がイベント名としてイベントテーブルに登録されます
          #
          INDEX_NAME: index_name
          THRESHOLD: '100'
          EVENT_NAME: light_alert
          #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Lambda Function
#************************************************************

#************************************************************
# Lambda Permission
# No2.カスタム処理のLambda Permissionのテンプレート例です
#
  LambdaPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      Action: 'lambda:InvokeFunction'
      Principal: 'events.amazonaws.com'
      FunctionName: 
        Ref: sipSamplePattern05lm
      SourceArn: 
        Fn::GetAtt: Schedule.Arn
#
# Lambda Permission
#************************************************************

#************************************************************
# EventBridge
# No1.定期実行のテンプレート例です
#
  Schedule: 
    Type: 'AWS::Events::Rule'
    Properties:
      Description: ''
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 定期実行される間隔を記載してください
      # 最短1分間隔で登録可能です
      #
      ScheduleExpression: 'cron(* * ? * * *)'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      Targets:
        - Arn: 
            Fn::GetAtt: sipSamplePattern05lm.Arn
          Id: sipSamplePattern05
#
# EventBridge
#************************************************************

カスタム処理ソースコード例(Python)**

import json
import sys
import logging
import os
import datetime
import boto3

# Loggerオブジェクトを取得し、表示するレベルを設定します。
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 呼び出すサービスを指定します。
lambdaClient = boto3.client("lambda")

# 環境変数の値を取得します。
DB_ACCESS_LAMBDA = os.environ.get('DB_ACCESS_LAMBDA')
INDEX_NAME = os.environ.get('INDEX_NAME')
CTRL_LAMBDA = os.environ.get('CTRL_LAMBDA')
THRESHOLD = int(os.environ.get('THRESHOLD'))
EVENT_NAME = os.environ.get('EVENT_NAME')
TABLE_NAME = os.environ.get('TABLE_NAME')

def call_db_access(edgeId):
    """イベントテーブルへデータを登録します
    Args:
        edgeId: エッジID
    Returns:
        なし
    """
    try:
        # 現在時刻(日本時間)を取得し、unixtime型にします。
        dt = datetime.datetime.now(tz=datetime.timezone(datetime.timedelta(hours=9)))
        unixtime = int(dt.timestamp())

        detail = {
            'edgeId':edgeId
        }

        #==============================================================================
        # DynamoDBにアクセスする際に必要な引数を設定しています。
        # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)No4 イベントデータの登録
        # を参照してください。
        # ここでは操作内容や、イベント名などを設定しています。
        #==============================================================================
        payload = {
            "method":"Put",
            "table":TABLE_NAME,
            "params":{
                'source':'light_sample',
                'time':unixtime,
                'name':EVENT_NAME,
                'detail':detail,
                'closed':0
            }
        }

        #==============================================================================
        # No4.DynamoDBアクセス
        # 環境変数で定義されているmyiot-rel-dynamodb-access-lambdaに
        # リクエストを実施し、結果を取得します。
        #==============================================================================
        lambdaClient.invoke( 
            FunctionName=DB_ACCESS_LAMBDA, 
            InvocationType="RequestResponse", 
            Payload=json.dumps(payload)
        )
    except Exception:
        import traceback
        logger.error(traceback.format_exc())


def data_check(data):
    """条件を満たしたとき、DynamoDBにイベント情報を登録します
    Args:
        data: 検索結果
    Returns:
        なし
    """
    try:
        # データを取得します。
        result_data = data['aggregations']['aggs_edgeId']['buckets'][0]["avg_light"]["value"]
        # edgeIdを取得します。
        edgeId = data['aggregations']['aggs_edgeId']['buckets'][0]['key']

        # 閾値未満か確認します。
        if result_data < THRESHOLD:
            # DynamoDBアクセスLambdaに通知します。
            call_db_access(edgeId)

    except Exception:
        import traceback
        logger.error(traceback.format_exc())


def es_access():
    """Elasticsearchへ検索を行います
    Args:
        なし
    Returns:
        なし
    """

    # 現在時刻(日本時間)を取得し、1000倍(ミリ秒付与)してunixtime型にします。
    dt = datetime.datetime.now(tz=datetime.timezone(datetime.timedelta(hours=9)))
    unixtime = int(dt.timestamp() * 1000)
    # ミリ秒で5分間を計算します。
    be_time = (60*5)*1000

    #==============================================================================
    # Elasticsearchにリクエストする検索文を設定します。
    # このサンプルで想定しているプロパティは以下の通りです。
    #
    # プロパティ名:型  :説明
    # edgeId     :text:エッジID
    # time       :date:時刻情報
    # light      :long:光度
    #
    # サンプルクエリは同一のedgeIdにおける5分間の光度の平均値を取得するものです。
    #==============================================================================
    body = {
        "query": {
            "range": {
                "time": {
                    "gte": unixtime - be_time,
                    "lte": unixtime
                }
            }
        },
        "size":0,
        "aggs":{
            "aggs_edgeId":{
                "terms": {
                    "field": "edgeId.keyword",
                    "size": 1
                },
                "aggs": {
                    "avg_light": {
                        "avg": {
                            "field": "light"
                        }
                    }
                }
            }
        }
    }

    try:
        #==============================================================================
        # Elasticsearchにアクセスする際に必要な引数を設定しています。
        # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)No2 IoTデータへのアクセス
        # を参照してください。
        # ここでは操作内容や、検索文などを設定しています。
        #==============================================================================
        Payload = {
            'method': 'Get',
            'index': INDEX_NAME,
            'query': body
        }

        #==============================================================================
        # No2.Elasticsearchアクセス
        # 環境変数で定義されているmyiot-rel-es-access-lambdaに
        # リクエストを実施し、結果を取得します。
        #==============================================================================
        res = lambdaClient.invoke(
            FunctionName=CTRL_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(Payload)
        )

        response = json.loads(res['Payload'].read())
        # 正常終了か確認します。
        if response['statusCode'] == 200:
            # 検索結果が1件以上の場合、データありと判断しデータチェック処理を行います。
            if response["body"]["result"]["hits"]["total"]["value"] > 0:
                data_check(response["body"]["result"])
            else:
                return
        else:
            return
    except Exception:
        import traceback
        logger.error(traceback.format_exc())
    return


# Lambda関数の処理の入口
def lambda_handler(event, context):

    # Elasticsearchに蓄積されたデータをチェックし、条件を満たしたとき、DynamoDBにイベント情報を登録する。
    return es_access()