Redshift設計での個人的知見のまとめ その3

データ収集とデータマート構築プログラム

ここでは、データの源泉となるシステムからデータ収集を行い、Redshiftへとデータを配置してデータマートを構築を行う制御プログラムについて説明する。

制御プログラムの実行内容:

①.データの源泉となるシステムからデータを取得し、CSVにしてS3に配置する
②.S3のCSVデータをRedshiftのステージングレイヤにロードする
③.ステージングレイヤのデータをオリジナルレイヤにアップサートする
④.オリジナルレイヤの有効期限を迎えたデータを除いてディープコピーを行う
⑤.モデルレイヤのテーブルを導出する
⑥.データマートレイヤのテーブルを導出する

これらの実行内容をEC2インスタンスを深夜に定期的に起動して実行させていた。

①.データの源泉となるシステムからデータを取得し、CSVにしてS3に配置する

データの源泉となるのは主にデータベースだろう、データベース以外だとログファイルなどだろうか。RedshiftはS3上のCSVやJSONなどからデータロードすることができるが、私の場合には、データベースの内容をJDBCを通し、作成日時と更新日時を確認して必要な差分データを取得できるSQLを発行し、10000件ごとのGZIP圧縮されたCSVにフォーマット変換してローカルファイル出力しつつ、それを順次S3にアップした。
10000件ごとにCSVファイルにするのはいくつか理由がある。まずデータをすべて取得する前にS3への転送を始められるために転送完了までの時間を短縮できること。それから制御プログラムのEC2インスタンスに滞留するデータが少なくなるために必要とされるメモリやディスク容量を少なく保てること。そして、Redshiftは複数のCSVファイルを指定してCopyすると並列ロードが行われてロード時間を短縮することができるからだ。

②.S3のCSVデータをRedshiftのステージングレイヤにロードする

S3のCSVデータをRedshiftのステージングレイヤにロードするにはcopyコマンドを利用する。比較的高速に実行される処理で、差分データ量が1G程度と大きくなっても数十秒程度で完了する。
ところで、CSVと一口に言ってもそのフォーマットには様々な亜種が存在する。区切り文字は何か?文字列はダブルクォーテーションで囲むか?日付表現形式はどうするのか?さらにはファイルエンコードはUTF8でよいか?圧縮形式はGZIP圧縮か?。CSVのフォーマットの亜種が個別のシステムごとに存在すると保守しずらくなる。Redshiftで利用するCSVだけでなく、システム全体で必ず標準化しておこう。copyコマンドには様々なCSVの形式をロードするためのオプションがついているので標準化したCSV形式に合わせて調整する。

③.ステージングレイヤのデータをオリジナルレイヤにアップサートする

ステージングレイヤにcopyコマンドで転送したデータをオリジナルレイヤにマージする。多くのテーブルには主キーが定義されているだろうから、ステージングレイヤとオリジナルレイヤのデータの主キーを比較すればそれが新しく挿入されたデータなのか更新されてデータなのかは判断できる。またRedshift内のテーブルの話なので、それぞれ新規データの差分を反映するSelect-Insertと、更新データ差分を反映するSelect-Updateを実行することでマージが実現できる。
実はこの①、②、③を1組として複数スレッドを使い、データの源泉となるシステムやテーブルに対して並列実行を行うことで転送時間を短縮することができる。ネットワーク帯域やRedshiftインスタンス種別によっても異なるだろうが、私のケースでは6〜8スレッド程度で同時に転送をすると最も転送時間を短縮することができた。最適なスレッド数は環境に依るのでいくらか試しながら数を調整していただければと思う。
また細かなテクニックではあるがデータ量の多いテーブルから並列にデータ転送を開始するようにするとクリティカルパスを短縮することができる。これは実行の最後の最後になってデータ量が多くて実行に時間がかかる処理が開始されて、それだけが単体実行される非効率な状態を防ぐことができるからだ。

④.オリジナルレイヤの有効期限を迎えたデータを除いてディープコピーを行う

