Java 8 ストリームと RxJava オブザーバブルの違い 質問する

Java 8 ストリームと RxJava オブザーバブルの違い 質問する

Java 8 ストリームは RxJava オブザーバブルに似ていますか?

Java 8 ストリーム定義:

新しいjava.util.streamパッケージのクラスは、要素のストリームに対する関数型の操作をサポートする Stream API を提供します。

ベストアンサー1

短い答え

すべてのシーケンス/ストリーム処理ライブラリは、パイプライン構築用の非常に類似した API を提供しています。違いは、マルチスレッドとパイプラインの構成を処理するための API にあります。

長い答え

RxJava は Stream とはまったく異なります。JDK のすべてのものの中で、 に最も近いのはrx.Observableおそらく +コンボです (これには、追加のモナド レイヤーを処理するコスト、つまりと間の変換を処理する必要があります)。 java.util.stream.Collector StreamCompletableFutureStream<CompletableFuture<T>>CompletableFuture<Stream<T>>

Observable と Stream には大きな違いがあります。

  • ストリームはプルベースで、Observable はプッシュベースです。これは抽象的すぎるように聞こえるかもしれませんが、非常に具体的な重要な結果をもたらします。
  • Stream は一度しか使用できませんが、Observable は複数回サブスクライブできます。
  • Stream#parallel()シーケンスをパーティションに分割したり、Observable#subscribeOn()分割Observable#observeOn()しなかったりします。Observable で動作をエミュレートするのは難しいです。Stream#parallel()以前は.parallel()メソッドがありましたが、このメソッドは多くの混乱を引き起こしたため、.parallel()サポートは別のリポジトリに移動されました。ReactiveX/RxJavaParallel: RxJava の実験的な並列拡張詳細は別の答え
  • Stream#parallel()RxJavaメソッドのほとんどがオプションのスケジューラを受け入れるのとは異なり、使用するスレッドプールを指定することはできません。全てJVM 内のストリーム インスタンスは同じフォーク/結合プールを使用するため、追加すると.parallel()プログラムの別のモジュールの動作に誤って影響を与える可能性があります。
  • Observable#interval()ストリームには、、その他多くの時間関連の操作が欠けていますObservable#window()。これは主に、ストリームがプルベースであり、上流が制御できないためです。いつ次の要素を下流に放出します。
  • ストリームは、RxJavaと比較して、制限された操作セットを提供します。たとえば、ストリームにはカットオフ操作(takeWhile()takeUntil())がありません。回避策はStream#anyMatch()限られています。これはターミナル操作であるため、ストリームごとに複数回使用することはできません。
  • JDK 8 では、Stream#zip()場合によっては非常に便利な操作はありません。
  • ストリームは自分で構築するのが難しいが、Observableはさまざまな方法で構築できる 編集:コメントで述べたように、Stream を構築する方法はいくつかあります。ただし、非終端短絡がないため、たとえばファイル内の行の Stream を簡単に生成することはできません (ただし、JDK はすぐに使用できる機能を提供しておりFiles#lines()BufferedReader#lines()他の同様のシナリオは Iterator から Stream を構築することで管理できます)。
  • Observable offers resource management facility (Observable#using()); you can wrap IO stream or mutex with it and be sure that the user will not forget to free the resource - it will be disposed automatically on subscription termination; Stream has onClose(Runnable) method, but you have to call it manually or via try-with-resources. E. g. you have to keep in mind that Files#lines() must be enclosed in try-with-resources block.
  • Observables are synchronized all the way through (I didn't actually check whether the same is true for Streams). This spares you from thinking whether basic operations are thread-safe (the answer is always 'yes', unless there's a bug), but the concurrency-related overhead will be there, no matter if your code need it or not.

Round-up

RxJava differs from Streams significantly. Real RxJava alternatives are other implementations of ReactiveStreams, e. g. relevant part of Akka.

Update

There's trick to use non-default fork-join pool for Stream#parallel, see Custom thread pool in Java 8 parallel stream.

Update

All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.

おすすめ記事