KNIMEによる消費者行動予測-メモリに載るだけのレコードに分割して処理する


KNIMEによる消費者行動予測-問題設定編-で設定した問題の解決法について述べます。今回は最終回として

  1. 学習したモデルを保存して再利用する
  2. 作成したworkflowを持ち運べるようにし、さらに環境に依存する設定情報を外部化する
  3. メモリに載るだけのレコードに分割して処理する←ここ

を説明します。

動機・着想

KNIMEのノードには、Memory Policyという設定があり、必ずしもデータ全体がメモリに載るわけではないようなのですが、それでもOut of Memoryになることがあります。数百万件のレコードをRDBから読み出してNaive Bayes Predictorに入力しなければなりませんが、Database Readerで全件selectを掛けると画面が応答しなくなり、やがてOut Of Memoryでエラーになります。

Naive Bayes Predictorに限らず大抵の分類器はレコード全件を使って推論することはありませんから、RDBから一度に読み込むレコード数を制限し、推論を繰り返せば良さそうです。ただこれを素直に実現しようとすると以下の問題に気づきます。

  1. 繰り返しの分だけ画面操作が必要
  2. 対象件数が増えていった時に繰り返し回数が変わるかも知れない

そこで、この繰り返しを自動化出来ないかを考えます。幸いKNIMEには処理をループさせるためのノードがあります。また、RDBから一度に読み込むレコード数を制限することは、SQL的にはレコード全体をあるキーでソートした上でoffsetとlimitを付ければ可能です。

つまり、以下のような手順を考えます。

  • 一回の読み出しで処理出来るレコード件数を見極める。例えば25万件(以下これをworkflow variableのlimitで表わす)
  • クエリのoffsetを0からレコード全件をカバーする数まで生成し、flow variableのoffsetに紐付け(例えば0, 25万, 50万, ..., 175万)
  • flow variableが尽きるまで以下を繰り返す
    • offsetとlimitを使ってクエリを実行
    • 読み出したデータを使って推論を実行
    • 推論結果を保存

次にこれを実現していきます。

ワークフローの解説

作成したワークフローは下図のようになります。workflow_infer

一回の読み出し件数を見極める

まず、一回の読み出し件数をlimitというWorkflow variableに設定しておきます。Workflow Projects から このワークフローをポイントしてコンテキストメニューを開き、Workflow Variables...から設定出来ます。
Variable TypeはINTEGERで、250000としておきましょう

ループ制御について

KNIMEにおけるループはC言語の様にループカウンタで制御することも出来ますが、ここではTable Row To Variable Loop StartとVariable Loop Endを使うことにしました。クエリのoffsetを0からレコード全件をカバーする数まで生成し(例えば0, 25万, 50万, ..., 175万)ておき、それを使い果たすまでループを繰り返すという制御になります。クエリのoffsetを0からレコード全件をカバーする数まで生成するには、Database Queryにて以下の様なSQLを実行します。
select "offset"::integer from generate_series(0, (select count(1) from "推論対象"), {Ilimit}) as t("offset");
{Ilimit}がflow variableで置換される箇所です。generate_seriesについてはPostgreSQLのマニュアルをご覧下さい。

実際のデータを読み込む処理は、Database ConnectionとDatabase QueryとDatabase Connection Readerに分けました。Database Readerで{Ioffset}をクエリに埋め込むのが上手く行かなかったためです。

  • Database Connectionは読み出し先のテーブルとソート条件のみを指定します。
    select * from "推論対象" order by id;
  • Database Queryで読み出し範囲を指定します。
    SELECT * FROM #table# t offset {Ioffset} limit {Ilimit};
    #table#の部分はプレースホルダで、Database Connectionに指定したクエリが以下のようにサブクエリとして展開されます。
    SELECT * FROM (select * from "推論対象" order by id) t offset {Ioffset} limit {Ilimit};
  • Database Connection Readerで実際にクエリが実行されます

ループの終端はVariable Loop Endとなっています。ノードのリファレンスでは、

This node closes a loop but does not aggregate data created in the loop but only variables.
このノードはループを閉じるがループで作られたデータを集約しない。変数は別。

という説明となっています。このワークフローは、ループ毎に結果をCSVに追記していくものなのでこれを選択しました。他にもループの終端を成すノードはあるのですが、ループ内で加工されたデータを受け取って別の処理につなげるためのもののようです。CSV WriterからVariable Portでつながっていますが、単につながれば良いだけで受け渡される変数に意味はありません。