オリジナルレイヤのデータは差分更新のためのマージinsertやupdateが行われる。そうすると未ソート領域にこれらのデータが溜まってクエリ速度が遅くなってしまう。こうした未ソート領域を開放するのがVacuumだ。だが、データ量が多くなった場合にはメモリを増やしても、Vacuumでエラーが発生してしまって実行させることができなくなるケースに遭遇した。それでVacuumを使うことは諦め、代わりとして利用したのがディープコピーと呼ばれる方法だ。要は全く同じスキーマの新しいテーブルを作ってそちらにSelect-Insertでデータを全件投入してしまうという方法だ。このときについでに有効期限切れたデータをコピー対象から外すことで、有効期限切れデータの削除の目的も達成できる。
なお、有効期限切れのデータはRedshiftから削除する前に、S3上に念のため保存しておくと安心だ。unloadコマンドで簡単に任意のクエリ結果をS3上に保存することができる。もちろん様々なCSV形式でエクスポートするためのオプションがあるので標準化したCSV形式に合わせて調整する。
RedshiftのSelect-Insertはとても早い。しかしディープコピーはデータを全件移動することになるのでやはりそれなりに時間のかかる処理になる。一方、未ソート領域があってもクエリが動作しなくるわけでもないし、一般ユーザからのクエリの対象はデータマートのレイヤであってオリジナルレイヤへ求める速度の優先は全く高くない。それぞれシステムの性質にも依るだろうが数日、場合によっては1ヶ月程度であっても、放置による動作の低下は大きな問題にならないかもしれない。私の場合には、ディープコピーを行うだけの時間が日次で確保可能だったので毎日ディープコピーを行ったが、週次などの実行でも問題にならないシステムが多いのではないかと思う。それぞれの状況に応じてディープコピーの頻度は調整すると良い。
ところで、この④からの処理は処理の主体がRedshiftに移り、Redshift自体に大きな負荷をかける。Redshiftは同時実行性能が低いので、これ以降の処理はシリアル実行としていた。

⑤.モデルレイヤのテーブルを導出する

これはオリジナルレイヤのデータからモデルレイヤのテーブルを導出する処理だ。だが、以前にも説明したようにモデルレイヤのテーブルはそのほとんどをビューとしていたので、実際にはほとんど処理がない。パフォーマンス上の理由からごく一部だけをビューではなくテーブルとしており、そのテーブルだけを導出するにとどめていた。

⑥.データマートレイヤのテーブルを導出する

これはモデルレイヤのデータからデータマートレイヤのテーブルを導出する処理だ。モデルレイヤはほとんどをビューで構成していたので、物理的にはオリジナルレイヤのデータが使われるが、モデルレイヤにて各種データが正規化されていればデータマートレイヤのテーブル導出が簡単に行える。

私の場合にはSelect-Insertを使ってテーブルのデータを全件を導出し直した。もちろんある程度時間はかかるが、RedshiftにとってSelect-Insertは得意な処理で、最も時間のかかるテーブルであっても20分程度で完了した。また全件導出によるディープコピーとなるので未ソート領域のないクエリに適したテーブルを導出できる。

テーブル導出の処理ステップ

テーブル導出する場合は、実はいくつかの処理ステップにて導出する必要がある。またディープコピーでもほぼ同様の処理ステップになる。
-- 1.新テーブルを作成する
create table a_table_new ( DDL... );

-- 2.新テーブルに既存テーブルのデータを挿入する(有効期限内のもの)
insert into a_table_new (select * from a_table where 有効期限内...);

-- 3.新しいテーブルと古いテーブルを入れ替える
start transaction;
alter table a_table rename to a_table_old;
alter table a_table_new rename to a_table;
commit;

-- 4.新しいテーブルへ向き先が変わるようにビューを再定義する
create or replace view dependent_a_view as ( DDL... );
create or replace view dependent_b_view as ( DDL... );

-- 5.古くなったテーブルを削除する
drop table a_table_old;
わかりにくいのは、4.の新しいテーブルへと向き先を変えるためにビューを再定義しているところだろう。実はテーブル名を変更してもビューは古いテーブルへと向き先が自動で変わって新しいテーブルを参照してくれない。だから新しいテーブルを作ったら再度ビューを定義することで新しいテーブルへと向き先を変更する必要があるのだ。

なお、上記のステップを使えば、そのステップの実行のいずれの時点であっても、テーブルやビューへのクエリが利用できなくなることがない。

制御に必要な実行履歴テーブルとロックテーブル

細かな話ではあるが、データマートを構築するまでの制御プログラムの実行を実現するためには、実行履歴を保存しておくことが必要だ。差分データ連携を実現するには前回の実行日時を、連携するテーブルごとに保存しなければならない。この実行履歴には保守の目的を含めて、実行対象テーブル、挿入データ数、更新データ数、実行開始時刻、実行終了時刻、実行結果状態などを保存しておくとよい。

また、処理を行っているテーブルに対して、上記の制御プログラムが2重に実行されるなどした場合にはデータが破損しかねない。オンライン処理ならDBドランザクションを用いたロックで処理させるところだが、多量のデータをSelect-Insertで処理するこのようなプログラムでDBトランザクションを用いるのはあまり良い手とはいえない。処理中のテーブルを論理的にロックできるように、ロックテーブルを作ってプログラム中から必要に応じて状態を確認し、他のプロセスで処理中であればエラーとして処理を中断するなどさせると安心だ。特にオリジナルレイヤのデータの破壊だけは絶対に避けなければならない。

人気の投稿