Java 8のカスタムスレッドプールを指定することは可能ですか並列ストリーム? どこにも見つかりません。
サーバー アプリケーションがあり、並列ストリームを使用したいとします。ただし、アプリケーションは大きく、マルチスレッドなので、区分化したいと考えています。アプリケーションの 1 つのモジュールで実行速度の遅いタスクが、別のモジュールのタスクをブロックすることは望ましくありません。
異なるモジュールに異なるスレッド プールを使用できない場合、実際のほとんどの状況で並列ストリームを安全に使用できないことを意味します。
次の例を試してください。CPU を集中的に使用するタスクが別個のスレッドで実行されています。タスクは並列ストリームを活用します。最初のタスクは壊れているため、各ステップに 1 秒かかります (スレッド スリープによってシミュレートされます)。問題は、他のスレッドがスタックし、壊れたタスクが完了するまで待機することです。これは不自然な例ですが、サーブレット アプリと、誰かが共有フォーク結合プールに長時間実行されるタスクを送信することを想像してください。
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
ベストアンサー1
実際には、特定のフォーク ジョイン プールで並列操作を実行する方法があります。フォーク ジョイン プールでタスクとして実行すると、そのプールに留まり、共通のプールは使用されません。
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
このトリックはForkJoinTask.fork
これは次のように指定します: 「該当する場合は、現在のタスクが実行されているプールでこのタスクを非同期的に実行するように手配します。そうでForkJoinPool.commonPool()
ない場合は、inForkJoinPool()
」