ブロッキングキューを「閉じる」 質問する

ブロッキングキューを「閉じる」 質問する

私は使用していますjava.util.concurrent.ブロッキングキュー非常に単純な生産者-消費者のシナリオ。例えば、この疑似コードは消費者の部分を表しています。

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

ここまでは順調です。ブロッキング キューの Javadoc には次のように書かれています。

BlockingQueue は、これ以上アイテムが追加されないことを示す「閉じる」または「シャットダウン」操作を本質的にサポートしていません。このような機能の必要性と使用法は、実装に依存する傾向があります。たとえば、一般的な戦術は、プロデューサーが特別なストリーム終了オブジェクトまたはポイズン オブジェクトを挿入し、コンシューマーがそれを受け取ったときにそれに応じて解釈することです。

残念ながら、使用されているジェネリックと ComplexObject の性質により、「有害なオブジェクト」をキューにプッシュするのは簡単ではありません。したがって、この「一般的な戦術」は、私のシナリオでは実際には便利ではありません。

私の質問は、キューを「閉じる」ために使用できる他の優れた戦術/パターンは何ですか?

ありがとう!

ベストアンサー1

コンシューマ スレッドへのハンドルがあれば、それを中断できます。提供されたコードを使用すると、コンシューマが強制終了されます。プロデューサーがこれを実行するとは考えていません。おそらく、何らかの方法でプログラム コントローラーにコールバックして、完了したことを知らせる必要があるでしょう。その後、コントローラーがコンシューマ スレッドを中断します。

割り込みに従う前に、いつでも作業を完了することができます。例:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

いずれにしても、キューを汚染するよりも、この方法のほうが好ましいです。スレッドを強制終了したい場合は、スレッドに自身を強制終了するように依頼してください。

(きちんと対応している人を見ると嬉しいですねInterruptedException。)


ここでは中断の扱いについて議論があるようです。まず、皆さんに読んでいただきたいのはこの記事

さて、実際にこれを読んでいる人はいないだろうということを理解した上で、ここで問題を説明します。スレッドは、InterruptedException割り込み時にブロックしていた場合にのみ を受け取ります。この場合、Thread.interrupted()は を返しますfalse。ブロックしていなかった場合は、この例外を受信せず、代わりにThread.interrupted()を返しますtrue。したがって、ループ ガードは、何があろうと を必ずチェックする必要がありますThread.interrupted()。そうしないと、スレッドへの割り込みを見逃すリスクがあります。

したがって、Thread.interrupted()何があってもチェックし、キャッチすることを強制されるためInterruptedException(強制されていない場合でも対処する必要があります)、スレッドの中断という同じイベントを処理する 2 つのコード領域が存在することになります。これを処理する 1 つの方法は、それらを 1 つの条件に正規化することです。つまり、ブール状態チェックで例外をスローするか、例外でブール状態を設定できます。私は後者を選択します。


編集:静的Thread#interruptedメソッドクリア現在のスレッドの中断ステータス。

おすすめ記事