複数の API リクエストからの複数の読み取り可能なストリームを単一の書き込み可能なストリームにパイプする方法は? 質問する

複数の API リクエストからの複数の読み取り可能なストリームを単一の書き込み可能なストリームにパイプする方法は? 質問する

- 望ましい行動
- 実際の行動
- 私が試したこと
- 再現する手順
- 研究


望ましい行動

複数の API リクエストから受信した複数の読み取り可能なストリームを、単一の書き込み可能なストリームにパイプします。

APIレスポンスはIBM-WatsonのtextToSpeech.合成()方法。

複数のリクエストが必要な理由は、サービスに5KBテキスト入力の制限があるためです。

18KBしたがって、たとえばの文字列を完了するには 4 つのリクエストが必要になります。

実際の行動

書き込み可能なストリーム ファイルが不完全で文字化けしています。

アプリケーションがハングしているようです。

.mp3不完全なファイルをオーディオ プレーヤーで開こうとすると、破損しているというメッセージが表示されます。

ファイルを開いたり閉じたりするプロセスによってファイル サイズが増加するようです。ファイルを開くと、何らかの理由でより多くのデータが流入するようです。

望ましくない動作は、たとえば 4000 バイト以下の 4 つの文字列など、入力が大きい場合に顕著になります。

私が試したこと

npmパッケージを使用して、読み取り可能なストリームを単一の書き込み可能なストリームまたは複数の書き込み可能なストリームにパイプするいくつかの方法を試しました。複合ストリーム複合ストリーム2マルチストリームそしてアーカイバこれらすべての結果は不完全なファイルになります。私の最後の試みはパッケージを一切使用せず、Steps To Reproduce以下のセクションに示されています。

したがって、私はアプリケーション ロジックの各部分に疑問を抱いています。

01.Watson Text to Speech API リクエストの応答タイプは何ですか?

テキスト読み上げドキュメントたとえば、API 応答タイプは次のようになります。

Response type: NodeJS.ReadableStream|FileObject|Buffer

応答タイプが 3 つのうちの 1 つであることがわかり、困惑しています。

これまでのすべての試みにおいて、私はそれが であると想定してきましたreadable stream

02.マップ関数で複数の API リクエストを行うことはできますか?

03.各リクエストを 内にラップしてpromise()を解決できますかresponse?

04.結果の配列をpromises変数に割り当てることはできますか?

05.申告できますかvar audio_files = await Promise.all(promises)

06.この宣言の後、すべての応答は「完了」しますか?

07.各応答を書き込み可能なストリームに正しくパイプするにはどうすればよいですか?

08.すべてのパイプが終了したことを検出して、ファイルをクライアントに送り返すにはどうすればよいですか?

質問 2 - 6 については、答えは「はい」であると想定しています。

私の失敗は質問 7 と 8 に関係していると思います。

再現する手順

3975このコードは、それぞれバイトサイズが、3863、バイトの3974ランダムに生成された 4 つのテキスト文字列の配列でテストできます。3629ここにその配列のペーストビンがあります

// route handler
app.route("/api/:api_version/tts")
    .get(api_tts_get);

// route handler middleware
const api_tts_get = async (req, res) => {

    var query_parameters = req.query;

    var file_name = query_parameters.file_name;
    var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV

    var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
    var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

    // for each string in an array, send it to the watson api  
    var promises = text_string_array.map(text_string => {

        return new Promise((resolve, reject) => {

            // credentials
            var textToSpeech = new TextToSpeechV1({
                iam_apikey: iam_apikey,
                url: tts_service_url
            });

            // params  
            var synthesizeParams = {
                text: text_string,
                accept: 'audio/mp3',
                voice: 'en-US_AllisonV3Voice'
            };

            // make request  
            textToSpeech.synthesize(synthesizeParams, (err, audio) => {
                if (err) {
                    console.log("synthesize - an error occurred: ");
                    return reject(err);
                }
                resolve(audio);
            });

        });
    });

    try {
        // wait for all responses
        var audio_files = await Promise.all(promises);
        var audio_files_length = audio_files.length;

        var write_stream = fs.createWriteStream(`${relative_path}.mp3`);

        audio_files.forEach((audio, index) => {

            // if this is the last value in the array, 
            // pipe it to write_stream, 
            // when finished, the readable stream will emit 'end' 
            // then the .end() method will be called on write_stream  
            // which will trigger the 'finished' event on the write_stream    
            if (index == audio_files_length - 1) {
                audio.pipe(write_stream);
            }
            // if not the last value in the array, 
            // pipe to write_stream and leave open 
            else {
                audio.pipe(write_stream, { end: false });
            }

        });

        write_stream.on('finish', function() {

            // download the file (using absolute_path)  
            res.download(`${absolute_path}.mp3`, (err) => {
                if (err) {
                    console.log(err);
                }
                // delete the file (using relative_path)  
                fs.unlink(`${relative_path}.mp3`, (err) => {
                    if (err) {
                        console.log(err);
                    }
                });
            });

        });


    } catch (err) {
        console.log("there was an error getting tts");
        console.log(err);
    }

}

公式例表示:

textToSpeech.synthesize(synthesizeParams)
  .then(audio => {
    audio.pipe(fs.createWriteStream('hello_world.mp3'));
  })
  .catch(err => {
    console.log('error:', err);
  });

私の知る限り、これは単一のリクエストでは問題なく機能するようです。しかし、複数のリクエストでは機能しないようです。

