LoginSignup
1
1

More than 5 years have passed since last update.

AWS IoTとBME280で遊んでみた - 2

Last updated at Posted at 2018-03-14

AWS IoTに送信したセンサー(BME280)のデータを、Kinesis Analyticsで分析してみました。

必要なもの

データの流れ

IoT_Kinesis.png

センサー(BME280)からAWS IoTに送信した温度・湿度・気圧データに対し、Kinesis Data Analyticsで異常検知のデータ(ANOMALY SCORE)を付加してS3に出力し、Athena + QuickSightでグラフ化してみました。

前回(AWS IoTとBME280で遊んでみた)は、BME280からAWS IoTへ温度・湿度・気圧データを送信するところまででしたが、それ以降のデータの流れの部分を作成します。

  1. BME280からAWS IoTへ温度・湿度・気圧データを送信
  2. AWS IoTルールにより受信した温度・湿度・気圧データをKinesis Data Firehoseへ送信
  3. Kinesis Data Firehoseで受けたデータをS3(バケット:iot-bme280)に出力すると同時に、Kinesis Data AnalyticsでANOMALY SCOREを付加し、別のKinesis Data Firehoseに送信
  4. Kinesis Data Firehoseで受けたANOMALY SCORE付データをS3(バケット:iot-bme280-2)に出力
  5. 上記S3出力は、JSON Lines形式から改行を省いた形式になっているので、Lambdaで各JSON毎に改行を入れてJSON Lines形式に変更し、S3(バケット:iot-bme280-3)へ出力
  6. S3(バケット:iot-bme280-3)に対し、Glueのクローラーでテーブルを定義してAthenaでクエリを実行
  7. QuickSightでグラフ化

0. サンプルプログラム(basicPubSub.py)の修正

前回の修正では現在日時は送信していませんでしたが、現在日時も送信するようにサンプルプログラム(basicPubSub.py)を修正してRaspberryPi上で実行します。

basicPubSub.py
# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if args.mode == 'both' or args.mode == 'publish':
        temperature, pressure, humidity = bme280.read_bme280()

        message = {}
        message['datetime']    = str(datetime.utcnow())

1. S3バケット2個、Kinesis Data Firehose2個の作成

Kinesis Data Firehose2個とそれぞれのFirehoseの出力用にS3バケット2個作成します。

Firehose名 S3バケット名
1段目 bme280 iot-bme280
2段目 bme280-2 iot-bme280-2

firehose.png

2. AWS IoT ルールの作成

"MyTopic"というトピック名で受信したデータを全てKinesis Data Firehose(名前:bme280)に送信するルールを作成します。

rule1.png

rule2.png

※上記のように、Separatorは"\n (改行)"を選択します。

この時点でS3バケット(iot-bme280)にデータがたまりはじめます。

3. Kinesis Data Analyticsの設定

下記のようにFirehoseをsourceとdestinationに指定します。

source destination
bme280 bme280-2

SQLは、Example: Detecting Data Anomalies on a Streamを参考にRANDOM_CUT_FOREST関数を実装し、データにANOMALY SCOREを付加します。

kinesisAnalytics11.png

sql1.png

