Fork/Join Framework についてちょろっと調べてみた

同僚と Fork/Join Framework について話す機会があり、ちょっと気になったので調べてみました。ちなみに、ソースは jdk1.7.0_25 です。

native なのか Java なのか?

native とかは使わないで普通に Java の Thred クラスを利用して実装しているようでした。そのように判断した理由は以下の通りです。

  • ForkJoinWorkerThread 自体は Thread クラスを継承している
  • ForkJoinPool#invoke から RecursiveTask#compute までのソースを見た感じ、普通に Java のコードだった
ForkJoinWorkerThread の生成タイミング

ForkJoinPool#invoke の中で ForkJoinWorkerThread を生成しているみたいでした。

registerWorker(ForkJoinWorkerThread) : int - java.util.concurrent.ForkJoinPool (3 matches)
	ForkJoinWorkerThread(ForkJoinPool) - java.util.concurrent.ForkJoinWorkerThread
		newThread(ForkJoinPool) : ForkJoinWorkerThread - java.util.concurrent.ForkJoinPool.DefaultForkJoinWorkerThreadFactory
			addWorker() : void - java.util.concurrent.ForkJoinPool
				signalWork() : void - java.util.concurrent.ForkJoinPool
					addSubmission(ForkJoinTask) : void - java.util.concurrent.ForkJoinPool
						invoke(ForkJoinTask)  : T - java.util.concurrent.ForkJoinPool

ちなみに、 ForkJoinPool#registerWorker まで書いているのはこの中で ForkJoinPoolクラス の workers フィールドに新しく生成した ForkJoinWorkerThread のインスタンスを代入しているように見えたためです。なお、 workers フィールドの初期化は ForkJoinPool のコンストラクタの中で行なっているようで、引数なしのコンストラクタを利用した場合はプロセッサ数の 2 倍の数を生成するようでした。

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {

// ... 中略 ...

        // initialize workers array with room for 2*parallelism if possible
        int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;
        else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        workers = new ForkJoinWorkerThread[n + 1];

引数なしのコンストラク

    public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
ForkJoinPool#invoke から RecursiveTask#compute の呼び出し階層

以下の様な感じで RecursiveTask が呼ばれてました。 work-stealing の実装部分は読んでいませんw

compute() : V - java.util.concurrent.RecursiveTask
     exec() : boolean - java.util.concurrent.RecursiveTask
          doJoin() : int - java.util.concurrent.ForkJoinTask
               join() : V - java.util.concurrent.ForkJoinTask
                    invoke(ForkJoinTask)  : T - java.util.concurrent.ForkJoinPool

ExecutorService との違いは?

2013-07-25 追記

  • 最初にいきなり native なのか Java なのかと書いてありますが、これは全く予備知識がなかったためです。ただ、たくさんのスレッドを利用するのだろうなという漠然としたイメージしかなく、同僚が native 使ってるかもと言ってたのでそこから調査しました。
  • 分割統治法を全体としており、再帰処理で分割する、かつ、 fork で並行処理するので幅優先探索を利用する処理に向いてそうという理解です。
    • 深さ優先だとシングルスレッドと変わらないはずなので。
    • 手軽に使えるのはいいですが、計算量の事前計算が必要だとは思いました。結局、タスクが大量に生成されるわけで、末端まで行かないとタスクが減り始めないので。
      • そういう場合は、プロセスで利用可能なメモリサイズを確認した上で、処理可能なサイズになるまでは自前でタスク分割と実行の制御を入れればいいんでしょうが。
  • MapReduce は分割統治法には向いてないのかなとは思いました。まぁ、処理時間を気にしなければ計算量を無視出来るオーダーが増えるので、そもそも比較すること自体がナンセンスですがw
    • 結局、 MapReduce って fork & join の深さ 1 ということなので、深さが 2 以上だとジョブ自体のループを実装することになり、面倒くさそうだなぁと。
  • 再帰なのでスレッドのディスパッチコストは低いのかなと思います。結局、末端からタスクが処理されるので、親は fork した子の処理が終わるまでスレッドの割り当てが戻ってこないので。 I/O 待ちでスレッド同士を切り替えるみたいな動きはあまりないという理解です。