2011/11/24

直積量子化(Product Quantization)を用いた近似最近傍探索についての簡単な解説

"aka motsu-nabe" by chatani
概要
冬の寒さも一段と厳しくなってまいりました。おでんや鍋が恋しくなる季節です。

さて、最近ようやっと一仕事が終わりまして、長ったらしい記事が書けるようになりました。ですので、今回は2011年にTPAMIで発表された、近似最近傍探索についての論文『Product quantization for nearest neighbor search』について簡単に紹介したいと思います。

この論文は2011年に発表された、最近傍探索アルゴリズムの決定打です。シンプルな理論でありながら既存手法を打ち破るほどの強力な性能を有し、速度も非常に高速、かつ省メモリなのでスマートフォンに載せ、リアルタイムで動作させることも可能です。

以前この手法はCV勉強会@関東で紹介されたらしいのですが、具体的に紹介しているページは(最近すぎるので当たり前ですが)現在のところあまり見かけていません。ただ個人的にこの手法はあと5年くらいは本線で戦えるものと思っておりますので、きちんと解説した記事を出すことはある程度有意義なものかなぁと、そういう風に考えています。

最近傍探索の課題
最近傍探索は最も古典的な手法でありながら、現在においても頻繁に利用されているアルゴリズムの一つです。近年のプロセッサの性能向上によって大規模なデータ処理が家庭のパソコンでもできるようになり、コンピュータビジョンにおいても数億件規模(10^8~9)の高次元ベクトルデータを最近傍探索を用いて処理することで、良い精度の結果を得ることが当たり前の時代になってきました。

ただそうは言っても、最近傍探索をそのまま用いることは現実的ではなく、いくつかの課題が浮上してきます。具体的には以下のとおりです:
  • いかに圧縮した状態でデータを格納するか: ベクトルデータをそのままの状態で保持することはメモリ容量の圧迫へ繋がります。例えば128要素のベクトルを考えた場合、floatを使用した場合でも32×128=4096bitを1エントリで消費してしまいます。IOアクセスが生じてしまうと速度は一気に低下してしまうため、ベクトルデータはすべてメモリ上に展開しておくことが望ましいー
    しかし、今回我々が扱いたいエントリ数というのは数億とかそういったオーダーです。そのオーダーのベクトルをそのまま格納するのは現在のメモリ容量だとなかなか厳しい。ですので、情報量を保ったままベクトルをいかに圧縮できるかが、実用的な面において重要な課題となります。
  • いかに高速に解を見つけられるか: 次元数の多いベクトルの距離計算は基本的に低速です。さらにそのまま実装してしまうとすべてのエントリに対して距離の計算をする必要があるため速度はO(N)となり、実用的ではありません。1エントリあたりの計算コストを削減したり、そもそも明らかに異なる(クエリより離れすぎている)エントリを計算しないことで、計算を高速に行うことも同じく重要となります。
これらの課題を克服するため、近年では近似最近傍探索と呼ばれる手法が盛んに研究されております。その中でも有名なアルゴリズムはLSH(Locality-Sensitive Hashing)であり、ハッシュ関数によって抽出される近傍エントリのみを計算することで、高速に最近傍探索を行います。ただこの手法は計算用のハッシュを別に格納する構造であるため、全体のサイズは減少するのではなく増大してしまいます。

また、Preferred Researchで紹介されているMinHashは、ベクトルの要素数が定まっておらず、成分が{0,1}の2値しかとらないような特殊なベクトルを計算する際においては有効な手法でありますが、そうでない場合においてはいったんベクトルをバイナリ表現に落とし込まなければならないので一般的に精度は低下してしまいます。今回紹介する直積量子化は、対象のベクトルが高次元であり、かつユークリッド距離による最近傍探索を行いたい場合において威力を発揮する手法です。

スカラ量子化
直積量子化について説明を行う前に、まず量子化とは何かについて説明していきます。物理をかじったことのある方ですと『量子化 = 物理量を演算子へ変換する』ものと思っちゃいますけど、そっちの量子化ではなくある数を特定のビット列で表現することの量子化です。
分割された領域に、特定のビットを割り当てる
例えば、0〜1の間をとる数を2ビットで表現しろという問題を考えた場合、普通は上図のように0〜1を4分割し、それぞれの数値を割り当てることで表現を行います。このように、少ないビット数で対象の数値を表現することを一般に『スカラ量子化(Scalar Quantization)』といいます。スカラ量子化によってfloat型(32bit)の数値は、精度を犠牲にすることによって僅か2bitで表現することができるようになりました。