研究

読み取り可能および書き込み可能なストリーム、読み取り可能なストリーム モード (フローおよび一時停止)、「データ」、「終了」、「ドレイン」および「終了」イベント、pipe()、fs.createReadStream()、および fs.createWriteStream() に関するもの


ほとんどすべての Node.js アプリケーションは、どれほど単純なものであっても、何らかの方法でストリームを使用します...

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream

let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});

// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});

ストリームAPI


読み取り可能なストリームには、ストリームの利用方法に影響する 2 つの主なモードがあります。モードpausedまたはモードのいずれかになります。すべての読み取り可能なストリームは、デフォルトでは一時停止モードで開始されますが、必要に応じて簡単に切り替えたり元に戻したりflowingできます。イベント ハンドラーを追加するだけで一時停止ストリームがモードに切り替わり、イベント ハンドラーを削除するとストリームがモードに戻ります。flowingpauseddataflowingdatapaused

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


読み取りおよび書き込み可能なストリームで使用できる重要なイベントと関数のリストを以下に示します。

ここに画像の説明を入力してください

読み取り可能なストリーム上で最も重要なイベントは次のとおりです。

dataストリームがデータのチャンクをコンシューマーに渡すたびに発行されるイベント。ストリームendから消費されるデータがなくなったときに発行されるイベント。

書き込み可能なストリームで最も重要なイベントは次のとおりです。

drain書き込み可能なストリームがさらにデータを受信できることを示す信号であるイベント。すべてfinishのデータが基盤となるシステムにフラッシュされたときに発行されるイベント。

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


.pipe()からの「データ」および「終了」イベントのリッスンを処理しますfs.createReadStream()

https://github.com/substack/stream-handbook#ストリームを使用する理由


.pipe()読み取り可能なソースストリームsrcを受け取り、出力を書き込み可能な宛先ストリームに接続する関数です。dst

https://github.com/substack/stream-handbook#pipe


メソッドの戻り値はpipe()宛先ストリームである

https://flaviocopes.com/nodejs-streams/#pipe


デフォルトでは、ストリーム終了()ソースストリームが を発行するWritableと、宛先ストリームで が呼び出され、宛先は書き込み不可になります。このデフォルトの動作を無効にするには、オプションを として渡し、宛先ストリームを開いたままにします。Readable'end'endfalse

https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options


メソッドが呼び出され、すべてのデータが基盤となるシステムにフラッシュされた後にイベント'finish'が発行されます。stream.end()

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.log('All writes are now complete.');
});

https://nodejs.org/api/stream.html#stream_event_finish


複数のファイルを読み取って書き込み可能なストリームにパイプする場合は、各ファイルを書き込み可能なストリームにパイプして、end: falseその際にパスする必要があります。これは、読み取り可能なストリームでは、読み取るデータがなくなるとデフォルトで書き込み可能なストリームが終了するためです。次に例を示します。

var ws = fs.createWriteStream('output.pdf');

fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);

https://stackoverflow.com/a/30916248


最初の読み取りが完了するまで、2 番目の読み取りをイベント リスナーに追加します...

var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
  b.pipe(c)
}

https://stackoverflow.com/a/28033554


ノードストリームの簡単な歴史 - パート1つそして


関連するGoogle検索:

複数の読み取り可能なストリームを単一の書き込み可能なストリームにパイプする方法は? nodejs

同じまたは類似のトピックに関する質問で、信頼できる回答がない(または「古い」可能性がある)もの:

複数の ReadableStream を単一の WriteStream にパイプする方法は?

異なる読み取り可能ストリームを介して同じ書き込み可能ストリームに 2 回パイプする

複数のファイルを 1 つの応答にパイプする

2 つのパイプ ストリームから Node.js ストリームを作成する

ベストアンサー1

ここで解決すべき中心的な問題は、非同期性です。ほぼ解決しました。投稿したコードの問題は、すべてのソース ストリームを並列かつ順序なしでターゲット ストリームにパイプしていることです。つまり、dataチャンクはさまざまなオーディオ ストリームからランダムに流れます。イベントがsendを追い越しても、ターゲット ストリームを早く閉じる必要はありません。これが、再度開いた後に増加する理由を説明できるかもしれません。pipeend

あなたが望むのは、それらを順番にパイプすることです - あなたは引用したときに解決策を投稿しました

最初の読み取りが完了するまで、2 番目の読み取りをイベント リスナーに追加します...

またはコードとして:

a.pipe(c, { end:false });
a.on('end', function() {
  b.pipe(c);
}

これにより、ソース ストリームが順番にターゲット ストリームにパイプされます。

コードを取得すると、audio_files.forEachループを次のように置き換えることになります。

await Bluebird.mapSeries(audio_files, async (audio, index) => {  
    const isLastIndex = index == audio_files_length - 1;
    audio.pipe(write_stream, { end: isLastIndex });
    return new Promise(resolve => audio.on('end', resolve));
});

の使用に注意してくださいbluebird.js マップシリーズここ。

コードに関するさらなるアドバイス:

  • 使用を検討すべきjs の
  • の代わりにconst&を使うべきであり、letvarcamelCase
  • 「1 つのイベントでは動作するが、複数のイベントでは失敗する」ことに気付いた場合は、常に非同期性、順列、競合状態について考えます。

さらに詳しい情報、ネイティブ ノード ストリームを組み合わせる際の制限事項:参考:

おすすめ記事