ゆぴきたすなひとりごと

いつでもどこでも、どうでもいい情報を得られることをモットーとしたブログ

Lambda関数(node.js)で何かを待つときに必要だった処理

Amazon Alexaで遊んでいたら、困った事がおきました。

node.jsが非同期処理で動いており、私の想定する動きをしなかったので、その時の解決策を覚え書きします。

やりたかったこと

3秒くらいで書いた適当シーケンスですが、何がやりたかったのかといわれたらこんな感じです。

f:id:shikanikusausage:20180318112757p:plain

  1. ユーザーは、UI(Alexa)を操作し、要求を送信する。
  2. UI(Alexa)は、Lambdaの関数を実行する。
  3. Lambda関数は、AWS IoT経由で、制御対象に要求をPublishする。
  4. 制御対象は、任意のタイミング(実行完了後)で応答をAWSIoTへPublishする。
  5. AWSIoTは、Publishされた応答を、DynamoDBに格納する。
  6. Lambda関数は、4~5の間、応答がDynamoDBに格納されるか、一定時間経過するまでループし、その結果をUI(Alexa)に返す。
  7. UI(Alexa)は、結果をユーザーに返す。

問題となったのは手順の6でした。

 

原因

Lambda関数はnode.jsで記載しており、node.jsは非同期処理で動作していました。

「node.js 非同期」とかでググると、すぐに分かります。

 

対策

PromiseやGeneratorやasync/awaitを使って解消できました。

今回はPromiseを使いました。

 

タスク処理の初期化関数
/**
 * タスク処理の初期化
 */
const taskInitialize = (event, context) => {
    return new Promise((resolve, reject) => { 
        const stash = {
            env: {
                TableName: '*****',  // 応答が格納されるDynamoDBのテーブル名
                TopicName: '*****',  // 要求をPublishするトピック名
                DynamoEndPoint: 'https://*****.amazonaws.com',  // DynamoDBのエンドポイント
                IoTEndPoint: '*****.amazonaws.com',  // AWS IoTのエンドポイント
                QoS: 0,  // AWS IoTのMQTT通信のQoS
            },
            msg: {
                led: 0
            },
            startTime : Date.now(),
            result:''
        };
        resolve(stash);
    });
};

 

ここでは主にタスク間でやり取りする情報(stash)を定義しています。

今回だと、env(通信設定情報)、msg(例)、開始時刻、結果の4つの情報を持たせています。

 

AWS IoTへPublishする関数
/**
 * AWSIoTへPublishするタスク
 */
const taskPublish = (stash) => {
    return new Promise((resolve, reject) => {
        var iotdata = new AWS.IotData({endpoint:stash.env.IoTEndPoint});
        var params = {
            topic: stash.env.TopicName, 
            payload:stash.msg.led,
            qos: stash.env.QoS
        };
        iotdata.publish(params,function(err,data){
            if (err) {
                reject(err);
                return;
            } else {
                resolve(stash);
                return;
            }
        });
    });
}

 

stashを受け取り、その中の情報を元にパラメータを作成し、AWSIoTにPublishします。

失敗した時はreject()し、成功した場合はresolve(stash)を呼び出す事を忘れないように。

 

結果を取得する関数

こいつが今回のキモです。

/**
 * 結果を取得するタスク
 */
const taskGetResult = (stash) => {
    return new Promise((resolve, reject) => {
        AWS.config.update({endpoint: stash.env.DynamoEndPoint});
        var dynamoDB = new AWS.DynamoDB.DocumentClient();
        
        var param = {
            TableName : stash.env.TableName,
            // ここのパラメータは自分が取得したいデータに応じてカスタム
        };
        
        dynamoDB.query(param, (error, data) => {
            if (error) {
                reject(error);
                return;
            } else {
                // Queryの結果、アイテムが1つ以上ある場合
                if(data.Items.length > 0){
                        if(結果){
                            stash.result = true;
                            resolve(stash);
                            return;
                        } else {
                            stash.result = false;
                            resolve(stash);
                            return;
                        }
                    }
                }
                // Queryの結果、アイテムが1つもない場合
                if((Date.now() - stash.startTime) < TIMEOUT){
                    // タイムアウトしていない場合は、taskGetResultを実行。
                    taskGetResult(stash)
                        .then(resolve)
                        .catch(reject);
                } else {
                    // タイムアウトの場合は、失敗。
                    reject(error);
                }
            }
        });
    });
}

DynamoDBへデータを要求し、データが存在している場合はその結果を判定し、resolve(stash)で結果を返す。

データが存在していない場合は、タイムアウトしていないかを確認し、タイムアウトしていなければ、自分の関数を呼び出す。

タイムアウトしていればreject()する。

3つの関数の呼び出しをする関数
/**
 * リクエストを実行する
 */
function execRequest(event, context, callback){
    taskInitialize(event, context)
        .then(taskPublish)
        .then(taskGetResult)
        .then(callback.bind(null, null))
        .catch(callback);
}

これだけです。

 

所感

初めてnode.jsを触ったが、非同期処理だと知らなかった。

非同期処理だとこういう問題に当たり、知識が必要になってくるんだなと思った。

このソースは結構流用できていて、便利。

特にAlexaスキルの開発中はとても便利。