ベクトル量子化
ところで、このスカラ量子化は対象のデータが数値であった場合には効率的ですが、ベクトルの場合だと効率的とは言えません。
スカラ量子化では、ベクトルを効率良く表現することができない
例えば上の図について考えてみましょう。この点群を1エントリ辺り1ビットで表現しようとした場合、スカラ量子化によって各成分を量子化してしまっては最低でも2ビットが必要となりますが、物事を空間的に考えることで発想を転換します。つまり、それぞれのクラスタを代表する点を予めコードブックとして保持しておき、各点は一番近い代表点のインデックスを保持しておくのです。これによってそれぞれの点は僅か1ビットで表現することができるようになりました。このような手法のことを一般に『ベクトル量子化(Vector Quantization)』といいます(*1)。
ベクトル量子化によって、ベクトルを2つの代表点によって表せるようになった

*1: 本手法のクラスタリングにはk-means algorithmを使用しています。k-meansに関しては『クラスタリングの定番アルゴリズム「K-means法」をビジュアライズしてみた』が視覚的に分り易いのでそちらを参照してください。

直積量子化
ベクトル量子化はスカラ量子化よりも少ないビット数で効率良くベクトルを表現できるのですが、同時に欠点もあります。次元が増大するにつれて使用するコードブックが指数的に増えてしまうのです。

これは次元の呪いに起因するものです。数次元〜数十次元くらいだったらそんなに問題とはならないんですけど、数百とか数千次元になってくるととんでもない量のコードブックが必要となり現実的でない。『直積量子化(Product Quantization)』はこの次元の呪いを解決するために用いられる、スカラ量子化とベクトル量子化の中間を行く手法です。
高次元のベクトルを直積表現によって分解する

具体的な手法は下記の通りです。まず、N次元のベクトルをM分割し、N/M次元のサブベクトルを生成します。そして、そのサブベクトルをそれぞれベクトル量子化することによって、インデックスを(i1, i2, ..., iM)のタプルで表現します。そうすれば次元の呪いは発生せず、データ表現も比較的低ビットで抑えられます。

例として128次元のベクトルを考えてみましょう。まず、このベクトルを16分割することで8次元のサブベクトルへ落とし込みます。そして、そのサブベクトルをベクトル量子化によって8bit表現に持ってきた場合、対象のベクトルは8×16=128bitで表現でき、かつ使用するコードブックサイズも16×256=4096と少数で済みます。float型のサイズで考えると30倍以上の圧縮効率です。このように、少数のコードブックサイズで効率よくベクトルを表現できることが直積量子化の利点です。

距離計算
さて、直積量子化によって各ベクトルのエントリを圧縮した状態で格納したとしましょう。あるクエリベクトルが与えられたとして、そのユークリッド距離が最小となるエントリを見つけるにはどうすればいいでしょうか?

幸運なことに、ユークリッド距離の計算は各要素毎に独立しています。つまり、ただ単にクエリベクトルを各サブベクトルに分け、それぞれの領域で距離を計算し、その和を計算するだけで良いのです。よって、直積量子化を用いて最近傍探索を近似的に行うためには:
  1. クエリベクトルをM分割し、N/M次元のサブベクトルに分解する
  2. クエリのサブベクトルと、それぞれのコードブックに記された代表ベクトル間の距離を予め計算しておく
  3. それぞれのエントリがどのインデックスに位置しているのか確認し、2で計算した距離を参照して足し算を行う
  4. 最小距離をもつエントリを候補として提出する
ことで実現できます。これが直積量子化を用いた、近似最近傍探索の核となるアルゴリズムです。

詳細について
この距離計算は非対称性からある程度のバイアスがかかっているため、実際の距離を正確に見積もるためにはこのバイアスを取り除く必要があったりします。また、具体的な距離計算がコードブックだけで済むので省力化されているとはいえ、計算量は依然としてO(N)であり、そこらへんも大規模なデータ数では足を引っ張る原因となります。

んじゃあこれをどうやって解決するのかについては、現論文

http://lear.inrialpes.fr/pubs/2011/JDS11/jegou_searching_with_quantization.pdf

を参照してください。上記2つの懸念についての解決案が記述されており、上の知識があれば多分すらすらと読めるはずです。
本当はこれも解説しなきゃなんないんですけど、書いている内に疲れたちゃったのでとりあえずコアなとこだけということで勘弁してください。気が向いたら更新します。

2011/09/24

OpenCVアルゴリズムを高速に記述できる補助ライブラリcvutilを公開します

