Java 8 並列ストリームのカスタムスレッドプール 質問する

Java 8 並列ストリームのカスタムスレッドプール 質問する

Java 8のカスタムスレッドプールを指定することは可能ですか並列ストリーム? どこにも見つかりません。

サーバー アプリケーションがあり、並列ストリームを使用したいとします。ただし、アプリケーションは大きく、マルチスレッドなので、区分化したいと考えています。アプリケーションの 1 つのモジュールで実行速度の遅いタスクが、別のモジュールのタスクをブロックすることは望ましくありません。

異なるモジュールに異なるスレッド プールを使用できない場合、実際のほとんどの状況で並列ストリームを安全に使用できないことを意味します。

次の例を試してください。CPU を集中的に使用するタスクが別個のスレッドで実行されています。タスクは並列ストリームを活用します。最初のタスクは壊れているため、各ステップに 1 秒かかります (スレッド スリープによってシミュレートされます)。問題は、他のスレッドがスタックし、壊れたタスクが完了するまで待機することです。これは不自然な例ですが、サーブレット アプリと、誰かが共有フォーク結合プールに長時間実行されるタスクを送信することを想像してください。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

ベストアンサー1

実際には、特定のフォーク ジョイン プールで並列操作を実行する方法があります。フォーク ジョイン プールでタスクとして実行すると、そのプールに留まり、共通のプールは使用されません。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

このトリックはForkJoinTask.forkこれは次のように指定します: 「該当する場合は、現在のタスクが実行されているプールでこのタスクを非同期的に実行するように手配します。そうでForkJoinPool.commonPool()ない場合は、inForkJoinPool()

おすすめ記事