パターン 1. APIを介してデータを検索、登録を行いたい

本パターンでは、WebアプリクライアントがAPIを介してデータを検索、登録します。

クラウドアプリケーション内の構成は以下の通りです。

パターン01

No.

リソース名

概要

1

API Gateway

Webクライアントから受け取ったリクエストをカスタムLambdaに送信し、結果を返却します。

2

カスタム処理

Webクライアントからのリクエストを受けて、ElasticsearchアクセスLambdaを実行します。

3

Elasticsearchアクセス

myiot-rel-es-access-lambda
My-IoTデータストアに検索、登録等を行います。

4

Elasticsearch

My-IoTデータストアです。エッジアプリから送信されたデータが蓄積されています。

CloudFormationテンプレート例

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No2.カスタム処理Lambdaのテンプレート例です
#
  sipSamplePattern01lm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 'arn:aws:iam::946501391975:role/sip-sample-lambda-role'
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 30
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-01-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # ES_ACCESS_LAMBDA:      No.3 ElasticsearchアクセスLambda関数名を指定しています
          # 
          ES_ACCESS_LAMBDA: myiot-rel-es-access-lambda
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
# Lambda Function
#************************************************************

#************************************************************
# ApiGateway
# No1.API Gatewayのテンプレート例です
#
  RestAPI:
    Type: AWS::ApiGateway::RestApi
    Properties:
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
      # 
      # Description: 説明文を記載してください
      # ※空文字はエラーとなります。不要の場合は「Description」を削除してください
      # Name: API名を記載してください
      #
      Description: 'sip-sample-pattern-01 RestApi'
      Name: 'sample-apigw'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  RestAPIDeployment:
    Type: AWS::ApiGateway::Deployment
    Properties:
      Description: ''
      RestApiId: 
        Ref: RestAPI
    DependsOn:
      - RestAPIMethod
      - RestAPIResource

  RestAPIStage:
    Type: AWS::ApiGateway::Stage
    Properties:
      RestApiId: 
        Ref: RestAPI
      DeploymentId: 
        Ref: RestAPIDeployment
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
      # 
      # StageName: ステージ名を記載してください
      #
      StageName: 'sample-stage'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  RestAPIResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      PathPart: "{proxy+}"
      ParentId: 
        Fn::GetAtt: RestAPI.RootResourceId
      RestApiId: 
        Ref: RestAPI

  RestAPIMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # 下記の項目は変更しないでください
      #
      # HttpMethod:        メソッド種別を指定しています
      # AuthorizationType: 認証種別を指定しています
      # Integration:       プロキシ統合に関する設定を指定しています
      # 
      HttpMethod: ANY
      AuthorizationType: NONE
      RequestParameters: 
        "method.request.path.proxy": true
      Integration:
        CacheKeyParameters: 
          - "method.request.path.proxy"
        CacheNamespace: 
          Ref: RestAPIResource
        IntegrationHttpMethod: POST
        Type: AWS_PROXY
        Uri: 
          Fn::Sub: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${sipSamplePattern01lm.Arn}/invocations"
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      ResourceId: 
        Ref: RestAPIResource
      RestApiId: 
        Ref: RestAPI

#
# ApiGateway
#************************************************************

#************************************************************
# LambdaPermission
#
  LambdaPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      Action: 'lambda:InvokeFunction'
      Principal: 'apigateway.amazonaws.com'
      FunctionName: !GetAtt sipSamplePattern01lm.Arn
#
# LambdaPermission
#************************************************************

カスタム処理ソースコード例(Python)

import json
import boto3
import copy
import os
import logging
from datetime import datetime
from dateutil import tz

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ES_ACCESS_LAMBDA = os.environ.get('ES_ACCESS_LAMBDA')
# テナントID
TENANT_ID = os.environ.get('TENANT_ID')
# コネクタID
CONNECTOR_IDS = os.environ.get('CONNECTOR_INFO')