"The Dunny Collective - Brisbane River" by KayVee.INC
概要
OpenCVを用いてアルゴリズムを組む際、ある画像からある画像へ変換処理を行ったり、関数fを用いて新規に画像を生成したり、画像の結果を1つの値へと集約するなどの定型的な処理は非常に多く行われておりますが、並列処理を行いかつ速度を考慮したプログラムを組もうとするとある程度のコード量を必要とするためそれなりの手間となります。また画像処理によくあるループ文のネストが大量に発生するため、普通に組んでしまうと一見何をやっているのか非常に分かりづらいコードとなってしまいます。

cvutilはこのような冗長な作業を定型化し、複数のアルゴリズム関数とC++0xに新しく搭載されたラムダ式を使用して高速にアルゴリズムを記述することができます。このようにして作られたアルゴリズムは自動的にOpenMPによって並列化されるため、研究用途などの速度をある程度担保しつつもメンテナンスのし易いコードを記述するための手助けとなります。

インストール
cvutilgithubにて公開しております。ダウンロード先は下記のURLを参照してください。

https://github.com/rezoo/cvutil

cvutilはヘッダオンリーのライブラリです。バージョンがOpenCV 2.x系統であれば、対象のincludeディレクトリにcvutilをコピーするだけで使用できます。
C++0xのラムダ式を用いることで端的に記述することができますが、関数オブジェクトを使用すればある程度古いコンパイラでも正常に動作するでしょう(そこまでして使う利点があるのかどうかは分かりませんが)。

ちなみに、元々は自分用に作られた適当なライブラリなので、バージョンアップに伴ってある程度破壊的な変更を行うこともありますのでご了承ください。その際はバージョンを0.1上げることによって対応します。

アルゴリズム一覧
ここではcvutilに搭載されているアルゴリズム一覧の一部を紹介いたします。これらのアルゴリズムはSTLのそれと似ているため、C++を触ったことのある方は名前から直感的に機能を類推できるでしょう。

transform_image
transform_image関数は対象の画像を、ユニタリー関数を通して出力先へ変換します。cvutilの中では最も汎用性の高い関数となります。
cvutil::transform_image(src, dst, [](uchar x) {
    return cv::saturate_cast<uchar>(x*2); });
この関数は複数の画像を高度に合成するような場合においても利用できます。アリティ(渡される引数の数)によって、最も適したアルゴリズムが自動的に呼び出されます。
cvutil::transform_image(src1, src2, dst, [](uchar x, uchar y) {
    return (x + y)/2; });


generate_image
generate_image関数はバイナリー関数を用いて、出力先に画像を新しく生成します。generate_imageをそのまま用いることは通常ありませんが、x, y座標を元に画像を生成する関数generate_image_xyや、画像中央に原点がある右手系の座標系u, v座標を元とした関数generate_image_uvは比較的使い勝手が良いと思います。
int width = 512;
int height = 512;
cv::Mat_<cv::Vec3b> dst2(height, width);
cvutil::generate_image_xy(dst2, [=](int x, int y) {
    return cv::Vec3b(0, x*256/width, y*256/height); }); // BGR
reduce_image
reduce_image関数は対象の画像を、バイナリー関数を通して1つの結果へと集約し、その値を返します。
cv::Mat_<uchar> src3(3, 3);
src3(0, 0) = 0; src3(0, 1) = 1; src3(0, 2) = 2;
src3(1, 0) = 3; src3(1, 1) = 4; src3(1, 2) = 5;
src3(2, 0) = 6; src3(2, 1) = 7; src3(2, 2) = 8;
int result = cvutil::reduce_image(src3, (int)0, std::plus<int>()); // 36
上のサンプルは全要素に対しての足し算を行っています。transfromの並列化は兎も角、reduceの並列化は一般的に面倒なのですが、reduce_imageはそこらへんの処理もきちんと並列化してくれます。

transform_reduce_image
transform_reduce関数は1枚または複数の画像を対象の関数で1つの値へ変換した後に、reduce関数を適用します。2種類の結果を合併した集約結果を返す場合などに役立つと思います。

minmax_element_image
minmax_element_image関数は画像の最小値と最大値を返します。
std::pair<uchar, uchar> minmax_result =
    cvutil::minmax_element_image(src); // (min, max)

integrate_image
integrate_image関数は画像を2次元に積分します。具体的には、inclusive_scan関数を2次元的に適用します。バイナリー関数にstd::plusを適用した場合、この関数は積分画像の生成と等価です。また、画像の型が対象の二項演算子に関してアーベル群を成していた場合、任意の矩形領域に関してのreduceintegrate_imageによって生成された画像を用いて、定数時間による計算が可能となります(参考: 並列環境におけるreduceとscanアルゴリズム)。

