SQLAlchemyのbulk insert/updateでドハマりしたまとめ

bulk insert / bulk update

bulk insert使ってますか?

forで1行1行insertやupdateを発行してたりしませんか?まぁしている人はこのページにたどり着かないでしょう。
そしてこのページを見ているということはエラーで奇声を上げて苛ついていることでしょう(経験談)

私自身も役に立たないリファレンス、数少ないstack overflow、そして幾多のNoneとNULLと戦い、2日かけて終戦を迎えました。

というわけでSQLAlchemyのbulk系メソッドについてハマった箇所と共にメモに残しておきたいと思います。

公式のレファレンスはSQLAlchemy 1.3 Documentation – Additional Persistence Techniques –です。
ここのBulk Operationsの箇所に説明があります。(が、役に立ちませんでした)
その他の参考ページは下にまとめておきます。

bulk系メソッドの挙動

まずはbulk系メソッドの一覧です。

  • session.bulk_insert_mappings()
  • session.bulk_update_mappings()
  • session.bulk_save_object()

これらはmappingからinsertかupdateをするもの、そしてmappingを使用するかという点が異なります。
これらのメソッドはsession.commit()をすることでデータベースに書き込まれます。

発行されているSQLはcreate_engine()の引数にecho=Trueを与えて見ることができます。

実際にSQLを見てみると、bulk系メソッドが呼ばれた時にトランザクションを開始し、
commit()が呼ばれた段階でCOMMITしています。
逆に言えばcommit()が呼ばれないとデータベースに書き込まれないので書き忘れに注意。

またトランザクションの中でエラーが起きた場合、エラー処理なしではコミットもロールバックもされない点に注意してください。
デッドロックが発生します。
これの解決で20分悩みました。

session.bulk_insert_mappings()

実際のコードを見てから説明したほうがわかりやすそうなので、まずはコードを貼ります。

def generate_transaction_log(self, db_client, summary_id):
        log_dict = {
            "backtest_summary_id": int(summary_id),
=====================(略)=====================
            "profit_size": float(self.profit_size),
            "profit_percentage" : float(self.profit_percentage)
        }
        return log_dict
def update_summary(self):
    summary_dict = {
        "id": self.summary_id,
        "total_entry": len(self.closed_positions_df),
        "total_max_holding_ms": self.closed_positions_df["holding_time"].max().to_pytimedelta(),
=====================(略)=====================
        "relative_drawdown": float(drawdowns["relative_drawdown"]),
        "profit_factor": float(win_row.profit_size.sum() / abs(lose_row.profit_size.sum())),
        "recovery_factor": recovery_factor
        }

        self.db_client.session.bulk_update_mappings(BacktestSummary, [summary_dict])

上から順に。

まずbacktest_summary_idは外部キーです、次のbulk_update_mappings()で出てきます。

1つ目のメソッドのgenerate_transaction_log()はただの辞書を返しています。

2つ目のメソッドのinsert_backtest_transaction_logs()ではtransaction_logsというリストにforループの中でappend()をしています。

つまりtransaction_logsは辞書のリストということになりますね。
そして最後の行のループを抜けたときの処理で、bulk_insert_mappingsが登場します

BacktestTransactionLogはdeclarative_base()を継承したモデルです。
よってbulk_insert_mappingsの第1引数はモデルです。
第2引数は辞書のリスト
でしたね。

私は最初、bulk_insert_mappings()で辞書のリストの部分を、ただの辞書を渡してループ内で渡していたため、随分と沼にハマりました。
1つだけ値を渡しておき後でアップデートするのは

session.bulk_insert_mappings(<Model object>, [a_thing])

のように単数オブジェクトを一旦配列に変換して渡してやる必要があります。

このsummary_dictは単数なので、リストに変換してbulk_update_mappingsに渡しています。
BacktestSummaryは先ほどのBacktestTransactionLogと同様のモデルです。

session.bulk_update_mappings()

引き続きコードを見ていきます。

def update_summary(self):
    summary_dict = {
        "id": self.summary_id,
        "total_entry": len(self.closed_positions_df),
        "total_max_holding_ms": self.closed_positions_df["holding_time"].max().to_pytimedelta(),
=====================(略)=====================
        "relative_drawdown": float(drawdowns["relative_drawdown"]),
        "profit_factor": float(win_row.profit_size.sum() / abs(lose_row.profit_size.sum())),
        "recovery_factor": recovery_factor
        }
    self.db_client.session.bulk_update_mappings(BacktestSummary, [summary_dict])

ただ先ほどと異なる点として、idを指定しています。
先程のbacktest_summary_idはただの外部キーでしたが、今回はsummary_idがプライマリキーとして使われています。
ここで指定した、プライマリキーであるidの行に対してupdateを行うのがbulk_update_mappings()です。

このsummary_dictは単数なので、リストに変換してbulk_update_mappingsに渡しています。
BacktestSummaryは先ほどのBacktestTransactionLogと同様のモデルです。

session.bulk_save_object()

bulk_save_objectはdeclarative_baseを継承しているモデルのインスタンスを用いてinsertまたはupdateを行います。
実際にinsertがなされることは確認しましたが、insertとupdateを自動で識別してくれるかは検証していないためわかりません。

ただ、先ほどのbulk_update_mappings()は、プライマリキーを用いてどの行をupdateするか特定しています。
それに加えて私の環境ではbulk_insert_mappings()とbulk_update_mappings()の両方をsession.commit()を呼ぶ前に実行していますが、正常に動作しています。

そこから予想すると、insertとupdateの判定もプライマリキーを持つ行が存在するかどうかで自動判定していそうです。

つまりinsertならプライマリキーを指定せずにリストに追加する。
updateならプライマリキーを指定してリストに追加する、というような挙動が予想されます。

bulk_save_object()はbulk_insertを試した時にしか使っていないので、実際の環境で検証してみてください。
(もし検証された方がいたらコメントしてもらえると嬉しいです)

 bulk_insert/updateとは別にinsertやupdateをしたい

例えば、あるテーブルのidが確定しないと、あるテーブルの外部キーがわからない問題がある時。

私のケースで言えば、backtest_transaction_logの外部キーはbacktest_summaryのidですが、このidはinsertしないと確定しない時。
(後で気づいたがsummaryに保存されているtransaction resultの結果をまとめたテーブルを別に作れば良いだけである、テーブル設計は慎重に!)

この場合に、最初のinsertでsession.commit()を使ってしまうと、溜め込んでいたbulk_insertやupdateも一緒に書き込まれてしまい、効率が悪くなってしまいます。

このような場合にはengine.executeを使うことができます。

    def init_summary(self):
        summary = BacktestSummary().__table__
        init_summary = {
            "bot_name": self.bot_name,
            "initial_balance": self.initial_balance,
            "account_currency": self.account_currency
        }

        self.db_client.engine.execute(summary.insert().values(init_summary))

まずモデルの中からテーブル情報のインスタンスを取り出します。
そしてこのテーブルへのinsert()を、engine.execute()に渡してやることで、session.commit()することなくinsertができます。
updateについてはupdate()を用いれば良いでしょう。

もっとも、テーブル定義を見直せば解決できたりするので、この章はテーブル設計は慎重にやりましょうというオチでした。

参考

  1. SQLAlchemy 1.3 Documentation – Additional Persistence Techniques –
  2. stack overflow – Bulk insert with SQLAlchemy ORM –
  3. Qiita -[Python] SQLAlchemyを頑張って高速化 –
  4. tutorials.technology – Fast bulk insert with sqlalchemy –