推論結果をまとめる

推論自体には特筆することはありませんが、三つのモデルからの結果をまとめる必要があります。ここでは、モデルを区別する項目を追加して3行にする方法を採りました。以下のようなイメージです。

id スコア 予測時点(nヶ月後)
1001 0.6 1
1001 0.7 3
1001 0.9 6
1002 0.2 1
1002 0.1 3
1002 0.3 6

予測時点(nヶ月後)にはモデル毎に1,3,6のいずれかの値を設定します。これを実現するためにJava Snippet(simple)というノードが使えます。Javaのコード片を書いて任意の処理を実現できます。設定方法としてはReplace or Append ColumnにてAppend Columnを選択して、カラム名を入力(monthsという名前にします)します。Method Bodyには
return 1;
というのを書くだけです。これで、monthsというカラムに定数1が設定されます。

3行を一つにまとめるにはConcatenateが使えます。2つのテーブルを行方向に連結するものなので、2段階に分ける必要があります。

結果の保存

推論結果を保存する方法としてはCSVへの出力をすることにしました。CSVファイルにはヘッダを出力したいのですが、普通に設定するとループの度にヘッダ行が出力されてしまいます。つまりループの初回のみヘッダ出力を設定し、それ以後はヘッダ無しで追記していく、という動きにしなければなりません。つまりCSV Writerの設定で

  • Writer column header
  • Don't write column headers if file exists

にチェックを入れておきます。ループ毎に同じファイルに追記しますので、If file exists...にはAppendを指定しなければなりません。

これで保存については問題ありませんが、ファイル名が固定なので、新しく推論し直す時は以前出力されたファイルを移動または削除しておかないと、過去の推論結果に追記されてしまうことになり面倒です。つまり、最初のループでは、If file exists...をOverwriteに設定し、以後のループではAppendに戻すという制御をしてみましょう。Java Edit Variable(simple)が使えます。

  1. ループ内部のノードにはループ回数を表すflow variableであるcurrentIterationが渡されます。これをJava Edit Variable(simple)に取り込むためループ内部のいずれかのノードからVariable portをつなぎます。
  2. Java Edit Variable(simple)ではCSV Writerの挙動を変えるためのflow variableを定義します。Define Variableを選択しfileOverwritePolicyと名付けます。Method bodyには
    return {IcurrentIteration} == 0 ? "Overwrite" : "Append";
    と記述します。currentIterationは0始まりなので、最初のループの時は"Overwrite"、そうでない時は"Append"という文字列をflow variableに代入するという意味です
  3. Java Edit Variable(simple)からCSV WriterにVariable portをつなぎます。説明が少なくてわかりにくいのですが、画面上でIf file exists...というセクションで選択しているのはfileOverwritePolicyという設定項目のようで、画面上の選択肢、Overwrite/Append/Abort はそのまま設定すべき項目の値に対応するようです。Java Edit Variable(simple)の設定内容はこの仕様を踏まえたものです。つまり、flow variable:fileOverwritePolicyを同名の設定項目へ紐付ければ良いことになります。flow variableと設定項目の名前を一致させましたが、これは分かりやすさを考えただけで必須ではありません。

補足:推論結果の保存手順についての紆余曲折

保存先をCSVにしましたが、元々はRDBに書き込むようにしたかったのです。さらに三つのモデルの推論結果はめいめいに同一のテーブルに書き込むようにするつもりでしたが、これはエラーとなり失敗しました(メッセージをメモし忘れていましたが、異なるノードから一つのテーブルに書き込むことは出来ないようです)。そこで一旦出力結果をConcatenateしてから一カ所で書き込むようにしましたが、今度は、実行時間が長すぎるためか処理中にRDBの接続が切れてしまうことが分かりました。保存手順がやけに複雑となっているのは以上のような経緯によるものです。

まとめ

この記事では、KNIMEにおけるループ制御の一例をお見せしました。つまり、以下三つがポイントでした。

  1. ループを開始、終了する方法
  2. ループを利用してクエリの読み出しを動的に行う方法
  3. ループ回数によってノードの振る舞いを変える方法

もう少し洗練出来る気もするのですが、そのうち機会があれば挑戦してみたいと思います。

それでは以上で解説を終わります。


This entry was posted in データマイニング. Bookmark the permalink.