SQL 接続プールのように、限られたリソースに対して共有オブジェクト プール戦略を実装するための優れたリソースを持っている人はいますか? (つまり、スレッド セーフになるように完全に実装されます)。
@Aaronaught の明確化のリクエストに関してフォローアップすると、プールの使用は外部サービスへの負荷分散リクエスト用です。私の直接の状況とは対照的に、おそらくすぐに理解しやすいシナリオで説明します。NHibernateISession
のオブジェクトと同様に機能するセッション オブジェクトがあります。各固有のセッションがデータベースへの接続を管理します。現在、長時間実行されるセッション オブジェクトが 1 つあり、サービス プロバイダーがこの個々のセッションの使用率を制限しているという問題が発生しています。
単一のセッションが長時間実行されるサービス アカウントとして扱われるという想定がないため、どうやらそのサービス アカウントをサービスに負荷をかけているクライアントとして扱っているようです。ここでの私の質問は、1 つの個別のセッションを持つ代わりに、異なるセッションのプールを作成し、以前のように単一の焦点を作成するのではなく、サービスへのリクエストをそれらの複数のセッションに分割することです。
この背景が何らかの価値をもたらすことを願っていますが、いくつかの質問に直接答えると次のようになります。
質問:オブジェクトの作成にはコストがかかりますか?
答え:オブジェクトは限られたリソースのプールではありません
質問:それらは頻繁に取得/リリースされますか?
答え:はい、もう一度言いますが、これらは NHibernate ISession と考えることができます。ここでは、各ページ リクエストの期間中、通常 1 つが取得され、解放されます。
質問:単純な先着順で十分でしょうか、それとも飢餓を防ぐようなもっとインテリジェントな仕組みが必要ですか?
答え:単純なラウンドロビン型の分散で十分でしょう。飢餓とは、利用可能なセッションがない場合に、呼び出し元が解放を待ってブロックされることを意味すると思います。セッションは異なる呼び出し元によって共有される可能性があるため、これは実際には当てはまりません。私の目標は、1 つのセッションではなく、複数のセッションに使用を分散することです。
これはおそらく、オブジェクト プールの通常の使用法からの逸脱であると私は考えています。そのため、当初この部分を省略し、オブジェクトが不足する状況が発生するのを防ぐため、オブジェクトの共有を可能にするパターンを適応させることを計画しました。
質問:優先順位、遅延読み込みと即時読み込みなどについてはどうでしょうか?
答え:優先順位付けは行われません。簡単にするために、プール自体の作成時に利用可能なオブジェクトのプールも作成すると想定します。
ベストアンサー1
この質問は、プールされるリソースの動作、オブジェクトの予想される/必要な寿命、プールが必要な本当の理由など、いくつかの不明点があるため、予想よりも少し複雑です。通常、プールはスレッドプール、接続プールなどの特別な目的に使用されます。これは、リソースが何をするかを正確に知っていて、さらに重要なことに、コントロールそのリソースがどのように実装されるかについて。
それほど単純ではないので、私が試みたのは、実験してみて何が最も効果的かを確認できる、かなり柔軟なアプローチを提供することです。長い投稿をあらかじめお詫びしますが、適切な汎用リソース プールを実装するには、カバーすべき領域が多数あります。そして、私は本当に表面をなぞっただけです。
汎用プールには、次のようないくつかの主要な「設定」が必要です。
- リソース読み込み戦略 - 積極的または遅延的。
- リソースの読み込み機構- 実際に構築する方法
- アクセス戦略 - 「ラウンドロビン」について言及されていますが、これはそれほど単純ではありません。この実装では、循環バッファを使用できます。似ているただし、プールはリソースが実際にいつ再利用されるかを制御できないため、完璧ではありません。他のオプションは FIFO と LIFO です。FIFO はランダム アクセス パターンになりますが、LIFO を使用すると、最近最も使用されていない解放戦略を実装するのが大幅に簡単になります (範囲外であるとおっしゃっていましたが、言及する価値はあります)。
リソース読み込みメカニズムについては、.NET ではすでにデリゲートという明確な抽象化が提供されています。
private Func<Pool<T>, T> factory;
これをプールのコンストラクターに渡せば、ほぼ完了です。new()
制約付きのジェネリック型を使用することもできますが、こちらの方が柔軟性があります。
他の 2 つのパラメータのうち、アクセス戦略はより複雑なので、継承 (インターフェース) ベースのアプローチを使用することにしました。
public class Pool<T> : IDisposable
{
// Other code - we'll come back to this
interface IItemStore
{
T Fetch();
void Store(T item);
int Count { get; }
}
}
ここでのコンセプトは単純です。Pool
スレッド セーフティなどの一般的な問題はパブリック クラスで処理しますが、アクセス パターンごとに異なる「アイテム ストア」を使用します。LIFO はスタックで簡単に表され、FIFO はキューです。また、ラウンドロビン アクセス パターンをList<T>
近似するために、インデックス ポインターを使用した、あまり最適化されていないがおそらく適切な循環バッファー実装を使用しました。
以下のクラスはすべて の内部クラスですPool<T>
。これはスタイル上の選択ですが、これらは の外部で使用することを意図したものではないためPool
、これが最も理にかなっています。
class QueueStore : Queue<T>, IItemStore
{
public QueueStore(int capacity) : base(capacity)
{
}
public T Fetch()
{
return Dequeue();
}
public void Store(T item)
{
Enqueue(item);
}
}
class StackStore : Stack<T>, IItemStore
{
public StackStore(int capacity) : base(capacity)
{
}
public T Fetch()
{
return Pop();
}
public void Store(T item)
{
Push(item);
}
}
これらは明らかなものです - スタックとキュー。これらについては、あまり説明する必要はないと思います。循環バッファはもう少し複雑です:
class CircularStore : IItemStore
{
private List<Slot> slots;
private int freeSlotCount;
private int position = -1;
public CircularStore(int capacity)
{
slots = new List<Slot>(capacity);
}
public T Fetch()
{
if (Count == 0)
throw new InvalidOperationException("The buffer is empty.");
int startPosition = position;
do
{
Advance();
Slot slot = slots[position];
if (!slot.IsInUse)
{
slot.IsInUse = true;
--freeSlotCount;
return slot.Item;
}
} while (startPosition != position);
throw new InvalidOperationException("No free slots.");
}
public void Store(T item)
{
Slot slot = slots.Find(s => object.Equals(s.Item, item));
if (slot == null)
{
slot = new Slot(item);
slots.Add(slot);
}
slot.IsInUse = false;
++freeSlotCount;
}
public int Count
{
get { return freeSlotCount; }
}
private void Advance()
{
position = (position + 1) % slots.Count;
}
class Slot
{
public Slot(T item)
{
this.Item = item;
}
public T Item { get; private set; }
public bool IsInUse { get; set; }
}
}
さまざまなアプローチを採ることもできますが、結局のところ、リソースは作成された順序でアクセスされる必要があるため、リソースへの参照は保持しつつ、それらを「使用中」(または使用していない) としてマークする必要があります。最悪のシナリオでは、使用可能なスロットは 1 つだけになり、フェッチごとにバッファの完全な反復が必要になります。これは、数百のリソースをプールし、1 秒間に数回取得および解放する場合は問題になりますが、5 ~ 10 個のアイテムのプールの場合はそれほど問題にはなりません。典型的なリソースの使用が少ない場合は、1 つまたは 2 つのスロットを進めるだけで済みます。
これらのクラスはプライベートな内部クラスであることに注意してください。そのため、多くのエラー チェックは必要ありません。プール自体がこれらのクラスへのアクセスを制限します。
列挙とファクトリ メソッドを追加すれば、この部分は完了です。
// Outside the pool
public enum AccessMode { FIFO, LIFO, Circular };
private IItemStore itemStore;
// Inside the Pool
private IItemStore CreateItemStore(AccessMode mode, int capacity)
{
switch (mode)
{
case AccessMode.FIFO:
return new QueueStore(capacity);
case AccessMode.LIFO:
return new StackStore(capacity);
default:
Debug.Assert(mode == AccessMode.Circular,
"Invalid AccessMode in CreateItemStore");
return new CircularStore(capacity);
}
}
次に解決すべき問題は、ロード戦略です。私は 3 つのタイプを定義しました。
public enum LoadingMode { Eager, Lazy, LazyExpanding };
最初の 2 つは説明不要でしょう。3 つ目はハイブリッドのようなもので、リソースを遅延ロードしますが、プールがいっぱいになるまでリソースの再利用は開始されません。プールをいっぱいにしたい (そうしたいようですね) が、実際にリソースを作成するコストを最初のアクセスまで延期したい (つまり、起動時間を短縮したい) 場合は、これは良いトレードオフになります。
アイテムストアの抽象化ができたので、読み込みメソッドはそれほど複雑ではありません。
private int size;
private int count;
private T AcquireEager()
{
lock (itemStore)
{
return itemStore.Fetch();
}
}
private T AcquireLazy()
{
lock (itemStore)
{
if (itemStore.Count > 0)
{
return itemStore.Fetch();
}
}
Interlocked.Increment(ref count);
return factory(this);
}
private T AcquireLazyExpanding()
{
bool shouldExpand = false;
if (count < size)
{
int newCount = Interlocked.Increment(ref count);
if (newCount <= size)
{
shouldExpand = true;
}
else
{
// Another thread took the last spot - use the store instead
Interlocked.Decrement(ref count);
}
}
if (shouldExpand)
{
return factory(this);
}
else
{
lock (itemStore)
{
return itemStore.Fetch();
}
}
}
private void PreloadItems()
{
for (int i = 0; i < size; i++)
{
T item = factory(this);
itemStore.Store(item);
}
count = size;
}
上記のフィールドsize
とcount
フィールドは、プールの最大サイズとプールが所有するリソースの合計数を示します(ただし、必ずしも利用可能)です。最もAcquireEager
単純な方法では、アイテムがすでにストア内にあることを前提としています。これらのアイテムは、構築時、つまりPreloadItems
最後に示した方法でプリロードされます。
AcquireLazy
プールに空きアイテムがあるかどうかを確認し、ない場合は新しいものを作成します。AcquireLazyExpanding
プールがまだ目標サイズに達していない限り、新しいリソースを作成します。ロックを最小限に抑えるためにこれを最適化しようとしましたが、間違いがなかったことを願っています(私は持っているこれをマルチスレッド条件下でテストしましたが、明らかに網羅的ではありません。
これらのメソッドのいずれも、ストアが最大サイズに達したかどうかをチェックしないのはなぜかと疑問に思うかもしれません。これについては後ほど説明します。
さて、プール自体についてです。ここにプライベート データの完全なセットがあります。その一部はすでに表示されています。
private bool isDisposed;
private Func<Pool<T>, T> factory;
private LoadingMode loadingMode;
private IItemStore itemStore;
private int size;
private int count;
private Semaphore sync;
前の段落で触れた質問、つまり、作成されるリソースの総数を制限するにはどうすればよいかという質問に対する答えは、.NETにはすでにそのための完璧なツールがあるということです。セマフォこれは、固定数のスレッドがリソースにアクセスできるように特別に設計されています (この場合、「リソース」は内部アイテム ストアです)。完全なプロデューサー/コンシューマー キューを実装しているわけではないので、これは私たちのニーズに完全に適しています。
コンストラクターは次のようになります。
public Pool(int size, Func<Pool<T>, T> factory,
LoadingMode loadingMode, AccessMode accessMode)
{
if (size <= 0)
throw new ArgumentOutOfRangeException("size", size,
"Argument 'size' must be greater than zero.");
if (factory == null)
throw new ArgumentNullException("factory");
this.size = size;
this.factory = factory;
sync = new Semaphore(size, size);
this.loadingMode = loadingMode;
this.itemStore = CreateItemStore(accessMode, size);
if (loadingMode == LoadingMode.Eager)
{
PreloadItems();
}
}
PreloadItems
ここでは驚くようなことはないはずです。唯一注目すべき点は、すでに前に説明した方法を使用した、早期読み込みの特別なケースです。
これまでにほぼすべてがきれいに抽象化されているため、実際のメソッドAcquire
とRelease
メソッドは非常に簡単です。
public T Acquire()
{
sync.WaitOne();
switch (loadingMode)
{
case LoadingMode.Eager:
return AcquireEager();
case LoadingMode.Lazy:
return AcquireLazy();
default:
Debug.Assert(loadingMode == LoadingMode.LazyExpanding,
"Unknown LoadingMode encountered in Acquire method.");
return AcquireLazyExpanding();
}
}
public void Release(T item)
{
lock (itemStore)
{
itemStore.Store(item);
}
sync.Release();
}
前に説明したように、Semaphore
アイテム ストアのステータスを厳密にチェックするのではなく、を使用して同時実行を制御します。取得したアイテムが正しくリリースされている限り、心配する必要はありません。
最後に、クリーンアップがあります。
public void Dispose()
{
if (isDisposed)
{
return;
}
isDisposed = true;
if (typeof(IDisposable).IsAssignableFrom(typeof(T)))
{
lock (itemStore)
{
while (itemStore.Count > 0)
{
IDisposable disposable = (IDisposable)itemStore.Fetch();
disposable.Dispose();
}
}
}
sync.Close();
}
public bool IsDisposed
{
get { return isDisposed; }
}
このプロパティの目的IsDisposed
はすぐに明らかになります。 mainDispose
メソッドが実際に行うことは、 を実装している場合に、実際にプールされたアイテムを破棄することだけですIDisposable
。
基本的にはブロックを使ってそのまま使うことができますtry-finally
が、私はこの構文が好きではありません。なぜなら、プールされたリソースをクラスやメソッド間で受け渡し始めると、非常に混乱するからです。リソースを使用するメインクラスが、持っているプールへの参照。これは非常に面倒なので、より良い方法は「スマートな」プールされたオブジェクトを作成することです。
次のような単純なインターフェース/クラスから始めるとします。
public interface IFoo : IDisposable
{
void Test();
}
public class Foo : IFoo
{
private static int count = 0;
private int num;
public Foo()
{
num = Interlocked.Increment(ref count);
}
public void Dispose()
{
Console.WriteLine("Goodbye from Foo #{0}", num);
}
public void Test()
{
Console.WriteLine("Hello from Foo #{0}", num);
}
}
これは、一意の ID を生成するための定型コードFoo
を実装しIFoo
、備えた仮想の使い捨てリソースです。ここでは、別の特別なプールされたオブジェクトを作成します。
public class PooledFoo : IFoo
{
private Foo internalFoo;
private Pool<IFoo> pool;
public PooledFoo(Pool<IFoo> pool)
{
if (pool == null)
throw new ArgumentNullException("pool");
this.pool = pool;
this.internalFoo = new Foo();
}
public void Dispose()
{
if (pool.IsDisposed)
{
internalFoo.Dispose();
}
else
{
pool.Release(this);
}
}
public void Test()
{
internalFoo.Test();
}
}
これは、すべての「実際の」メソッドをその内部にプロキシするだけですIFoo
(Castle などの動的プロキシ ライブラリを使用してこれを行うこともできますが、ここでは説明しません)。また、それPool
を作成した への参照も維持されるため、Dispose
このオブジェクトを使用すると、自動的にプールに解放されます。を除外するプールがすでに破棄されている場合 - これは「クリーンアップ」モードにあることを意味し、この場合は実際には内部リソースをクリーンアップするその代わり。
上記のアプローチを使用すると、次のようなコードを記述できます。
// Create the pool early
Pool<IFoo> pool = new Pool<IFoo>(PoolSize, p => new PooledFoo(p),
LoadingMode.Lazy, AccessMode.Circular);
// Sometime later on...
using (IFoo foo = pool.Acquire())
{
foo.Test();
}
これはとてもできることは良いことです。つまり、用途(IFoo
それを作成するコードとは対照的に)実際にはプールを意識する必要はありません。注入する IFoo
お気に入りの DI ライブラリとPool<T>
プロバイダー/ファクトリを使用するオブジェクト。
私はPasteBinの完全なコードコピー&ペーストを楽しむための短いテストプログラムさまざまな読み込み/アクセス モードやマルチスレッド条件を試して、スレッドセーフでありバグがないことを確認するために使用できます。
これに関してご質問やご不明な点がございましたらお知らせください。