# エラーコード&メッセージ
ERR_INVALID_ARGUMENT = 400
ERR_INTERNAL_ERROR = 500

ERR_MSG_INVALID_ARGUMENT = {'message':'Invalid Argument.'}
ERR_MSG_INTERNAL_ERROR = {'message':'Internal error.'}


def is_empty_str(val):
    """値が空文字列か、又はNoneかを調べます。

    Args:
        val (str/NonType): 調べる値

    Returns:
        bool: 値が空文字列か、又はNoneなら True を返す
    """

    return not val if isinstance(val, str) else val is None
 
    
def make_response(code, body):
    """実行結果をJSON形式で返します。

    Args:
        code (int): ステータスコード
        body (str): 実行結果

    Returns:
        [dict]: 実行結果
    """
    response = {
        'statusCode': code,
        "headers":{"Content-Type": "application/json"},
        'body': json.dumps(body),
        "isBase64Encoded": False
    }

    return response
    

def es_query_access(event):
    """ElasticSearchアクセスAPIを呼び出し、データの検索を行います。

    Args:
        event (dict): イベントデータ

    Returns:
        dict: 処理結果を返します。
    """

    # テナントIDと日付からインデックス名を生成する(テナントID_YYYY.MM.DD)
    # 例: エッジアプリからMy-IoTデータストアに今日送信されたデータを取得する場合
    JST = tz.gettz('Asia/Tokyo')
    today = datetime.now(JST).strftime('%Y.%m.%d')
    index_name = '{}_{}'.format(TENANT_ID, today)

    #==============================================================================
    # Elasticsearchにアクセスする際に必要な引数を設定しています。
    # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)No2 IoTデータへのアクセス
    # を参照してください。
    # ここでは操作内容や、登録データなどを設定しています。
    #==============================================================================
    # 環境変数に設定されたコネクタを検索する条件生成
    connectors = json.loads(CONNECTOR_IDS)
    connector_query = []
    connector_query_item = {
        'match': {
            'connectorID': ''
        }
    }
    for connector in connectors:
        query_item = copy.deepcopy(connector_query_item)
        query_item['match']['connectorID'] = connector
        connector_query.append(query_item)

    payload = {
        'method': 'Get',
        'index': index_name,
        'query': {
            'query': {
                'bool': {
                    'should': connector_query
                }
            },
            'sort': [{'timestamp': 'desc'}]
        }
    }

    try:
        #==============================================================================
        # No2.Elasticsearchアクセス
        # 環境変数で定義されているmyiot-rel-es-access-lambdaに
        # リクエストを実施し、結果を取得します。
        #==============================================================================
        res = boto3.client('lambda').invoke(
            FunctionName=ES_ACCESS_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )

        response = json.loads(res['Payload'].read())

    except Exception:
        import traceback
        logger.error(traceback.format_exc())
        return make_response(ERR_INTERNAL_ERROR, ERR_MSG_INTERNAL_ERROR)
    
    # 処理結果を返します。
    return make_response(response['statusCode'], response['body'])


# Lambda関数の処理の入口
def lambda_handler(event, context):

    # ElasticSearchアクセスAPIを呼び出し、データの検索を行います。
    return es_query_access(event)

Elasticsearch検索結果の例(cURLコマンドを使用)

APIGatewayを利用したElasticSearchへの検索リクエスト及びレスポンス例を示します。

<リクエスト>
curl https://<RestApiId>.execute-api.<リージョン>.amazonaws.com/sample-stage/<パス> \
> --get'
<レスポンス>
{"result": {"took": 8, "timed_out": false, "_shards": {"total": 5, "successful": 5, "skipped": 0, 
 "failed": 0}, "hits": {"total": {"value": 1, "relation": "eq"}, "max_score": 1.0, 
 "hits": [{"_index": "sample-index", "_type": "_doc", "_id": "AAAABBBBCCC", "_score": 1.0, 
 "_source": {"Id": "1001", "temperature": 36.3, "time": 1626188400}}]}}}