ステートフル LSTM とストリーム予測 質問する

ステートフル LSTM とストリーム予測 質問する

私は、LSTM モデル (Keras と TF で構築) を、それぞれ 3 つの特徴を持つ 7 つのサンプルの複数のバッチでトレーニングしました。サンプルの形状は以下のとおりです (以下の数字は説明のためのプレースホルダーです)。各バッチには 0 または 1 のラベルが付けられています。

データ:

[
   [[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
   [[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
   [[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
   ...
]

すなわち、それぞれの長さが7のm個のシーケンスのバッチで、その要素は3次元ベクトルである(したがって、バッチの形状は(m73))

目標:

[
   [1]
   [0]
   [1]
   ...
]

私の実稼働環境では、データは 3 つの特徴を持つサンプルのストリームです ( [1,2,3],[1,2,3]...)。各サンプルがモデルに到着するとすぐにストリーミングし、バッチ全体を待たずに中間確率を取得したいと思います (7) - 下のアニメーションを参照してください。

ここに画像の説明を入力してください

私の考えの 1 つは、欠落しているサンプルのバッチに 0 を追加することでした[[0,0,0],[0,0,0],[0,0,0],[0,0,0],[0,0,0],[0,0,0],[1,2,3]]が、これは非効率的であるようです。

次のサンプルを待機しながら LSTM 中間状態を永続的に保存し、部分的なデータを使用して特定のバッチ サイズでトレーニングされたモデルで予測するという正しい方向を指し示してくれる助けがあれば幸いです。


アップデート、モデルコードを含む:

    opt = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=10e-8, decay=0.001)
    model = Sequential()

    num_features = data.shape[2]
    num_samples = data.shape[1]

    first_lstm = LSTM(32, batch_input_shape=(None, num_samples, num_features), 
                      return_sequences=True, activation='tanh')
    model.add(first_lstm)
    model.add(LeakyReLU())
    model.add(Dropout(0.2))
    model.add(LSTM(16, return_sequences=True, activation='tanh'))
    model.add(Dropout(0.2))
    model.add(LeakyReLU())
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))

    model.compile(loss='binary_crossentropy', optimizer=opt,
                  metrics=['accuracy', keras_metrics.precision(), 
                           keras_metrics.recall(), f1])

モデルの概要:

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_1 (LSTM)                (None, 100, 32)           6272      
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 100, 32)           0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 100, 32)           0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 100, 16)           3136      
_________________________________________________________________
dropout_2 (Dropout)          (None, 100, 16)           0         
_________________________________________________________________
leaky_re_lu_2 (LeakyReLU)    (None, 100, 16)           0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 1600)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 1601      
=================================================================
Total params: 11,009
Trainable params: 11,009
Non-trainable params: 0
_________________________________________________________________

ベストアンサー1

もっと簡単な解決策があるかもしれないと思います。

モデルに畳み込み層や長さ/ステップの次元に作用するその他の層がない場合は、単に次のようにマークすることができます。stateful=True

警告: モデルには長さの次元に作用するレイヤーがあります。

レイヤーFlattenは長さのディメンションをフィーチャのディメンションに変換します。これにより、目標の達成が完全に妨げられます。レイヤーがFlatten7 つのステップを期待している場合、常に 7 つのステップが必要になります。

したがって、以下の私の回答を適用する前に、レイヤーを使用しないようにモデルを修正してくださいFlatten。代わりに、return_sequences=True最後LSTM レイヤー。

次のコードはそれを修正し、以下の回答で使用するためのいくつかのものを準備します。

def createModel(forTraining):

    #model for training, stateful=False, any batch size   
    if forTraining == True:
        batchSize = None
        stateful = False

    #model for predicting, stateful=True, fixed batch size
    else:
        batchSize = 1
        stateful = True

    model = Sequential()

    first_lstm = LSTM(32, 
        batch_input_shape=(batchSize, num_samples, num_features), 
        return_sequences=True, activation='tanh', 
        stateful=stateful)   

    model.add(first_lstm)
    model.add(LeakyReLU())
    model.add(Dropout(0.2))

    #this is the last LSTM layer, use return_sequences=False
    model.add(LSTM(16, return_sequences=False, stateful=stateful,  activation='tanh'))

    model.add(Dropout(0.2))
    model.add(LeakyReLU())

    #don't add a Flatten!!!
    #model.add(Flatten())

    model.add(Dense(1, activation='sigmoid'))

    if forTraining == True:
        compileThisModel(model)