OpenCVにも積分画像を生成する関数は存在しているのですが、この関数は並列化を行いかつ任意の二項演算子を適用できる点が異なります。
cv::Mat_<uchar> src3(3, 3), dst3(3, 3);
src3(0, 0) = 0; src3(0, 1) = 1; src3(0, 2) = 2;
src3(1, 0) = 3; src3(1, 1) = 4; src3(1, 2) = 5;
src3(2, 0) = 6; src3(2, 1) = 7; src3(2, 2) = 8;
cvutil::integrate_image(src3, dst3, std::plus<uchar>());
// [0, 1, 2;     [0,  1,  3;
//  3, 4, 5;  =>  3,  8,  15;
//  6, 7, 8]      9,  21, 36]

cvutilはその他にも色々と関数がありますが、多分名前からどんなことやってるのか大体は理解できると思いますので、とりあえず代表的な関数をいくつか紹介いたしました。

License
自分用に作っているだけなのですが、一応ライセンスはMIT Licenseを適用します。誰でも自由に無制限に使ってかまいません。

とりあえずはそんな感じです。もし何か使っていく上で質問や疑問、あるいはご指摘等ございましたらお気軽にコメントまたはメールをお願いします。

追記
お気づきの通り、本来の文脈ならば、アルゴリズムに渡す引数はcv::Mat_<T>ではなく何かしらの抽象構造Viewを渡すべきなのです。C++のSTL Algorithmはコンテナでなくイテレータという抽象に依存することによって、任意のデータ構造による適用が可能となり、さらにtransformed_iteratorなど、変形を遅延させるような構造を取ることができる。つまり、cvutilにおいても本来はそのような『抽象に依存する』データ構造を起点にアルゴリズムを組んでいくべきなのです。
// こんな感じで書けたら最高なんだけど…
cvutil::transform_image(
    cvutil::make_resized_view(src, dst.size()),
    dst,
    any_functor);
ただこのアナロジーを画像に適用しましょうとなるとなかなか難しく、現状としてはboost.GILしか存在していない。そういった意味で確かにboost.GILは魅力的なライブラリなのですが、GILに適用するalgorithmが殆ど存在していませんし、なによりコンパイル時間がboooooooooooostしてしまう。ぱぱっと書いてぱぱっと書き捨てられるような構造がむしろ研究にとっては重要なわけで、そんな面倒なことするよりも多少汚いOpenCVを採用したほうが使い勝手としては向上するよねという、まぁそういった言い訳です。

2011/09/16

OpenCVの画素値アクセスについて

"Fire King" by A Continuous Lean
何度も何度も繰り返されている議題ですが、今回は画像処理に良くある『全画素を変換する』という処理のパフォーマンスについて考察します。
あなたはOpenCVのcv::Mat_<T>を使って、全ての画素値をある関数funcで変換するという処理を行いたいとします。それで、たぶんなんですけど、普通に何も考えず組むとしたらこんな感じのコードになると思います:
for(int y=0; y<src.rows; ++y) {
    for(int x=0; x<src.cols; ++x) {
        dst(y, x) = func(src(y, x));
    }
}
なんの変哲もない普通のループです。このコードは普通に動きます。おわり。ちゃんちゃん。

…そういきたいのですが、パフォーマンスを気にする方々はここでいろいろとした疑問が沸き起こってくるわけです。それじゃあどこが悪いのか。1つずつ疑問点を上げ、その上で上記のコードがある程度の合理性を持っているコードであることを示していきます。

疑問1: 関数呼び出しを行っているからその分遅い?
src(y, x)という処理は結局cv::Mat<T>::operator()という関数を呼び出していることに相当します。なので、関数呼び出しを数十万のピクセルに対して行っているため、関数呼び出しにかかるコストが無視できないレベルで発生していくのではないか??

回答: 遅くならない
OpenCVのcv::Mat_<T>::operator()はインライン展開されます。なので、関数の呼び出しに対して発生する速度的なコストは発生しません。安心して使いましょう。

疑問2: 掛け算を余計に行なっているために遅い?
結局cv::Mat_<T>::operator()の行っていることはコードに直すとこんな感じです。
src(y, x) == ((T*)(src.data + src.step.p[0]*y))[x]
でも、この項をよく見てみると、xのループに対してyの値は固定されているため、第二項の計算は正直不要でしょう。なのに掛け算の計算でコストが余分にかかってしまうため、速度が遅くなるのでは??

