Tokio ランタイムのコンテキスト内で非同期メソッドから呼び出された非非同期メソッド内で future を待機するにはどうすればよいですか? 質問する

Tokio ランタイムのコンテキスト内で非同期メソッドから呼び出された非非同期メソッド内で future を待機するにはどうすればよいですか? 質問する

私は非同期処理を実行するために Tokio 1.1 を使用しています。async mainwithを持っている#[tokio::main]ので、すでにランタイムで動作しています。

main非同期メソッドを呼び出しますが、ここではawait将来実行したい (具体的には、データフュージョン データフレームから収集しています)。この非同期メソッドには、ではなく構造体を返す特性によって規定されたシグネチャがありますFuture<Struct>。私が知る限り、これを非同期としてマークすることはできません。

電話をかけてみるとdf.collect().await;

async関数とブロック内でのみ許可されます

コンパイラからのエラーで、呼び出しているメソッドがawaitではないことが指摘されていますasync

block_on次のように新しいランタイムから futureを試してみます。

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

ランタイムパニックが発生します:

ランタイム内からランタイムを開始することはできません。これは、block_onスレッドが非同期タスクの実行に使用されているときに、関数 (など) が現在のスレッドをブロックしようとしたために発生します。

試してみるとfutures::executor::block_on(df.collect()).unwrap();、新たなランタイムパニックが発生します。

「現在、Tokio 0.2.x ランタイムでは実行されていません。」

私が使用しているのは Tokio v1.1 なので、これは奇妙です。

これは思ったよりも難しいように感じます。私は非同期コンテキスト内にいるので、コンパイラはそれを認識し、.awaitメソッド内からの呼び出しを許可する必要があるように感じます。唯一のコード パスは、ブロック内からこのメソッドを呼び出しますasync。私が見逃している、これを行う簡単な方法はありますか?

ベストアンサー1

私は非同期コンテキスト内にいるので、コンパイラはそれを認識し、メソッド内から.awaitを呼び出すことを許可する必要があるようです。

awaitランタイムのコンテキスト内であるかどうかに関係なく、同期関数の内部で を実行することは基本的に不可能です。awaitは、yield ポイントに変換され、async関数は、これらの yield ポイントを使用して非同期計算を実行するステート マシンに変換されます。関数を としてマークしないとasync、この変換は不可能です。

質問を正しく理解していれば、次のコードがあるはずです。

#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar()
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

問題は、が としてマークされていないため、df.collect内からawait できないことです。 のシグネチャを変更できる場合は、で説明した回避策を使用して async メソッドを作成できます。barasyncTraitTrait::bar特性で非同期メソッドを定義するにはどうすればよいですか?

のシグネチャを変更できない場合Traitは問題があります。非同期関数は一度もない到達せずに長い時間を過ごす.awaitfuture-rs でブロッキング I/O をカプセル化する最適な方法は何ですか?spawn_blocking非同期コードに移行するときに使用できます。

#[tokio::main]
async fn main() {
    let foo = Foo {};
    tokio::task::spawn_blocking(move || foo.bar()).await
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

ここで、待機せずに完了まで実行する方法が必要ですdf.collect。この問題を解決するために、ネストされたランタイムを作成しようとしたと述べています。

新しいランタイムから未来をブロックしようとすると...パニックになります

しかし、tokioではネストされたランタイムを作成することはできません。できた新しいを作成します、独立したランタイムについては、Tokio ランタイム内に別の Tokio ランタイムを作成するにはどうすればよいでしょうかただし、ネストされたランタイムを生成するのは非効率的です。

新しいランタイムを生成する代わりに、現在ランタイム:

let handle = Handle::current();

ランタイム コンテキストを入力します。

handle.enter();

そして、次のようにして future を完了まで実行しますfutures::executor::block_on

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        handle.enter();
        futures::executor::block_on(df.collect())
    }
}

tokio ランタイム コンテキストを入力すると、以前に発生していたエラーが解決されます。

futures::executor::block_on(df.collect()).unwrap(); を試みると、新たなランタイムパニックが発生します。not currently running on a Tokio 0.2.x runtime

可能であれば、これを避けるようにしてください。最適な解決策は、 を通常どおりにマークすることです。Trait::bar上記を含むその他の解決策では、指定された future が完了するまで現在のスレッドをブロックします。asyncawait

説明してくれた@AliceRyhlに感謝します

おすすめ記事