私は、LSTM モデル (Keras と TF で構築) を、それぞれ 3 つの特徴を持つ 7 つのサンプルの複数のバッチでトレーニングしました。サンプルの形状は以下のとおりです (以下の数字は説明のためのプレースホルダーです)。各バッチには 0 または 1 のラベルが付けられています。
データ:
[
[[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
[[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
[[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
...
]
すなわち、それぞれの長さが7のm個のシーケンスのバッチで、その要素は3次元ベクトルである(したがって、バッチの形状は(m73))
目標:
[
[1]
[0]
[1]
...
]
私の実稼働環境では、データは 3 つの特徴を持つサンプルのストリームです ( [1,2,3],[1,2,3]...
)。各サンプルがモデルに到着するとすぐにストリーミングし、バッチ全体を待たずに中間確率を取得したいと思います (7) - 下のアニメーションを参照してください。
私の考えの 1 つは、欠落しているサンプルのバッチに 0 を追加することでした[[0,0,0],[0,0,0],[0,0,0],[0,0,0],[0,0,0],[0,0,0],[1,2,3]]
が、これは非効率的であるようです。
次のサンプルを待機しながら LSTM 中間状態を永続的に保存し、部分的なデータを使用して特定のバッチ サイズでトレーニングされたモデルで予測するという正しい方向を指し示してくれる助けがあれば幸いです。
アップデート、モデルコードを含む:
opt = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=10e-8, decay=0.001)
model = Sequential()
num_features = data.shape[2]
num_samples = data.shape[1]
first_lstm = LSTM(32, batch_input_shape=(None, num_samples, num_features),
return_sequences=True, activation='tanh')
model.add(first_lstm)
model.add(LeakyReLU())
model.add(Dropout(0.2))
model.add(LSTM(16, return_sequences=True, activation='tanh'))
model.add(Dropout(0.2))
model.add(LeakyReLU())
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer=opt,
metrics=['accuracy', keras_metrics.precision(),
keras_metrics.recall(), f1])
モデルの概要:
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
lstm_1 (LSTM) (None, 100, 32) 6272
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU) (None, 100, 32) 0
_________________________________________________________________
dropout_1 (Dropout) (None, 100, 32) 0
_________________________________________________________________
lstm_2 (LSTM) (None, 100, 16) 3136
_________________________________________________________________
dropout_2 (Dropout) (None, 100, 16) 0
_________________________________________________________________
leaky_re_lu_2 (LeakyReLU) (None, 100, 16) 0
_________________________________________________________________
flatten_1 (Flatten) (None, 1600) 0
_________________________________________________________________
dense_1 (Dense) (None, 1) 1601
=================================================================
Total params: 11,009
Trainable params: 11,009
Non-trainable params: 0
_________________________________________________________________
ベストアンサー1
もっと簡単な解決策があるかもしれないと思います。
モデルに畳み込み層や長さ/ステップの次元に作用するその他の層がない場合は、単に次のようにマークすることができます。stateful=True
警告: モデルには長さの次元に作用するレイヤーがあります。
レイヤーFlatten
は長さのディメンションをフィーチャのディメンションに変換します。これにより、目標の達成が完全に妨げられます。レイヤーがFlatten
7 つのステップを期待している場合、常に 7 つのステップが必要になります。
したがって、以下の私の回答を適用する前に、レイヤーを使用しないようにモデルを修正してくださいFlatten
。代わりに、return_sequences=True
最後LSTM レイヤー。
次のコードはそれを修正し、以下の回答で使用するためのいくつかのものを準備します。
def createModel(forTraining):
#model for training, stateful=False, any batch size
if forTraining == True:
batchSize = None
stateful = False
#model for predicting, stateful=True, fixed batch size
else:
batchSize = 1
stateful = True
model = Sequential()
first_lstm = LSTM(32,
batch_input_shape=(batchSize, num_samples, num_features),
return_sequences=True, activation='tanh',
stateful=stateful)
model.add(first_lstm)
model.add(LeakyReLU())
model.add(Dropout(0.2))
#this is the last LSTM layer, use return_sequences=False
model.add(LSTM(16, return_sequences=False, stateful=stateful, activation='tanh'))
model.add(Dropout(0.2))
model.add(LeakyReLU())
#don't add a Flatten!!!
#model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
if forTraining == True:
compileThisModel(model)
これにより、7 ステップでトレーニングし、1 ステップで予測できるようになります。そうでなければ不可能です。
あなたの質問に対する解決策としてステートフルモデルの使用
まず、この新しいモデルには Flatten レイヤーがないので、再度トレーニングします。
trainingModel = createModel(forTraining=True)
trainThisModel(trainingModel)
この訓練されたモデルを使えば、ニューモデルトレーニング済みモデルを作成したのとまったく同じ方法ですが、stateful=True
すべての LSTM レイヤーにマークを付けます。また、トレーニング済みモデルから重みをコピーする必要があります。
これらの新しいレイヤーには固定のバッチ サイズ (Keras のルール) が必要になるため、バッチ サイズは 1 (m 個のストリームではなく、単一のストリームが来る) になると想定し、上記のモデル作成に追加しました。
predictingModel = createModel(forTraining=False)
predictingModel.set_weights(trainingModel.get_weights())
すると、次のようになります。1 つのステップでモデルの出力を予測するだけです。
pseudo for loop as samples arrive to your model:
prob = predictingModel.predict_on_batch(sample)
#where sample.shape == (1, 1, 3)
連続したシーケンスの終わりに到達したと判断した場合は、 を呼び出して、predictingModel.reset_states()
モデルが前のシーケンスの終わりでシーケンスを修正する必要があると判断することなく、新しいシーケンスを安全に開始できるようにします。
状態の保存と読み込み
取得して設定し、h5py で保存するだけです:
def saveStates(model, saveName):
f = h5py.File(saveName,'w')
for l, lay in enumerate(model.layers):
#if you have nested models,
#consider making this recurrent testing for layers in layers
if isinstance(lay,RNN):
for s, stat in enumerate(lay.states):
f.create_dataset('states_' + str(l) + '_' + str(s),
data=K.eval(stat),
dtype=K.dtype(stat))
f.close()
def loadStates(model, saveName):
f = h5py.File(saveName, 'r')
allStates = list(f.keys())
for stateKey in allStates:
name, layer, state = stateKey.split('_')
layer = int(layer)
state = int(state)
K.set_value(model.layers[layer].states[state], f.get(stateKey))
f.close()
状態の保存/読み込みの動作テスト
import h5py, numpy as np
from keras.layers import RNN, LSTM, Dense, Input
from keras.models import Model
import keras.backend as K
def createModel():
inp = Input(batch_shape=(1,None,3))
out = LSTM(5,return_sequences=True, stateful=True)(inp)
out = LSTM(2, stateful=True)(out)
out = Dense(1)(out)
model = Model(inp,out)
return model
def saveStates(model, saveName):
f = h5py.File(saveName,'w')
for l, lay in enumerate(model.layers):
#if you have nested models, consider making this recurrent testing for layers in layers
if isinstance(lay,RNN):
for s, stat in enumerate(lay.states):
f.create_dataset('states_' + str(l) + '_' + str(s), data=K.eval(stat), dtype=K.dtype(stat))
f.close()
def loadStates(model, saveName):
f = h5py.File(saveName, 'r')
allStates = list(f.keys())
for stateKey in allStates:
name, layer, state = stateKey.split('_')
layer = int(layer)
state = int(state)
K.set_value(model.layers[layer].states[state], f.get(stateKey))
f.close()
def printStates(model):
for l in model.layers:
#if you have nested models, consider making this recurrent testing for layers in layers
if isinstance(l,RNN):
for s in l.states:
print(K.eval(s))
model1 = createModel()
model2 = createModel()
model1.predict_on_batch(np.ones((1,5,3))) #changes model 1 states
print('model1')
printStates(model1)
print('model2')
printStates(model2)
saveStates(model1,'testStates5')
loadStates(model2,'testStates5')
print('model1')
printStates(model1)
print('model2')
printStates(model2)
データの側面に関する考察
最初のモデル( の場合stateful=False
)では、 内の各シーケンスは個別であり、他のシーケンスとは接続されていないとみなされますm
。また、各バッチには一意のシーケンスが含まれているとみなされます。
そうでない場合は、代わりにステートフル モデルをトレーニングすることをお勧めします (各シーケンスは実際には前のシーケンスに接続されていることを考慮して)。その場合、m
1 つのシーケンスのバッチが必要になります。-> m x (1, 7 or None, 3)
。