Java 8 ストリームは RxJava オブザーバブルに似ていますか?
Java 8 ストリーム定義:
新しい
java.util.stream
パッケージのクラスは、要素のストリームに対する関数型の操作をサポートする Stream API を提供します。
ベストアンサー1
短い答え
すべてのシーケンス/ストリーム処理ライブラリは、パイプライン構築用の非常に類似した API を提供しています。違いは、マルチスレッドとパイプラインの構成を処理するための API にあります。
長い答え
RxJava は Stream とはまったく異なります。JDK のすべてのものの中で、 に最も近いのはrx.Observable
おそらく +コンボです (これには、追加のモナド レイヤーを処理するコスト、つまりと間の変換を処理する必要があります)。
java.util.stream.Collector
Stream
CompletableFuture
Stream<CompletableFuture<T>>
CompletableFuture<Stream<T>>
Observable と Stream には大きな違いがあります。
- ストリームはプルベースで、Observable はプッシュベースです。これは抽象的すぎるように聞こえるかもしれませんが、非常に具体的な重要な結果をもたらします。
- Stream は一度しか使用できませんが、Observable は複数回サブスクライブできます。
Stream#parallel()
シーケンスをパーティションに分割したり、Observable#subscribeOn()
分割Observable#observeOn()
しなかったりします。Observable で動作をエミュレートするのは難しいです。Stream#parallel()
以前は.parallel()
メソッドがありましたが、このメソッドは多くの混乱を引き起こしたため、.parallel()
サポートは別のリポジトリに移動されました。ReactiveX/RxJavaParallel: RxJava の実験的な並列拡張詳細は別の答え。Stream#parallel()
RxJavaメソッドのほとんどがオプションのスケジューラを受け入れるのとは異なり、使用するスレッドプールを指定することはできません。全てJVM 内のストリーム インスタンスは同じフォーク/結合プールを使用するため、追加すると.parallel()
プログラムの別のモジュールの動作に誤って影響を与える可能性があります。Observable#interval()
ストリームには、、その他多くの時間関連の操作が欠けていますObservable#window()
。これは主に、ストリームがプルベースであり、上流が制御できないためです。いつ次の要素を下流に放出します。- ストリームは、RxJavaと比較して、制限された操作セットを提供します。たとえば、ストリームにはカットオフ操作(
takeWhile()
、takeUntil()
)がありません。回避策はStream#anyMatch()
限られています。これはターミナル操作であるため、ストリームごとに複数回使用することはできません。 - JDK 8 では、
Stream#zip()
場合によっては非常に便利な操作はありません。 -
ストリームは自分で構築するのが難しいが、Observableはさまざまな方法で構築できる編集:コメントで述べたように、Stream を構築する方法はいくつかあります。ただし、非終端短絡がないため、たとえばファイル内の行の Stream を簡単に生成することはできません (ただし、JDK はすぐに使用できる機能を提供しておりFiles#lines()
、BufferedReader#lines()
他の同様のシナリオは Iterator から Stream を構築することで管理できます)。 - Observable offers resource management facility (
Observable#using()
); you can wrap IO stream or mutex with it and be sure that the user will not forget to free the resource - it will be disposed automatically on subscription termination; Stream hasonClose(Runnable)
method, but you have to call it manually or via try-with-resources. E. g. you have to keep in mind thatFiles#lines()
must be enclosed in try-with-resources block. - Observables are synchronized all the way through (I didn't actually check whether the same is true for Streams). This spares you from thinking whether basic operations are thread-safe (the answer is always 'yes', unless there's a bug), but the concurrency-related overhead will be there, no matter if your code need it or not.
Round-up
RxJava differs from Streams significantly. Real RxJava alternatives are other implementations of ReactiveStreams, e. g. relevant part of Akka.
Update
There's trick to use non-default fork-join pool for Stream#parallel
, see Custom thread pool in Java 8 parallel stream.
Update
All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.