これにより、7 ステップでトレーニングし、1 ステップで予測できるようになります。そうでなければ不可能です。

あなたの質問に対する解決策としてステートフルモデルの使用

まず、この新しいモデルには Flatten レイヤーがないので、再度トレーニングします。

trainingModel = createModel(forTraining=True)
trainThisModel(trainingModel)

この訓練されたモデルを使えば、ニューモデルトレーニング済みモデルを作成したのとまったく同じ方法ですが、stateful=Trueすべての LSTM レイヤーにマークを付けます。また、トレーニング済みモデルから重みをコピーする必要があります。

これらの新しいレイヤーには固定のバッチ サイズ (Keras のルール) が必要になるため、バッチ サイズは 1 (m 個のストリームではなく、単一のストリームが来る) になると想定し、上記のモデル作成に追加しました。

predictingModel = createModel(forTraining=False)
predictingModel.set_weights(trainingModel.get_weights())

すると、次のようになります。1 つのステップでモデルの出力を予測するだけです。

pseudo for loop as samples arrive to your model:
    prob = predictingModel.predict_on_batch(sample)

    #where sample.shape == (1, 1, 3)

連続したシーケンスの終わりに到達したと判断した場合は、 を呼び出して、predictingModel.reset_states()モデルが前のシーケンスの終わりでシーケンスを修正する必要があると判断することなく、新しいシーケンスを安全に開始できるようにします。


状態の保存と読み込み

取得して設定し、h5py で保存するだけです:

def saveStates(model, saveName):

    f = h5py.File(saveName,'w')

    for l, lay in enumerate(model.layers):
        #if you have nested models, 
            #consider making this recurrent testing for layers in layers
        if isinstance(lay,RNN):
            for s, stat in enumerate(lay.states):
                f.create_dataset('states_' + str(l) + '_' + str(s),
                                 data=K.eval(stat), 
                                 dtype=K.dtype(stat))

    f.close()


def loadStates(model, saveName):

    f = h5py.File(saveName, 'r')
    allStates = list(f.keys())

    for stateKey in allStates:
        name, layer, state = stateKey.split('_')
        layer = int(layer)
        state = int(state)

        K.set_value(model.layers[layer].states[state], f.get(stateKey))

    f.close()

状態の保存/読み込みの動作テスト

import h5py, numpy as np
from keras.layers import RNN, LSTM, Dense, Input
from keras.models import Model
import keras.backend as K




def createModel():
    inp = Input(batch_shape=(1,None,3))
    out = LSTM(5,return_sequences=True, stateful=True)(inp)
    out = LSTM(2, stateful=True)(out)
    out = Dense(1)(out)
    model = Model(inp,out)
    return model


def saveStates(model, saveName):

    f = h5py.File(saveName,'w')

    for l, lay in enumerate(model.layers):
        #if you have nested models, consider making this recurrent testing for layers in layers
        if isinstance(lay,RNN):
            for s, stat in enumerate(lay.states):
                f.create_dataset('states_' + str(l) + '_' + str(s), data=K.eval(stat), dtype=K.dtype(stat))

    f.close()


def loadStates(model, saveName):

    f = h5py.File(saveName, 'r')
    allStates = list(f.keys())

    for stateKey in allStates:
        name, layer, state = stateKey.split('_')
        layer = int(layer)
        state = int(state)

        K.set_value(model.layers[layer].states[state], f.get(stateKey))

    f.close()

def printStates(model):

    for l in model.layers:
        #if you have nested models, consider making this recurrent testing for layers in layers
        if isinstance(l,RNN):
            for s in l.states:
                print(K.eval(s))   

model1 = createModel()
model2 = createModel()
model1.predict_on_batch(np.ones((1,5,3))) #changes model 1 states

print('model1')
printStates(model1)
print('model2')
printStates(model2)

saveStates(model1,'testStates5')
loadStates(model2,'testStates5')

print('model1')
printStates(model1)
print('model2')
printStates(model2)

データの側面に関する考察

最初のモデル( の場合stateful=False)では、 内の各シーケンスは個別であり、他のシーケンスとは接続されていないとみなされますm。また、各バッチには一意のシーケンスが含まれているとみなされます。

そうでない場合は、代わりにステートフル モデルをトレーニングすることをお勧めします (各シーケンスは実際には前のシーケンスに接続されていることを考慮して)。その場合、m1 つのシーケンスのバッチが必要になります。-> m x (1, 7 or None, 3)

おすすめ記事