回答: 遅くなるけどそんなに問題じゃない
確かに遅くなりますが、そこまで問題となる箇所ではありません。時代は3GHzに突入しているのです。整数の掛け算にかかるコストなんてたかが知れています。

そんなところよりも、別の箇所を改善したほうが大抵の場合劇的に向上します。仮に、funcの中にstd::expstd::logが含まれていたとすると…
dst(y, x) = std::exp(-(src(y, x) - mu)*(src(y, x) - mu)); // ぐぎゃー
dst(y, x) = -src(y, x)*std::log(src(y, x)); // おもいー
…それだけで大分重くなってしまいます。sqrt, sin, cosはまだ我慢できるにしても(それでも掛け算よりは結構重い)、指数、対数関数は普通に使うとかなり重いんです。CV系統などの精度を求めないような計算でしたら、スプライン関数などの近似式に置き換えることを検討しましょう。

at<T>(), operator()は範囲チェックを行ってくれる
昔のOpenCVにはat<T>()と同じような処理をしてくれるCV_IMAGE_ELEMマクロなんて言うものもありましたが、これはもう時代遅れの代物です。なぜなら、cv::Matat<T>(), cv::Mat_<T>operator()は与えられたx, y座標が範囲内のものであるかどうか、きちんとチェックを行ってくれるからです。範囲外である場合には警告を送ってくれます。そういった意味でもCV_IMAGE_ELEMよりatoperator()を利用していくべきでしょう。

余計な範囲チェックを行っているために遅くならないのか?』という疑問もあるかもしれませんが、この範囲チェックはReleaseモードで消去されます。なので実際の動作には全く影響されないのです。

cv::Mat::at<T>(), cv::Mat_<T>::operator()は早い
at<T>(), operator()はきちんと考えられているメンバ関数です。ポインタを用いてがりがり書いたコードよりは少し遅いかもしれませんが、この遅さは全体のアルゴリズムに影響するような遅さではありません。ほっと胸を撫で下ろし、安心して使いましょう。

でもやっぱり速度が気になる
とはいっても、きちんと書かれていないという神経質な方もいるかもしれません。at<T>()なんて負けた気がして使いたくない……その場合のコードは大体以下のようになります。
for(int y=0; y<src.rows; ++y) {
    uchar* src_ptr = src.ptr<uchar>(y);
    uchar* dst_ptr = dst.ptr<uchar>(y);
    for(int x=0; x<src.cols; ++x) {
        dst_ptr[x] = func(src_ptr[x]);
    }
}
これだけじゃあ高速化されたといっても微々たるものなので、せっかくですからOpenMPによる並列化を行ってみましょう。さらにassertでサイズが合っているかチェックを行うようにして…そうすると最終的なループはこんな感じになります。
assert(src.size == dst.size);
#pragma omp parallel for
for(int y=0; y<src.rows; ++y) {
    uchar* src_ptr = src.ptr<uchar>(y);
    uchar* dst_ptr = dst.ptr<uchar>(y);
    for(int x=0; x<src.cols; ++x) {
        dst_ptr[x] = func(src_ptr[x]);
    }
}
大分行数が増えてしまいました。ただ自分がやりたいことは全画素に対してfuncを適用させたい、ただそれだけなのに、ここまで行数を書く必要があるのです。しかもぱっと見で何をしたいのか良く分からない。面倒ですし分かりにくい。

ここで、C++を少しかじったことのある人でしたら、ユニタリー関数を適用させる表式のアルゴリズム関数を適用すればいいんじゃね?といった考えに至るのは自然な発想でしょう。こんなふうに:
template<typename SrcType,
         typename DstType,
         typename UnaryFunction>
void transform_image(const cv::Mat_<SrcType>& src,
                     cv::Mat_<DstType>& dst,
                     UnaryFunction unary_op) {
    assert(src.size == dst.size);
    #pragma omp parallel for
    for(int y=0; y<src.rows; ++y) {
        // equivalent to ptr<T>(y)
        const SrcType* src_ptr = src[y];
        DstType* dst_ptr = dst[y];
        for(int x=0; x<src.cols; ++x) {
            dst_ptr[x] = unary_op(src_ptr[x]);
        }
    }
}
上の形の式をヘッダファイルあたりに定義してやれば、後は以下のようなラムダ式を利用して書くだけで自動的に最適なループ処理を行ってくれます:
transform_image(src, dst, [](uchar x) -> uchar {
    return cv::saturate_cast<uchar>(x*2);
});
この表式ですと、どんな変数がなんの変数に影響を及ぼすのか、全画素に対してどのような処理を行うのか一目瞭然です。また、引数に渡すunary_opはコンパイラの最適化処理によって自動的にインライン展開されるため速度的なオーバーヘッドはありませんし、OpenMPによる並列化も行ってくれます。また任意の関数オブジェクトを作用させることができるので、非常に使い回しの効く関数となっています。