SQL部分

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("datetime" TIMESTAMP, "temperature" REAL,"humidity" DOUBLE, "pressure" DOUBLE, ANOMALY_SCORE DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM
        "datetime","temperature","humidity","pressure",
        ANOMALY_SCORE 
    FROM TABLE(RANDOM_CUT_FOREST(CURSOR(SELECT STREAM "datetime", cast("temperature" as DOUBLE) as"temperature","humidity", "pressure" FROM "SOURCE_SQL_STREAM_001")));

ここまでで、この時点でS3バケット(iot-bme280-2)にANOMALY SCOREが付加されたデータがたまりはじめます。

4. LambdaとS3バケットの作成

Firehoseは複数のJSONが1行にくっついた形式でS3に出力しますがこれではAthenaで読めないため、1行1JSONのJSON Lines形式に変更する必要があります。

その変換用のLambdaと変換後のデータを出力するS3バケット(iot-bme280-3)を作成し、S3バケット(iot-bme280-2)に対しオブジェクトが作成されたらLambdaを起動するようにイベントを設定します。

  • Firehoseの出力形式 = 複数のJSONが1行にくっついている形式
{"datetime":"2018-03-12 12:01:13.995","temperature":15.41,"humidity":72.4736328125,"pressure":1021.46828125,"ANOMALY_SCORE":1.2377089110080235}{"datetime":"2018-03-12 12:01:15.265","temperature":15.41,"humidity":72.4736328125,"pressure":1021.5043359375001,"ANOMALY_SCORE":1.2386195886695075}{"datetime":"2018-03-12 12:01:16.535","temperature":15.41,"humidity":72.4638671875,"pressure":1021.46828125,"ANOMALY_SCORE":1.2401199616651675}
  • Athenaで読める形式
{"datetime":"2018-03-12 12:01:13.995","temperature":15.41,"humidity":72.4736328125,"pressure":1021.46828125,"ANOMALY_SCORE":1.2377089110080235}
{"datetime":"2018-03-12 12:01:15.265","temperature":15.41,"humidity":72.4736328125,"pressure":1021.5043359375001,"ANOMALY_SCORE":1.2386195886695075}
{"datetime":"2018-03-12 12:01:16.535","temperature":15.41,"humidity":72.4638671875,"pressure":1021.46828125,"ANOMALY_SCORE":1.2401199616651675}
  • 変換用Lambda
import boto3
import urllib.parse

BUCKET_OUT = 'iot-bme280-3'

s3 = boto3.resource('s3')


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    s3obj_in = s3.Object(bucket, key)
    body_in = s3obj_in.get()['Body'].read().decode('utf-8')
    body_out = body_in.replace('}{', '}\n{')

    s3obj_out = s3.Object(BUCKET_OUT, key)
    s3obj_out.put(Body=body_out.encode('utf-8'))

※ 変換部分はちょっと手抜きしてます。

この時点でS3バケット(iot-bme280-3)にAthenaで読めるデータがたまりはじめます。

5. Glue Data CatalogにTableを作成

GlueでS3バケット(iot-bme280-3)に対しクロールして、Data CatalogにTableを作成します。

Table.png

AthenaでSQLを実行してデータが取得できることを確認します。

athena2.png

※変換用Lambdaで出力したS3のキーのフォーマットがHiveフォーマットでないため、クロールした時点で存在するパーティションのデータしか確認できません。再度クロールすれば、新しくできたパーティションのデータを確認できるようになります。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/partitions.html

  • 現在のフォーマット s3://iot-bme280-3/2018/03/14/19/..
  • Hiveフォーマット s3://iot-bme280-3/year=2018/month=03/day=14/hour=19/..

6. QuickSightでグラフを作成

QuickSightの画面で以下のところで
 [New Analysis] -> [data set] -> [New Data Set] -> [Athena] -> [Create Data Set]
作成したTableを指定してグラフを作成します。

山になっているところは、BMS280に対しドライヤー攻撃をした結果になります。
温度の急上昇に合わせAnamaly Scoreも上昇していることが確認できます。

graph2.png

はまりポイント

1. RANDOM_CUT_FOREST関数がうまく動作しない

温度は 15.25 のように整数2桁小数2桁ですが多分DECIMALと判断されてしまったため、RANDOM_CUT_FOREST関数に入力しても入力データとしては無視されました。(BME280にドライヤー攻撃してもANOMALY_SCOREの値が全然変化しない)
そのため、明示的にDOUBLEに型変換してからRANDOM_CUT_FOREST関数に入力しています。

RANDOM_CUT_FOREST
 -- DECIMAL is not a supported type. Use DOUBLE instead. --

2. QuickSightがエラーになる

QuickSightからAthena経由でアクセスする場合は、S3へのアクセス権限を設定する必要があります。
こちらの記事(Amazon QuickSightで「Insufficient permissions to execute the query」のエラーがでたときの対処法)がとても参考になりました。

雑感

  • BME280にドライヤー攻撃をしたところ1個BME280を壊してしまいました。センサーは意外にデリケートです。復活しました。
  • FirehoseからS3へ出力する際に改行コード等のセパレータが指定できないのは不便。将来指定できるようになるといいなー。
  • 次はSageMakerにデータを入れてみようかな。
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1