TPL データフローの BroadcastBlock で例外が重複する 質問する

TPL データフローの BroadcastBlock で例外が重複する 質問する

TPL Dataflow を使用してパイプラインを作成しようとしています。これまでのところすべて正常に動作しており、パイプラインは次のように定義されています (ただし、問題は、broadcaster、submissionSucceeded、submissionFailed だけです)。

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

私が抱えている問題は、例外の伝播です。BroadcastBlockは完了(つまり、Fault)を2つのブロックに伝播するため、例外が発生した場合、両方のブロックに伝播します。そのため、

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

最終的に、2 つの例外を含む集約例外が発生します。現時点でできる最善の方法は、これらをフィルターすることです。

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

しかし、これを行うより良い方法があるかどうか疑問に思っています。つまり、例外が 1 つだけ発生した場合は、例外を 1 つだけ発生させたいのです。私は Dataflow を初めて使用するので、すべての規則を調べているところです。

ベストアンサー1

TPL DataFlowの例を書きました(https://github.com/squideyes/PodFetch) は、完了とエラー処理に対して少し異なるアプローチを採用しています。以下は、Program.cs の 171 行目から 201 行目の関連コードです。

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

ご覧のとおり、いくつかの TPL ブロックをリンクし、HandleCompletion 拡張メソッドを使用して完了を処理する準備をします。

    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

非常に重要なのは、チェーンの最初のブロックにオブジェクトを渡したら scraper.Complete() を呼び出すことです。これにより、HandleCompletion 拡張メソッドが継続を処理します。また、fetcher (チェーンの最後のブロックが完了する) を待機しているため、try/catch 内で結果のエラーを簡単にキャッチできます。

おすすめ記事