画素云々について言及している方はこれまで数多く見かけましたが、このような画像に特化したアルゴリズム関数について言及している方は、何故か誰もいないというのが現状です。なのでまぁ自分用にざっくり書いているコードなのでいろいろと不備はありますけど、上記のようなアルゴリズムを詰め合わせた補助ライブラリを近いうちに上げていきたいと思います。

追記(2011/09/24): 記事のとおり、補助ライブラリであるcvutilgithubに上げました。詳細は別記事『OpenCVアルゴリズムを高速に記述できる補助ライブラリcvutilを公開します』を参照してください。

補足
『イテレータを使って標準のアルゴリズムを適用すればいいだけなのでは?』と思う方もいるかもしれません。つまりこんな感じですね:
std::transform(src.begin(), src.end(), dst.begin(), [](uchar x) -> uchar {
    return cv::saturate_cast<uchar>(x*2);
});
確かにこの式でも動くんですけど、速度は遅いです。なぜならこのイテレータの指しているデータは、メモリ上に連続していないからです。言い換えると、任意の自然数Nに対して『&(*(iter + N)) == &(*iter) + N』であることが保障されていないため、きちんと対象のデータを指し示すように補正しなければならない。その補正処理に余分な負担がかかって遅くなってしまう。ただ、もしメモリ上に連続であるならば直接ポインタをイテレータとして入れてやることで、(きちんと動作する保障はないですが)高速に動作してくれるでしょう。並列化はされていませんけれど。

また、上記のtransform_imageは並列化は行われておりますが、SIMDによる高速化は行われておりません。これをジェネリックに行うというのは現状としてなかなか厳しいのですが、Boost.SIMDの登場によってある程度の希望の光は見えてくるのではないかと思っています。

追記:
  • (2011/09/16)コードをコンパイルが通るように若干の修正。ucharを暗黙的に使うのではなく明示的に指定するようにした
  • (2011/09/17)記事を書いていて初めて知ったんですけど、ptr<T>(y)cv::Mat_<T>だとoperator[]でオーバーロードされていたんですね。恐らくこれを用いることがOpenCV側の意図しているところだろうと判断し、上の式を書き換えました。

2011/06/26

boost.gilを利用して透過画像を作る

対象の画像から、特定の色の箇所だけを透明にした透過画像を作る

こういった課題に一ヶ月近くかかって出来なかったとかいう話を聞いて、流石にそれはかかり過ぎだろうと脊髄反射で言ってしまったわけですけど、それじゃあ自分だったらどうだろうとか後で思ってちょっとやってみることにしました。好き勝手言って出来なかったとか恥ずかしすぎですから、やっぱりそこらへんは実際にやってみる責任みたいなものは感じているわけです。というわけでboost.gilを使って書いてみた結果が以下。
#include <boost/gil/gil_all.hpp>
#include <boost/gil/extension/io/png_io.hpp>

using namespace boost::gil;

template<typename Pixel>
struct transparent_functor {
public:
    explicit transparent_functor(Pixel bg_pixel): bg_color_(bg_pixel) {};
    Pixel operator()(const Pixel& src) {
        if(src == bg_color_) {
            Pixel dst = src;
            get_color(dst, alpha_t()) = 0;
            return dst;
        } else {
            return src;
        }
    }
private:
    Pixel bg_color_;
};

int main() {
    const char* filename = "src.png";
    const rgba8_pixel_t background_color(0, 255, 0, 255);

    rgba8_image_t src_image;
    png_read_image(filename, src_image);
    rgba8_image_t dst_image(src_image.dimensions());

    transform_pixels(
        const_view(src_image),
        view(dst_image),
        transparent_functor<rgba8_pixel_t>(background_color));
    
    png_write_view("dst.png", view(dst_image));

    return 0;
}
結果としてはこんな感じです。上が元の画像で、下が結果の画像。正常に抜き出されていることが分かります:
余談
  • プログラムより文章を紡ぎ出すのに時間がかかって、改めて自分の文章力の無さにうんざりしています
  • というか最近暑い!自分は寝たいのに梅雨ですごい蒸し暑いから寝付けず気になってこんな不毛なことやっちゃってるわけです。今度こそ寝ます!!!

2011/06/02

Thrustから見る、reduceを用いたアルゴリズム実装

"The Great Day of His Wrath" by John Martin
はじめに

