パターン 2. 外部サーバーから定期的にデータを取得し、登録を行いたい

本パターンでは、外部サーバーから定期的にデータを取得し、そのデータを登録します。

クラウドアプリケーション内の構成は以下の通りです。

パターン02

No.

リソース名

概要

1

定期実行

AWS EventBridgeを利用して、カスタムLambdaを定期実行します。

2

カスタム処理

外部サーバからデータを取得し、ElasticsearchアクセスLambda経由で登録します。

3

Elasticsearchアクセス

myiot-rel-es-access-lambda
My-IoTデータストアに検索、登録等を行います。

4

Elasticsearch

My-IoTデータストアです。エッジアプリから送信されたデータが蓄積されています。

CloudFormationテンプレート例

外部サーバに対して、1分ごとに接続してデータを取得し、Elasticsearchに登録します。

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No2.カスタム処理Lambdaのテンプレート例です
#
  sipSamplePattern02lm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 'arn:aws:iam::946501391975:role/sip-sample-lambda-role'
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 30
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-02-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # ES_ACCESS_LAMBDA: No.3 ElasticsearchアクセスLambda関数名を指定しています
          # 
          ES_ACCESS_LAMBDA: myiot-rel-es-access-lambda
#
# Lambda Function
#************************************************************

#************************************************************
# Lambda Permission
# No2.カスタム処理の Lambda Permissionのテンプレート例です
#
  LambdaPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      Action: 'lambda:InvokeFunction'
      Principal: 'events.amazonaws.com'
      FunctionName: 
        Ref: sipSamplePattern02lm
      SourceArn: 
        Fn::GetAtt: Schedule.Arn
#
# Lambda Permission
#************************************************************

#************************************************************
# EventBridge
# No1.定期実行のテンプレート例です
#
  Schedule: 
    Type: 'AWS::Events::Rule'
    Properties:
      Description: ''
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 定期実行される間隔を記載してください
      # 最短1分間隔で登録可能です(AWS EventBridgeの仕様)
      #
      ScheduleExpression: 'cron(* * ? * * *)'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      Targets:
        #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
        # Arn: 定期実行されるLambdaのArnを記載してください
        #      "GetAtt"の後ろにLambdaのリソース名を記載し最後に".Arn"をつけてください
        # Id:  任意のIDを設定してください
        - Arn: 
            Fn::GetAtt: sipSamplePattern02lm.Arn
          Id: sipSamplePattern02
        #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# EventBridge
#************************************************************

カスタム処理ソースコード例(Python)

注意

外部サーバへの接続・アクセス処理は外部サーバのIF仕様に合わせて実装してください。

import boto3
import json
import os
import traceback
import logging
from datetime import datetime
from dateutil import tz

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 環境変数
# ElasticSearchアクセスLambda
ES_ACCESS_LAMBDA = os.environ.get("ES_ACCESS_LAMBDA")
# テナントID
TENANT_ID = os.environ.get('TENANT_ID')

# 外部サーバへリクエストするSQL文のフォーマットです。
SERVER_SQL = "SELECT * FROM sample_table"


def connect_db(connect_str):
    """外部サーバへ接続します。

    Args:
        connect_str (str): 外部サーバ接続情報

    Returns:
       cursor : 外部サーバ接続オブジェクト
    """

    #==============================================================================
    # 外部サーバへ接続するための処理を記載してください
    #==============================================================================
    cursor = ""

    return cursor

def query_sql(cursor, sql):
    """外部サーバへの検索を実行します。

    Args:
        cursor (cursor): 外部サーバ接続オブジェクト
        sql (str): 検索クエリ(SQL)

    Returns:
        rtn: 検索データ
    """

    #==============================================================================
    # 外部サーバからデータを検索し、データを取得する処理を記載してください
    #==============================================================================
    rtn = {
        "key1":"value1",
        "key2":"value2"
    }

    return rtn


def post_es(index, items):
    """ElasticSearchアクセスAPIで外部サーバから取得したデータを登録します。

    Args:
        index (str): 登録するElasticSearchのインデックス名
        items (list): 登録するデータリスト

    Returns:
        なし
    """

    #==============================================================================
    # Elasticsearchにアクセスする際に必要な引数を設定しています。
    # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)No2 IoTデータへのアクセス
    # を参照してください。
    # ここでは操作内容や、登録データなどを設定しています。
    #==============================================================================
    payload = {
        'method': 'Post',
        'index': index,
        'item': items.copy()
    }

    #==============================================================================
    # No2.Elasticsearchアクセス
    # 環境変数で定義されているmyiot-rel-es-access-lambdaに
    # リクエストを実施し、結果を取得します。
    #==============================================================================
    res = boto3.client('lambda').invoke(
        FunctionName=ES_ACCESS_LAMBDA,
        InvocationType='RequestResponse',
        Payload=json.dumps(payload)
    )


def get_server_data():
    """外部サーバのデータをElasticSearchに登録する処理です。

    Args:
        なし
    Returns:
        なし
    """

    #==============================================================================
    # 外部サーバへ接続するための情報取得処理を記載してください
    #==============================================================================
    connection = ""

    # 外部サーバへ接続します。
    cursor = connect_db(connection)
    
    # ElasticSearchに登録する際のインデックス名をテナントIDと日付から生成する(テナントID_YYYY.MM.DD)
    JST = tz.gettz('Asia/Tokyo')
    today = datetime.now(JST).strftime('%Y.%m.%d')
    es_index = '{}_{}'.format(TENANT_ID, today)

    # 外部サーバからデータを取得する際に必要なSQL文を生成します。
    sql = SERVER_SQL

    # 外部サーバからデータを取得します。
    es_items = query_sql(cursor, sql)

    # ElasticSearchに取得したデータを登録します。
    post_es(es_index, es_items)


# Lambda関数の処理の入口
def lambda_handler(event, context):
    try:
        # 外部サーバのデータをElasticSearchに登録する
        get_server_data()
    except Exception as e:
        logger.error(traceback.format_exc())
        raise e