ドキュメントにはオブジェクトがスレッドセーフであると書かれていますが、それはすべてのメソッドからのすべてのアクセスがスレッドセーフであるという意味でしょうか? では、put()
一度に複数のスレッドからtake()
同じインスタンスでオブジェクトを呼び出しても、何も問題はありませんか?
この答えが私に疑問を抱かせたので、私はこう尋ねます。https://stackoverflow.com/a/22006181/4164238
ベストアンサー1
簡単に答えると、はい、スレッドセーフです。しかし、それで終わりにしてはいけません...
まず、ちょっとした管理ですが、BlockingQueue
はインターフェースであり、スレッドセーフでない実装は文書化された契約に違反することになります。あなたが含めたリンクは を参照していましたがLinkedBlockingQueue
、これにはいくつかの巧妙な点があります。
のあなたが含めたリンク興味深い観察をしています。確かに、 内には 2 つのロックがありますLinkedBlockingQueue
。しかし、これは、「単純な」実装が違反するエッジ ケースが実際に処理されていることを理解していません。そのため、 take メソッドと put メソッドは、最初に予想するよりも複雑です。
LinkedBlockingQueue
読み取りと書き込みの両方で同じロックを使用しないように最適化されているため、競合が減りますが、正しい動作のためにはキューが空でないことが条件となります。キューに要素がある場合、プッシュ ポイントとポップ ポイントはメモリの同じ領域にないため、競合を回避できます。ただし、キューが空の場合、競合は回避できないため、この一般的な「エッジ」ケースを処理するために追加のコードが必要になります。これは、コードの複雑さとパフォーマンス/スケーラビリティの間の一般的なトレードオフです。
LinkedBlockingQueue
すると、キューが空か空でないかをどのように認識し、スレッドを処理するのかという疑問が生じます。その答えは、AtomicInteger
と をCondition
2 つの追加の同時データ構造として使用することです。 は、AtomicInteger
キューの長さがゼロかどうかをチェックするために使用され、 Condition は、キューがおそらく目的の状態にあるときに待機中のスレッドに通知する信号を待機するために使用されます。この追加の調整にはオーバーヘッドがありますが、同時スレッドの数を増やした場合、この手法のオーバーヘッドは、単一のロックを使用することによって生じる競合よりも低いことが測定で示されています。
以下に、 のコードをコピーしLinkedBlockingQueue
、その動作を説明するコメントを追加しました。大まかに言うと、 はtake()
最初に への他のすべての呼び出しをロックアウトしtake()
、次にput()
必要に応じてシグナルを送信します。put()
も同様に動作し、最初に への他のすべての呼び出しをブロックしput()
、次に必要に応じてシグナルを送信しますtake()
。
方法からput()
:
// putLock coordinates the calls to put() only; further coordination
// between put() and take() follows below
putLock.lockInterruptibly();
try {
// block while the queue is full; count is shared between put() and take()
// and is safely visible between cores but prone to change between calls
// a while loop is used because state can change between signals, which is
// why signals get rechecked and resent.. read on to see more of that
while (count.get() == capacity) {
notFull.await();
}
// we know that the queue is not full so add
enqueue(e);
c = count.getAndIncrement();
// if the queue is not full, send a signal to wake up
// any thread that is possibly waiting for the queue to be a little
// emptier -- note that this is logically part of 'take()' but it
// has to be here because take() blocks itself
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
からtake()
takeLock.lockInterruptibly();
try {
// wait for the queue to stop being empty
while (count.get() == 0) {
notEmpty.await();
}
// remove element
x = dequeue();
// decrement shared count
c = count.getAndDecrement();
// send signal that the queue is not empty
// note that this is logically part of put(), but
// for thread coordination reasons is here
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();