フォーム読み込み中
Alibaba Cloud LindormのLindorm ML機能を使って、SQLだけで時系列データから異常検知をしてみました。
Alibaba Cloud LindormはAlibaba Cloudが提供するCloud-Native Multi-Modal Databaseです。このLindormの時系列エンジンにはLindorm MLがあり、この機能はSQLだけで異常検知、予兆検知、クラスタリング、外れ値検知、SVMなど時系列用機械学習を簡単かつ素早く実現することができます。
今回は、Lindorm MLを使って、時系列データの例外を自動的かつ定期的に検出するために、連続クエリの作成方法について説明します。
例えば、IoT等温度と湿度のセンサーがあり、関連するデータを毎秒おきにLindormへアップロードすると仮定します。通常、温度と湿度には急激な変動はありません。もし、極端に高い値や低い値があれば、それはデータの例外とみなされます。
同じRegion/Zone配下にECS、Lindormを配置し、温度・湿度センサーの代わりにECS上からスクリプトを使って毎秒あたりデータをLindormへ書き込み処理します。そのあと、LindormにてSQLを使ってLindorm MLによるタイミング例外検知を試すという流れになります。
Lindorm MLは時系列エンジンのみ提供しているため、時系列エンジンを持つLindormインスタンスを作成します。
Lindormコンソールの「Create」ボタンをクリックします。
Region、Zone、ネットワーク、名前などのインスタンスを設定します。
特定のノードを追加し、インスタンスで時系列エンジンを有効にします。
「Buy Now」ボタンをクリックし、注文確認画面へ移動します。
利用規約のチェックボックスにチェックを入れます。
「Activate Now」ボタンをクリックし、操作を実行します。
そうすると、「Creating」のステータスで新しいインスタンスができます。
Lindormはエンジンの種類によって、最低限配置が必要なノード台数などが仕様として決められています。時系列エンジンとして、ノード数が要件を満たすかを確認します。もしノード数が不足している場合、「Buy Now」ボタンはエラーで無効化されます。
「Running」状態になると、メンテナンス時間の変更、インスタンス名の変更、ホワイトリストの更新など、いくつかの設定を行うことができます。
一般的な設定項目は詳細ページに記載されていますが、対象の項目の後にある「Modify」ボタンをクリックすると、設定を変更することができます。
ネットワークは他のエンジンと同様に、要件に応じたボタンをクリックすることで、パブリックエンドポイントの申請ができます。
ホワイトリストも付帯しています。インスタンスはセキュリティのため、ホワイトリストからの接続しか受け付けません。ここでIPアドレスを追加しておかないと、接続時にエラーが発生します。
今回は、同Region/Zone/VPN配下のECSからリクエストを送るので、そのイントラネットのIPアドレスをホワイトリストに追加するだけで、パブリックエンドポイントの申請は不要です。
作業環境と同じVPC下にあるECSインスタンスを使用します。事前に準備したECSインスタンスに接続し、いくつかの設定を行います。
Lindorm CLIはLindorm時系列エンジンに接続し、Lindorm MLによるデータの例外検出機能を使います。Lindorm CLIインストーラーをダウンロードし、解凍すると、lindorm-cliという実行可能なツールが得られます。著者はECS上に構築するため、Linuxを使用していますが、他のOSをお使いの方は、Lindorm CLIを参考にしてください。
wget -O lindorm-cli-linux-latest.tar.gz https://tsdbtools.oss-cn-hangzhou.aliyuncs.com/lindorm-cli-linux-latest.tar.gz?spm=a2c63.p38356.0.0.338d5a2egBGtdx&file=lindorm-cli-linux-latest.tar.gz
tar -xvf lindorm-cli-linux-latest.tar.gz
温度・湿度センサーを想定したダミーデータは、ECS上で構築したJavaコードによって生成・送信します。ECSインスタンスでJavaファイルを実行するには、Java環境を用意する必要があります。
sudo apt-get update
sudo apt-get install openjdk-8-jdk
java -version
Lindorm CLIでLindormインスタンスに接続し、必要なデータベースとテーブルを作成します。このステップはJavaコードのために残しておくこともできますが、Javaコードを何度か実行する可能性があるので、作成ステップをここに移してデータ生成処理を別に処理した方がよいです。
コンソールのデータベース接続ページで接続情報を確認します。
Lindormインスタンスに接続するためのコマンドを実行します。
./lindorm-cli -url jdbc:lindorm:tsdb:url=http://ld-xxx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242 -username root -password root
SQL文を使ってデータベースとテーブルを作成します。ここで、独自のテーブル構造を使用することもできます。その場合、後でJavaコードの関連情報を変更する必要があります。
create database demo;
use demo;
create table demo.sensor (device_id varchar tag,region varchar tag,time bigint,temperature double,humidity double,primary key(device_id));
Lindorm MLは、Alibaba GroupのDAMO academyが独自に開発したデータベース内のMachine Learning・Deep Learning技術です。
今回はデータの例外検出を行います。データの例外検出は検出アルゴリズムがデータを時系列で学びつつ、そのデータから外れ値があれば検出するという流れです。サポートされている検出アルゴリズムは、esd、nsigma、ttest、Online STL with T-test、Online STL with ESDです。今回はLindorm MLによる簡単なデモとして、以下のコードで単一データポイントやスパイク型例外に適したesdを使用することにします。
シナリオとして、センサーが環境中の温度と湿度のデータを記録し、1秒ごとにLindormインスタンスへデータを送信します。データの例外を発生させるために、途中で高ダミーや低ダミーのデータが発生する確率を低くする必要があります。
時系列エンジンへの接続、データの生成、送信はJavaネイティブSDKで時間単位で行います。Helpページにサンプルとなるソースコードがあるので、それを引用します。
mavenプロジェクトのpom.xmlに以下の部分を追加します。dependencyセクションには、Lindorm Java native SDKの必要なパッケージが記述されています。ビルドプラグインセクションには、実行可能なパッケージをビルドするために使用するツールを記述します。<mainClass>セクションは、手持ちのプロジェクトの構成にあわせて更新する必要があります。
<dependencies>
<dependency>
<groupId>com.aliyun.lindorm</groupId>
<artifactId>lindorm-tsdb-client</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>xxxxxxx</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
</plugin>
</plugins>
</build>
以下は同じコードです。コンソールからご自身の接続情報を元に更新してください。
package org.lindorm.demo;
import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.LindormTSDBClient;
import com.aliyun.lindorm.tsdb.client.LindormTSDBFactory;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.utils.ExceptionUtils;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class MainAPP {
public static void main(String[] args) {
// 1.接続URLを元にLindormクライアントを作成する
String url = "http://ld-xxxxx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242";
// LindormTSDBClient - スレッドセーフ、再利用可能、頻繁に作成と破棄をする必要がなくなります
ClientOptions options = ClientOptions.newBuilder(url).build();
LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
// 2.データベース「demo」、テーブル「sensor」を作成
// この操作を行うには、次のステートメントを使用します
// この場合、この関数を複数回実行する可能性があるため、HTTP API の SQL インターフェースを使用します
/*lindormTSDBClient.execute("CREATE DATABASE demo");
lindormTSDBClient.execute("demo", "CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");*/
// 3.生成したデータでテーブルを挿入
long currentTime = System.currentTimeMillis();
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for(int j = 0; j < 5; j++) {
// 1時間分のデータを準備
int numRecords = 3600;
List<Record> records = new ArrayList<>(numRecords);
currentTime += 3600 * 1000;
for (int i = 0; i < numRecords; i++) {
Record record = Record
.table("sensor")
.time(currentTime + i * 1000)
.tag("device_id", "demo")
.tag("region", "tokyo-jp")
.addField("temperature", generateRandomValue(15.0))
.addField("humidity", generateRandomValue(50.0))
.build();
records.add(record);
}
System.out.println("Insert data from: " + format.format(new Date(currentTime)));
CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records);
// 非同期書き込み結果の処理
future.whenComplete((r, ex) -> {
// 書き込み失敗の処理
if (ex != null) {
System.out.println("Failed to write.");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("LindormTSDBExceptionが発生しました。これは、リクエストがLindorm TSDBに届いたものの、"
+ "何らかの理由でエラーレスポンスとして拒否されたことを意味します");
System.out.println("Error Code: " + e.getCode());
System.out.println("SQL State: " + e.getSqlstate());
System.out.println("Error Message: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else {
System.out.println("Write successfully.");
}
});
// 例として、以下のような簡単な同期待ちがあります
System.out.println(future.join());
}
lindormTSDBClient.shutdown();
}
private static double generateRandomValue(double baseValue){
Random random = new Random();
int flag = random.nextInt(100);
double results = 0.0;
if(flag > 5){
results = baseValue + random.nextDouble()*(random.nextInt() % 5);
}else {
results = baseValue + random.nextDouble()*(random.nextInt() % 5) + 100;
}
return Double.parseDouble(String.format("%.1f",results));
}
}
mvn package コマンドで実行可能なパッケージを構築します。
ECSインスタンスにパッケージをアップロードして実行すると、ダミーデータが生成されます。
生成されたダミーデータをLindormインスタンスで確認します。
あらかじめ定義されたanomaly_detect関数でデータの例外を検出します。anomaly_detectはLindorm MLに付帯されてる教師あり機械学習で異常検知をするための関数です。anomaly_detect関数に検知対象のフィールド名、アルゴリズム、検知した際の挙動を引数として入れる必要があります。前述したように、ここではesdアルゴリズムを使用します。例外を手動で見つけるために、下の画像のようにダミーデータと一緒に抑止結果を結合します。一般的な温度は15度前後ですが、例外データは114.1度です。
select device_id, region, time, anomaly_detect(temperature, 'esd', 'adhoc_state=true') as detect_result from sensor where device_id in ('demo') and time >= '2022-11-16 15:00:00' and time < '2022-11-16 15:01:00' sample by 0;
select device_id, region, time, temperature from sensor where device_id in ('demo') and time >= '2022-11-16 15:00:00' and time < '2022-11-16 15:01:00';
ダミーデータは実行時間に基づいて生成されますので、状況に応じてSQL文の時間帯を変更してください。
連続クエリとは、時系列エンジン内で自動的かつ定期的に実行される時系列クエリです。
連続検出クエリでは、データ例外検出関数の同じアルゴリズムとパラメータが使用されます。そのため、クエリは同じ例外検出ステータスを共有します。2回目の検出クエリは、1回目のクエリから返された例外検出ステータスに基づいて実行され、より正確な結果を返します。
次は10秒ごとにダミーデータを送信するようにJavaコードを更新します。継続的なクエリを行うためには、継続的なダミーデータを生成することが望ましいです。サンプルのJavaコードを以下のように更新すると、10秒ごとにダミーデータが送信されるようになります。
package org.lindorm.demo;
import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.LindormTSDBClient;
import com.aliyun.lindorm.tsdb.client.LindormTSDBFactory;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.utils.ExceptionUtils;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class MainAPPInterval {
public static void main(String[] args) throws InterruptedException {
// 1.接続URLを元にLindormクライアントを作成する
String url = "http://ld-xxxxx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242";
// LindormTSDBClient - スレッドセーフ、再利用可能、頻繁に作成と破棄をする必要がなくなります
ClientOptions options = ClientOptions.newBuilder(url).build();
LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
// 2.データベース「demo」、テーブル「sensor」を作成
// この操作を行うには、次のステートメントを使用します
// この場合、この関数を複数回実行する可能性があるため、HTTP API の SQL インターフェースを使用します
/*lindormTSDBClient.execute("CREATE DATABASE demo");
lindormTSDBClient.execute("demo", "CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");*/
// 3.生成したデータでテーブルを挿入
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for(int j = 0; j < 2 * 3600; j++) {
// データの準備
int numRecords = 10;
List<Record> records = new ArrayList<>(numRecords);
long currentTime = System.currentTimeMillis();
for (int i = 0; i < numRecords; i++) {
Record record = Record
.table("sensor")
.time(currentTime + i * 1000)
.tag("device_id", "demo")
.tag("region", "tokyo-jp")
.addField("temperature", generateRandomValue(15.0))
.addField("humidity", generateRandomValue(50.0))
.build();
records.add(record);
}
System.out.println("Insert data from: " + format.format(new Date(currentTime)));
CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records);
// 非同期書き込み結果の処理
future.whenComplete((r, ex) -> {
// 書き込み失敗の処理
if (ex != null) {
System.out.println("Failed to write.");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("LindormTSDBExceptionが発生しました。これは、リクエストがLindorm TSDBに届いたものの、"
+ "何らかの理由でエラーレスポンスとして拒否されたことを意味します");
System.out.println("Error Code: " + e.getCode());
System.out.println("SQL State: " + e.getSqlstate());
System.out.println("Error Message: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else {
System.out.println("Write successfully.");
}
});
// 例として、以下のような簡単な同期待ちがあります
System.out.println(future.join());
Thread.sleep(10000);
}
lindormTSDBClient.shutdown();
}
private static double generateRandomValue(double baseValue){
Random random = new Random();
int flag = random.nextInt(100);
double results = 0.0;
if(flag > 5){
results = baseValue + random.nextDouble()*(random.nextInt() % 5);
}else {
results = baseValue + random.nextDouble()*(random.nextInt() % 5) + 100;
}
return Double.parseDouble(String.format("%.1f",results));
}
}
実行可能なパッケージをビルドし、前回と同様にECSインスタンスで実行します。
継続的にクエリ結果を保存するために、新しいテーブルが必要です。クエリと共に新しい結果テーブルを作成します。
CREATE TABLE demo.anomaly_points(
device_id varchar tag,
region varchar tag,
time bigint,
anomaly_result boolean,
PRIMARY KEY(device_id)
);
CREATE continuous query demo.cq_detector WITH(
interval = '1m'
) AS
INSERT INTO demo.anomaly_points(
device_id,
region,
time,
anomaly_result
)
SELECT
device_id,
region,
time,
anomaly_detect(temperature, 'esd') AS anomaly_result
FROM
demo.sensor
WHERE
device_id = 'demo' sample BY
0;
show continuous queries;
この例では、esdアルゴリズムを使って、1分ごとにダミーデータテーブルの直前1分間のデータから例外を検出します。
以下のコマンドで、データの差分を表示することができます。
select count(*) from demo.anomaly_points;
select max(time) from demo.anomaly_points;
テーブルのデータ数と最大時間は、1分ごとに連続クエリで更新されていることが見えます。
検証終了後、連続クエリを削除することができます。
drop continuous query demo.cq_detector;
定義済みの検出関数は、maxAnomalyRatioやwarmupCountなどのパラメータを受け付けます。関連情報は、ヘルプドキュメントで確認することができます。
SELECT
device_id,
region,
time,
anomaly_detect(temperature, 'esd', 'lenHistoryWindow=30,maxAnomalyRatio=0.1') AS detect_result
FROM
sensor
WHERE
device_id IN('demo')
AND time >= '2022-11-16 15:00:00'
AND time < '2022-11-16 00:01:00' SAMPLE BY
0;
入力パラメータの違いにより、検出結果は異なります。
一方、入力パラメータを間違えると、いくつかのエラーが発生します。例えば、esd アルゴリズムの lenHistoryWindow は 20 未満であってはならない。もしこれを2にしてしまうと、クエリ結果の代わりにNullPointerExceptionが発生します。
lindorm:demo> SELECT device_id, region, time, anomaly_detect(temperature, 'esd', 'lenHistoryWindow=2,maxAnomalyRatio=0.8') AS detect_result FROM sensor WHERE device_id in ('demo') and time >= '2022-11-16 15:00:00' and time < '2022-11-16 00:01:00' SAMPLE BY 0;
ERROR 9000 (HY000): Server internal error; Please try again, or follow the error message to troubleshoot the problem.
Caused by: java.lang.NullPointerException
at com.alibaba.lindorm.tsdb.tsql.connector.LindormTSDBConnector.runDownsampleQuery(LindormTSDBConnector.java:141)
at com.alibaba.lindorm.tsdb.tsql.connector.downsample.DownsampleExecutorImpl.getDataRows(DownsampleExecutorImpl.java:49)
at com.alibaba.lindorm.tsdb.tsql.simple.SimpleExecutorCommand.lambda$execute$0(SimpleExecutorCommand.java:88)
at com.alibaba.lindorm.tsdb.tsql.jdbc.LindormTSQLMetaImpl$LindormTSQLSignature$1.enumerator(LindormTSQLMetaImpl.java:430)
at org.apache.calcite.linq4j.AbstractEnumerable.iterator(AbstractEnumerable.java:33)
at org.apache.calcite.avatica.MetaImpl.createCursor(MetaImpl.java:90)
at org.apache.calcite.avatica.AvaticaResultSet.execute(AvaticaResultSet.java:186)
at org.apache.calcite.avatica.AvaticaConnection$1.execute(AvaticaConnection.java:666)
このエラー ERROR 8012(42000) は、検出されたフィールドが検出機能付きselect文の中にあるためです。つまり、フィールドの値と検出結果を一緒に表示することができなかったのです。なので、検出されたフィールドをselect文から削除すると、この問題は解決します。
lindorm:demo> select device_id, region, time, temperature, anomaly_detect(temperature, 'esd', 'adhoc_state=true') as detect_result from sensor where device_id in ('demo') and time >= '2022-11-16 15:00:00' and time < '2022-11-16 15:01:00' sample by 0;
ERROR 8012 (42000): Unsupported operation; Field aggregator must be specified in downsample query: temperature
本記事ではLindorm MLによる、SQLで外れ値検出の例をしました。
LindormはMulti-Modal Databaseでありながら、時系列データを素早く格納すると同時にデータから異常や脅威を早期に検知することができます。そのため、例えばIoTで時系列データは時系列エンジン、メトリクス等JSONデータはWide-columnのWide table engineへ格納し、時系列データで異常があればそれに関連するJSON情報を素早く引き出すといったアプローチも可能です。機械学習用のプロダクトサービスやETL、データ転送系プロダクトサービスを準備せずにデータベース内で教師あり/教師なし機械学習ができるのは、サービス全体からみて非常に大きいポテンシャルです。
条件に該当するページがございません