世はまさに大並列時代。火を手にした人類が飛躍的な進化を遂げたように、並列化のパラダイムは、今まで到底不可能と思われていた速さで計算を行うことができるようになりました。

ところが、どんな手法にも欠点は存在するもので、実際に実装しようとすると非常に難しい。何故かというと、並列には並列化特有の問題が存在しており、愚直に実装してしまうとCPUより早くなってしまうどころか遅くなってしまうことだってあり得るのです。これを回避するにはGPUの内部構造についてきちんと理解をした上で、実装したいアルゴリズムそれぞれの場合に特化したコーディングを行う必要がある。

しかしよくよく考えるとおかしな話です。私たちが実装したいのはあくまで手法であり、ハードウェアではありません。なぜこのような詳細について把握する必要があるのでしょう。これはきちんとした抽象化がなされていない結果生み出された、言ってみれば妖です。この妖によって余計な手間が生み出され、コーディング量は増大し、余計なバグが生成され、絶望した開発者はうつになり、自殺者は増え、世界は暗雲に包まれるー

そのような混沌の中、ある一筋の光、Thrustが現れました。ThrustはCUDAを抽象化し、C++のパラダイムを用いることで本質にのみ注力できるよう開発されたライブラリです。ユーザはただIteratorに対してアルゴリズムを適用するかの如くdevice_vector<T>と各種アルゴリズムを駆使することで、流れるようなプログラムを組むことが可能となります。このような抽象を用いることで、ユーザは詳細な内部構造について頭を悩ませる必要が無くなりました。誰が書いているのか分からないから使いたくない?ThrustはNVIDIAに所属している開発者Jared HoberockNathan Bell氏によって書かれています。2人とも並列化のプロフェッショナルなので、普通の人が愚直に実装するよりも大抵の場合効率的でしょう。

さて、Thrustには並列化を行うための多種多様なアルゴリズムが存在しています。そして、Thrustはオープンソースです。これはつまり、このソースコードを読めば並列化のプロがどのようにして各種アルゴリズムの並列化を実装しているのかが分かるということです。

Reductions

ということで、最近はThrustのソースコードをちょくちょく読んでいるところです。まだ読み始めた最中なのですが、特に感動したのはReductionsの項でした。

数々あるSTLのアルゴリズムを普通に実装しようとすると、一つ一つのアルゴリズムすべてに対して、コアレッシングなどのGPGPU特有の問題を回避したコードを記述しなければならず、非常に面倒です。Thrustはこの問題をreduce関数によって解決しました。この関数は並列化が可能なので、これを用いて書かれた関数は自動的に並列化されます。つまり、Thrustはいわば『全から一を返す』アルゴリズムすべてをreduceを用いて記述することによって、冗長な記述を回避しているのです。

このreduceを用いた並列化の実装について簡単に説明していくことが、今回の記事の目的です。reduceを初めて聞いた方は前回の記事『並列環境におけるreduceとscanアルゴリズム』を参照してください。

ここで、本来のThrustはC++によって記述しているのですが、そのままC++で書くと非常に冗長で解り辛くなってしまうので、今回は本質のみを抽出するためHaskellを用いて説明します。あまりHaskellに慣れていないので不適切な書き方等あるかもしれませんが、そのへんはご指摘頂けると幸いです。また、今回のreduceにはfoldr関数を代わりに用いています。分かると思いますが一応。

実装
  • max_element, min_element, minmax_element
その名の通りリストの中の最大値、最小値、最大と最小値のタプルを返します。
*Main> max_element [1,3,2,4,3,1]
4
*Main> min_element [1,3,2,4,3,1]
1
*Main> minmax_element [1,3,2,4,3,1]
(1,4)
ここらへんはまぁ普通に分かります:
max_element :: Ord a => [a] -> a
max_element list = foldr max (head list) list

min_element :: Ord a => [a] -> a
min_element list = foldr min (head list) list

minmax_element :: Ord a => [a] -> (a, a)
minmax_element list = foldr minmax_func (first, first) $ zip list list where
    first = head list
    minmax_func (min1, max1) (min2, max2) = (min min1 min2, max max1 max2)

  • all_of, any_of, none_of
all_of, any_of, none_of関数は対象のリストと真偽値を判定する関数を引数に取り、
  • すべて当てはまっている
  • 一つ以上当てはまっている
  • すべて当てはまっていない
場合にはTrueを返す関数です。pythonではall(), any(), C++ではまんまのやつがありますね。
*Main> all_of (\x -> x > 3) [1,3,2,4,3,1]
False
*Main> any_of (\x -> x > 3) [1,3,2,4,3,1]
True
*Main> none_of (\x -> x > 3) [1,3,2,4,3,1]
False
これも実装的にはらくちん:
all_of :: (a -> Bool) -> [a] -> Bool
all_of binary_pred list = foldr (&&) True $ map binary_pred list

