InfluxDB
なんか名前かっこいいし時系列DBってなんかよさそう!ロゴもUIもイケてるしこれは入門するしかないっしょ!
という乗るしかないこのビッグウェーブに感を出しつつ実際の運用では、
仮想通貨の取引botのログやメトリクスの収集と分析のために使います。
なんか金融時系列データとInfluxDBの相性良さそうだし。
この記事ではpythonのinfluxdbクライアントとコンソールを使用します。
pip install influxdbでクライアントをインストールしておいてください。
書いている人は二日酔いと翌日の期末テストの準備と徹夜カラオケ睡眠不足の三重苦で脳死しているので怪しい箇所が
多々あるかもしれませんがコメントで優しく指摘してもらえると嬉しいです。
御託はここまでで本題に入ります。
RDBとの共通点・相違点
InfluxDBを理解するために、まずはRDBとInfluxDBの共通点と異なる点を見てみましょう。
共通点
- データベースの概念
- ユーザー管理の仕組み
まぁInfluxDB使おうと思ってる人ならRDBの経験はありますよね
ということで説明しません。
相違点
- Measurementとそのスキーマ定義
- プライマリキーは複合キー
- 外部キーがない
- カラムの役割を細分化したタグとフィールド
1. Measurementとそのスキーマ定義
適切な言葉が思いつかなかったのでそのまま英語表記にしておきます、カタカナ表記は違和感がすごい。
まぁAWSの内容のわかりにくいサービス名やキモいオプション名の数々に比べたら、たかが英単語1つくらい屁でもないですよね。
Measurementは既存のRDBMSと違って予めスキーマを定義して読み込ませる必要がありません。
マイグレーションからの解放…すばらしい….
そのため動的にカラムを追加することができ、テーブル設計も比較的ゆるく行うことができます。
つまり、RDBが固定スキーマをテーブル作成に使うのに対して、InfluxDBはそうではないということですね。
2. プライマリキーは複合キー
RDBにおけるプライマリキーにあたるのは、InfluxDBではtimeとタグの複合キーです。
timeカラムはプライマリキーではないので一意である必要はありません。
ただし厳密に言えばこの複合キーはプライマリキーではありません。
もしtimeカラムとタグが完全に一致するようなリクエストが投げられた場合はUPDATEとして処理されるからです。
RDBではINSERTはINSERT, UPDATEはUPDATEとして処理され、既に存在するプライマリキーと同一のレコードにINSERTを
発行するとエラーになりますよね。
この複合キーをプライマリキーのように扱う性質から、例えば異なる仮想通貨botの取引ログを、
そのbotの名前と識別子をタグに設定することによって1つのデータベースに保存することができます。
より一般化すれば、異なるデータソースからの同じ時刻データであっても、その単位が同一であれば一つのデータベースの中にに正しく保存できるということです。
今までバックテストの環境にMySQLとSQLAlchemyを駆使してやってきました。
しかし仮想通貨取引botの作成にあたって直面したありとあらゆるDBの制約の問題はInfluxDBなら発生しない感がありますね。
本番だけじゃなくてバックテストの環境も完全にInfluxDBに移行しようかな…。Grafanaもかっこいいし()
3. 外部キーがない
InfluxDBはRDBではないので、プライマリキーも外部キーもjoinも1次以上の正規形も存在しません。
Measurementの設計についてはInfluxdata docs – InfluxDB schema design and data layout –を読んでください。
4. カラムの役割を細分化したタグとフィールド
どちらも実態としてはカラムですが、その役割によって名前が異なっています。
集合のノリで言えば、タグとフィールドは互いに背反なカラムの要素です。
タグ
タグは、そのデータソースの識別子です。
公式ドキュメントには、1つのタグにはただ1つのエンティティを割り当てることが推奨されています。
bot_param1_x_param2_bみたいなのはやめろということですね。
仮想通貨botで言うなら、あるbotの取引ログのMeasurementにおける注文タイプのカラムですね。
注文タイプはロングとショートですが、それを記録するカラムをタグと呼んでいます。
もう1つ例を出すなら都道府県別人口のテーブルがあったとして、その都道府県idないし都道府県名のカラムがタグにあたりますね。
フィールド
それに対してフィールドはデータそのものです。
仮想通貨botで言うなら、あるbotの取引ログのMeasurementにおけるエントリー価格や約定価格のカラムですね。
これらは識別子としての役割はありません。
もう1つ例を出すなら都道府県別人口のテーブルがあったとして、その人口の数値がフィールドにあたりますね。
初期設定
気が向いたら書きます。気になる人は記事最後の参考ページに載ってるのでそれ見ながらやってください。
コンソールからユーザー・DBを作る
気が向いたら書きます。気になる人は記事最後の参考ページに載ってるのでそれ見ながらやってください。
influxdbのpythonクライアントを使ってみる
こちらは完全にオレオレ環境になっているのでコードの内容を逐次説明していきます。
ログイン
from influxdb import InfluxDBClient
def influxdb_establish_connection():
return InfluxDBClient(
host=config['influxdb']['host'],
port=int(config['influxdb']['port']),
username=config['influxdb']['username'],
password=config['influxdb']['password'],
database=config['influxdb']['db_name']
)
connector = influxdb_establish_connection()
まず初期化です。引数として
- hostには”localhost”
- portには8086
- usernameには先ほど作成・設定したユーザー名の文字列
- passwordにはユーザーのパスワードの文字列
- databaseにはデータベース
を指定してconnectorというインスタンスを作っています。
データベース、Measurementを確認する
>>> connector.get_list_database()
[{'name': 'tradingbot'}]
>>> connector.get_list_measurements()
[]
>>> connector.get_list_retention_policies()
[{'name': 'autogen', 'duration': '0s', 'shardGroupDuration': '168h0m0s', 'replicaN': 1, 'default': True}]
Databaseは先ほど作ったものが確認できますね。
Measurementsはまだ作っていませんから空のリストが返ってきています。
retention policyについては何も設定していません、これらがデフォルト値のようです。
データの投入
投入するデータはハッシュのリストです。
ハッシュには
- fields
- measurement
- tags
- time(任意)
を設定しています。
ここでは、testという名前の都道府県別平均身長・体重を記録するMeasurementを作ります。
いちいちCREATE TABLEとか叩かずに、DBにぶん投げるデータに ‘measurement’: “test”と書くだけなので楽ちんです。
fieldにはデータそのものを、タグにはデータの識別子を設定します。
timeは任意です。設定しなければ自動でレコードの登録された時刻が割り振られます。
write_points()の返り値はboolで、書き込みに成功したらTrue,失敗したらFalseが返ってきます。
>>> from datetime import datetime, timedelta
>>>testdata = [
{'fields': {"average_width":170, "average_height":65},
'measurement':"test",
'tags': {"location":"tokyo"}
},
{'fields': {"average_width":195, "average_height":100},
'time':(datetime.now()-timedelta(days=1)),
'measurement':"test",
'tags': {"location":"gunnma-"}}
]
>>> result = connector.write_points(testdata)
>>> print(result)
True
ちゃんとTrueが返ってきていますね、データの投入は完了したようです。
データの確認
続いてMeasurementとレコードが存在するかを確認します。
>>> connector.get_list_measurements()
[{'name': 'test'}]
measurementが作成されています。
続いて中身を確認。
>>> result_query = connector.query('SELECT * FROM test')
>>> print(result_query)
ResultSet({'('test', None)': [
{'time': '2019-12-31T14:45:18.55313408Z', 'average_height': 100, 'average_width': 195, 'location': 'gunnma-'},
{'time': '2020-01-01T06:45:34.778127487Z', 'average_height': 65, 'average_width': 170, 'location': 'tokyo'}
]})
おや、1日差を指定したはずなのに時間に大きなズレがありますね。
これはdatetimeがシステムの時刻を利用しているのに対しDB内部ではutctimeを用いていることで発生しています。
そこで先ほどのtest dataをutctimeを使うようにしてtest0というmeasurementにぶち込みます。
>>> testdata = [
{'fields': {"average_width":170, "average_height":65},
'measurement':"test0",
'tags': {"location":"tokyo"}
},
{'fields': {"average_width":195, "average_height":100},
'time':(datetime.now(timezone.utc)-timedelta(days=1)),
'measurement':"test0",
'tags': {"location":"gunnma-"}}
]
>>> result_query = connector.query('SELECT * FROM test0')
>>> print(result_query)
ResultSet({'('test0', None)': [
{'time': '2019-12-31T06:54:45.132574976Z', 'average_height': 100, 'average_width': 195, 'location': 'gunnma-'},
{'time': '2020-01-01T06:54:56.657673577Z', 'average_height': 65, 'average_width': 170, 'location': 'tokyo'}]})
これで分単位では正しく時刻が投入されるようになりました。
ただ秒数が謎に微妙にずれているので、運用上ではtimeを使うか使わないかは統一したほうがよさそうですね。
ではでは
参考
- Qiita – InfluxDBをPythonから使う –
- Authentication and Authorization in InfluxDB
- InfluxDB用Pythonライブラリの使い方
- InfluxDBへデータを登録する
- [Python] datetimeでタイムゾーンを扱う(pytz利用、UTC/JSTの変換、など)
- Influxdata – Is timestamp the primary key? –
- Stackoverflow – how to define relationship between InfluxDB measurements (tables) –
- Influxdata – docs –