Rxで履歴ストリームとライブストリームを進めることができます。質問する

Rxで履歴ストリームとライブストリームを進めることができます。質問する

私は通常、その下で通常のものを使用して実装するホット オブザーバブルを持っているのでSubject、関心のある人は通知のライブ ストリームをサブスクライブできます。

今、私はそのライブ ストリームを維持しながら、発生したすべてのイベントの履歴ストリームも公開し、それらの通知に絶対時刻を添付して、イベントが正確にいつ発生したかを把握できるようにし、また、サブスクライバーが時系列を再生する前に履歴ストリームを任意の時点に進めることができるようにしたいと考えています。

  • 私は、このほとんどが履歴スケジューラそして AdvanceTo メソッドですが、正確な方法はわかりません。
  • そして、タイムスタンプ付き必要なイベントの時間を節約するには?
  • そして、再生件名ライブ ストリームを履歴レコードにキャッシュし、それを HistoricalScheduler を使用して再生できるようにする必要がありますか?

これら 2 つのストリームを同じソースに対してどのように実装すればよいのでしょうか。言い換えれば、以下を現在の要件にどのように適合させることができるのでしょうか。

.net で時間を節約する方法

[ 見る「過去を再現する」見出し]

ベストアンサー1

これによって、HistoricalSchedulerスケジューラの仮想時間の進行を制御する機能が提供されます。

時間の経過に伴うランダム アクセスは実現できません。仮想時間が進むと、スケジュールされたアクションが実行されるため、事前にスケジュールする必要があります。過去にスケジュールされたアクション (つまり、値の後ろの絶対時間) はHistoricalScheduler.Now、すぐに実行されます。

イベントを再生するには、何らかの方法でイベントを記録し、 - のインスタンスを使用してスケジュールを設定しHistoricalScheduler、時間を進める必要があります。

時間を進めると、スケジュールされたアクションが予定された時間に実行され、オブザーバブルがOnXXX()サブスクライバーに送信すると、Nowスケジューラのプロパティに現在の仮想時間が設定されます。

各サブスクライバーは、他のサブスクライバーから独立して時間を制御するために、独自のスケジューラにアクセスする必要があります。これは、実質的にサブスクライバーごとにオブザーバブルを作成することを意味します。

以下は私が作成した簡単な例です (nuget パッケージ rx-main を参照すると LINQPad で実行されます)。

まず、ライブ ストリームを録画し (完全に非本番環境で)、イベントをリストに記録します。提案どおり、TimeStamp()タイミングをキャプチャするには を使用するとうまく機能します。

/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();


Console.WriteLine("Time now is " + DateTime.Now);

これで、HistoricalScheduler と Generate の巧みな使用法を組み合わせてイベントをスケジュールできるようになりました。このアプローチでは、大量のスケジュールされたイベントが事前にキューに入れられるのを防ぎ、代わりに 1 つずつスケジュールするだけであることに注意してください。

var scheduler = new HistoricalScheduler();

/* set up the scheduling of the recording events */
var replay = Observable.Generate(
    log.GetEnumerator(),
    events => events.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);

HistoricalSchedulerここでサブスクライブすると、のNowプロパティにイベントの仮想時間が含まれていることがわかります。

replay.Subscribe(
    i => Console.WriteLine("Event: {0} happened at {1}", i,
    scheduler.Now)); 

AdvanceTo最後にスケジュールを開始します(Start()を使用すると、特定の時間に移動するために使用するのとは対照的に、すべてのイベントを再生しようとするだけです。AdvanceTo(DateTime.MaxValue);

scheduler.Start();

私の場合の出力は次の通りです:

Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00

結局、特定の目的に合うものを得るためには、このツールを使って独自の API を作成する必要があるでしょう。かなりの作業が必要になりますが、それでもかなり強力なツールです。

素晴らしいのは、ライブ オブザーバブルと再生されたオブザーバブルは実際には互いに違いがないように見えることです (常にスケジューラをパラメーター化することを忘れなければ)。そのため、同じクエリを簡単に実行でき、すべての時間クエリがスケジューラの仮想時間で動作します。

私はこれを使用して、古いデータに対する新しいクエリをテストし、商用シナリオで大きな効果を上げました。

輸送制御を目的としていないGUIで時間を前後にスクロールする機能など。通常は、履歴を大きなチャンクで実行し、新しいクエリの出力を保存して、このデータを使用します。その後GUI に表示して、ユーザーが他のメカニズムを使用して自由に前後に移動できるようにします。

最後に、ライブ ストリームをキャッシュする必要はありませんReplaySubjectが、再生用にイベントを記録する手段は必要です。これは、ログに書き込むオブザーバーで十分です。

おすすめ記事