any_of :: (a -> Bool) -> [a] -> Bool
any_of binary_pred list = foldr (||) False $ map binary_pred list

none_of :: (a -> Bool) -> [a] -> Bool
none_of binary_pred list = all_of (not . binary_pred) list
関数合成を用いてエレガントに解決。
  • inner_product, transform_reduce
inner_productは2つのリストに二項演算子を作用させてから、その結果からなるリストをreduceする関数です。transform_reduceはリストをいったんユニタリー関数を用いて変形させてから、reduceを実行する関数です。
*Main> inner_product (*) (+) 0 [0,1,2] [3,4,5]
14 -- = 0*3 + 1*4 + 2*5

*Main> transform_reduce (\x -> x*2) (+) 0 [1,2,3]
12 -- = 1*2 + 2*2 + 3*2
C++だといろいろ冗長に書かないといけないですけど、Haskellだったら余裕です:
inner_product :: (a -> b -> c) -> (c -> c -> c) -> c -> [a] -> [b] -> c
inner_product binary_op1 binary_op2 init list1 list2 =
    foldr binary_op2 init $ map operate $ zip list1 list2 where
        operate (x, y) = binary_op1 x y

transform_reduce :: (a -> b) -> (b -> b -> b) -> b -> [a] -> b
transform_reduce unary_op binary_op init list =
    foldr binary_op init $ map unary_op list
  • count_if, count
count_ifは条件に合致している要素数を返す関数で、countは指定した値と同じ要素の数を返す関数です。
*Main> count_if (\x -> x < 3) [1,2,3,4,5]
2
*Main> count 3 [1,3,2,2,3]
2
count_if :: (a -> Bool) -> [a] -> Int
count_if binary_pred list = foldr (+) 0 count_list where
    count_list = map transform_func list where
        transform_func x = if binary_pred x then 1 else 0

count :: Eq a => a -> [a] -> Int
count value list = count_if (== value) list
ThrustではTrueだったら1, Falseだったら0となるリストを作成して、それを足していく手法をとっていました。
countcount_ifが思いつけば楽勝でしょう。
  • find_if, find_if_not
find_ifはリストの中で条件に合致している『一番最初の要素』を示すイテレータを返します。find_if_notは逆で、条件に合致していない『一番最初の要素』を示すイテレータを返します。今回は単純に要素のラベル番号を返すようにしました。なかったら最後のラベル+1が帰ってきます。
*Main> find_if (\x -> x > 3) [1,3,2,5,4]
3
*Main> find_if (\x -> x > 7) [1,3,2,5,4]
5
*Main> find_if_not (\x -> x > 3) [1,3,2,5,4]
0
これは微妙に入り組んでいます:
find_if :: (a -> Bool) -> [a] -> Int
find_if binary_pred list = snd result where
    result = foldr find (False, length list) tuple_list where
        find (b1, i1) (b2, i2)
            | b1 && b2  = (True, min i1 i2)
            | b1        = (b1, i1)
            | b2        = (b2, i2)
            | otherwise = (False, max i1 i2)
        tuple_list = zip (map binary_pred list) [0..]

find_if_not :: (a -> Bool) -> [a] -> Int
find_if_not binary_pred list = find_if (not . binary_pred) list
ただ、find_ifはちょっと本家と違います。インデックスが小さい項が常に左の引数となるため省略してるんだとは思いますけど、対称性を満たしていないのがちょっと気持ち悪いので、ここでは交換則を満たすように関数を変形しています。

実際の実装と課題
ここではhaskellを用いて実装しましたが、もちろん実際の実装は異なっており、zipzip_iterator, maptransform_iterator, タプルはtupleを用いて実装しています。これらのイテレータの評価は遅延的に行われるので、複数のメモリアクセスが発生せず、アルゴリズムの高速化に貢献します。

これらのfancy iteratorを使用した戦略は非常に賢いとは思いますが、同時に欠点もあります。複数のイテレータを大量に組み合わせるために、通常のSTL algorithmのようにbeginendを使用してしまうと、両方に対してfancy iteratorを組み合わせなければならないため、2倍の手間がかかってしまうのです。

これらの解決案としてはboost::rangeのようなRange構造を実装することです。これをパイプ演算子で繋げていくような形をとっていけば、明快で分かりやすく、速度も十分保証されたプログラムを記述できることでしょう。