BT

最新技術を追い求めるデベロッパのための情報コミュニティ

寄稿

Topics

地域を選ぶ

InfoQ ホームページ アーティクル C# 8の非同期ストリーム

C# 8の非同期ストリーム

原文(投稿日:2018/09/05)へのリンク

Async/Awaitは、ユーザインターフェースの応答性とリソースへのWebアクセスを改善するために、C# 5で導入されました。別の言い方をすれば、Async Methodは、スレッドをブロックせずにひとつのスカラ値を結果として返す、非同期操作を実行するためのものなのです。

非同期操作を単純化するためにMicrosoftは、さまざまな方法を試しました。その結果得られたasync/awaitパターンは、理解の容易なアプローチによって、開発者の間で好意的に受け入れられました。

従来の非同期メソッドの大きな制限のひとつは、戻り値としてスカラ(ひとつの値)を持たなくてはならない点でした。async Task < int > DoAnythingAsync()というメソッドを考えてみましょう。DoAnythingAsyncの処理結果は(ひとつの)整数値になります。

このような制限があるので、yieldキーワードを使用したり、async IEnumerable < int >(非同期の列挙型を返す)というような使い方をすることはできません。

async/await機能とyield演算子を併用することができれば、非同期データプル(asynchronous data pull)あるいはプルベース列挙型(pull based enumeration)と呼ばれる、極めて強力なプログラミングモデルが使用可能になります。F#ではこれを非同期シーケンス(async sequence)と呼んでいます。

C# 8で新たに提案された機能である非同期ストリーム(Async Streams)では、結果がスカラ値でなくてはならないという制限が排除されて、非同期メソッドから複数の結果を返すことが可能になります。

この変更により、asyncパターンはより柔軟になり、データベースから遅延非同期シーケンスでデータを取得したり、あるいはデータが利用可能になった時にチャンクで返すような非同期シーケンスを使ってデータをダウンロードしたりすることが可能になります。

例:

foreach await (var streamChunck in asyncStreams)
{
  Console.WriteLine($"Received data count = {streamChunck.Count}");
} 

非同期プログラミングの問題を解決するもうひとつのアプローチは、Reactive Extensions(Rx)です。Rxは開発者の間で広く受け入れられていて、JavaやJavaScriptといった他のプログラミング言語では、こちらの手法が実装(RxJava、RxJS)されています。Rxはプッシュプログラミングモデル(Push Programming Model)(Tell Don’t Askシステム)に基づいており、リアクティブプログラミング(Reactive Programming)とも呼ばれています。リアクティブプログラミングは、イベント駆動プログラミングの特殊な形式で、通知ではなくデータを処理することが特徴です。

一般的に、プッシュプログラミングモデルでは、パブリッシャをコントロールする必要はありません。データはキューに非同期にプッシュされ、データが到着するとコンシューマがデータを使用します。Rxとは違い、非同期ストリームはオンデマンドで呼び出されて、列挙の終端に到達するまで値を生成することができます。

この記事では、プルベースモデルとプッシュベースモデルを比較して、どちらの手法がどのようなシナリオに適しているかを述べたいと思っています。多数のサンプルとコードデモを使って概念全体とメリットを示した上で、最後に非同期ストリーム機能を説明して、デモコードを紹介します。

プルプログラミングモデルとプッシュプログラミングモデル

[画像をクリックすると拡大します]

 

図1- プルプログラミングモデルとプッシュプログラミングモデル

この例は、食品などで見られる生産者(プロデューサ)と消費者(コンシューマ)の関係を示したものですが、図1に示すように、このプロデューサは食品ではなくデータを生成し、コンシューマは生成されたデータを消費します。プルモデルは理解が容易で、コンシューマがプロデューサにデータを要求し、プルします。もうひとつのアプローチはプッシュモデルで、プロデューサがデータをキューにパブリッシュ(publish)します。コンシューマは、必要なデータを受信するためにサブスクライブ(subscribe)する必要があります。

“プロデューサがコンシューマよりも速い”ユースケースでは、オーバーフローの問題を回避するため、コンシューマが必要なデータをプロデューサからプルするプルモデルが適しています。逆に“コンシューマがプロデューサよりも速い”場合には、プロデューサがより多くのデータをプッシュして、コンシューマの不要な待機時間を避けるために、プッシュモデルが適しています。

RxとAkka Streams(ストリームプログラミングモデル)では、フロー制御機構のひとつであるバックプレッシャ(backpressure)技術を使用して、プルまたはプッシュモデルによって前述のプロデューサ/コンシューマの問題を解決しています。

2番目の例では、速度の遅いコンシューマが使用されていて、速度の速いプロデューサからデータの非同期シーケンスをプルします。コンシューマが要素を処理すると、プロデューサに対して次の要素を再び要求します。この処理は、シーケンスの最後に達するまで続きます。

モチベーションと背景

非同期ストリームが必要な理由を理解するために、次のコードを検討してみましょう。

// Loops and sums the provided argument (count)
static int SumFromOneToCount(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCount called!");

  var sum = 0;
  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;
  }
  return sum;
}

メソッド呼び出し:

const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

出力:

[画像をクリックすると拡大します]

 

次のようにyield演算子を使えば、このメソッドに遅延処理をさせることができます。

static IEnumerable<int> SumFromOneToCountYield(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountYield called!");

  var sum = 0;
  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;

    yield return sum;
  }
}

メソッド呼び出し:

const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
  ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

出力:

[画像をクリックすると拡大します]

 

上の出力ウィンドウで分かるように、結果は単一の値ではなく、一部分ずつ返されます。これは処理結果の蓄積を示すもので、遅延列挙(lazy enumeration)と言われます。それでも問題は残ります - sumメソッドがコードをブロックするのです。スレッドを確認すれば、すべてがメインスレッドで実行されているのが分かります。

そこでマジックワードasyncを、最初のメソッドであるSumFromOneToCountに(yieldなしで)適用してみましょう。

static async Task<int> SumFromOneToCountAsync(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountAsync called!");

  var result = await Task.Run(() =>
  {
    var sum = 0;

    for (var i = 0; i <= count; i++)
    {
      sum = sum + i;
    }
    return sum;
  });

  return result;
}

メソッド呼び出し:

const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// Sum runs asynchronously! Not enough. We need sum to be async with lazy behavior.
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

出力:

[画像をクリックすると拡大します]

 

よろしい、コンピューティングが別のスレッドで動作していることが分かりました。ですが、まだ、結果に問題があります。結果がひとつの値として返されているのです!

ここで、遅延列挙型(yield return)とasyncメソッドを命令的に組み合わることができたなら、と考えてみてください。この組み合わせが非同期ストリームなのです。この機能はC# 8で新たに提案されました。この新機能により、Webサイトからデータをダウンロードする、あるいはファイルやデータベースから最新の方法でレコードを読み込む、といったプルプログラミングも大宇の問題点を解決するための、優れたテクニックが手に入ります。

これを現行バージョンのC#で試してみましょう。次のように、SumFromOneToCountYieldメソッドにasyncキーワードを追加します。

[画像をクリックすると拡大します]

 

図2- asyncキーワードとyieldを組み合わせた場合のエラー

SumFromOneToCountYieldにasyncを加えようとすると即座に、上記のようなエラーが発生します!

他の方法を考えましょう。タスクにIEnumerableを配置すれば、下記のようにyieldキーワードは不要になります。

static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
  var collection = new Collection<int>();

  var result = await Task.Run(() =>
  {
    var sum = 0;

    for (var i = 0; i <= count; i++)
    {
      sum = sum + i;
      collection.Add(sum);
    }
    return collection;
  });

  return result;
}

メソッド呼び出し:

const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");

foreach (var sc in scs)
{
  // !!!This is not what we need we become the result in one block!!!!
  ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
}

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

出力:

[画像をクリックすると拡大します]

 

スレッドで分かるように、すべての計算が非同期に行なわれるようになりましたが、まだ問題があります。結果(すべての結果はコレクションに収集されます)が、ひとつのブロックとして返されているのです。私たちの望む遅延動作ではありません。私たちの目標は、遅延動作と非同期コンピューティングスタイルの組み合わせであったことを思い返してください。

希望する動作を実現するためには、Ix(Rxの一部)のような外部ライブラリを使用するか、あるいはC#の新機能である非同期ストリームを使用する必要があります。

コード例に戻りましょう。ここでは、非同期動作を行うために外部ライブラリを使用しています。

static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
  ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");

  await sequence.ForEachAsync(value =>
  {
    ConsoleExt.WriteLineAsync($"Consuming the value: {value}");

    // simulate some delay!
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
  });
}

static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
  ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
  var sum = 0;

  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;

    // simulate some delay!
    Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();

    yield return sum;
  }
}

メソッド呼び出し:

const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");

// Start a new task. Used to produce async sequence of data!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();

ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");

// Start another task; Used to consume the async data sequence!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));

// Just for demo! Wait until the task is finished!
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!"

Output:

[画像をクリックすると拡大します]

 

これでやっと、希望した動作を達成することができました!列挙に対して、非同期にイテレーションすることができます。

ソースコードはこちらです。

クライアント/サーバーによる非同期プル

もっと現実的な例を使って、概念を示してみましょう。この機能のメリットを説明する上では、クライアント/サーバアーキテクチャが最適なシナリオです。

クライアント/サーバ同期呼び出し

クライアントがサーバに要求を送信します。下の図3に示すように、サーバから応答が返るまで、クライアントは待たなくてはなりません(クライアントはブロックされます)。

図3- 同期データプルでは、クライアントは要求が完了するまで待機する

非同期データプル

この場合は、クライアントはデータを要求して、他の処理を継続して実行します。データが到着すると、クライアントはその処理を継続します。

図4- 非同期データプルでは、データを要求している間、クライアントは他の処理を行なうことができる

非同期シーケンスデータプル

このケースでは、クライアントはデータのチャンクを要求した後、何か他のことを継続して実行します。データチャンクが到着すると、クライアントプロセスはそのデータを受信し、次のデータチャンクを要求した上で、最後のデータチャンクまで処理を続けます。このシナリオはまさに、非同期ストリームの発想の元になっています。図5は、別の作業を行なっているクライアントが、データチャンク到着時にそのデータを処理することを示しています。

[画像をクリックすると拡大します]

 

図5- 非同期シーケンスデータプル(非同期ストリーム)。クライアントはブロックされない!

非同期ストリーム

IEnumerable<T>IEnumerator<T>と同じように、新たにIAsyncEnumerable<T> IAsyncEnumerator<T>という2つのインターフェースが、下記のように定義されています。

public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        Task<bool> MoveNextAsync();
        T Current { get; }
    }

   // The Async Streams Feature can also asynchronous disposed
   public interface IAsyncDisposable
   {
      Task DiskposeAsync();
   }

Jonathan Allen氏がInfoQでこのテーマを取り上げています。ここでそれをすべて繰り返すつもりはありませんので、合わせて氏の記事を読んで頂くことをお勧めします。

Task<bool> MoveNextAsync()のトリックが、(bool to Task<bool>, bool IEnumerator.MoveNext()に代えて)値を返します。これにより、計算処理全体に加えて、戻り値のイテレーションも非同期化することができるようになります。次の値の待機が可能であることを判断するのは、コンシューマです。非同期ではあっても、基本的にはプルモデルが維持されています。非同期のクリーンアップを可能にするために、IAsyncDisposableが用意されています。非同期ストリームに関する詳細は、こちらで確認してください

構文

最終的な構文は次のようになります。

foreach await (var dataChunk in asyncStreams)
{
  // Working the yield dataChunk! Or doing something else!
}

この例のように単一の値を計算するだけでなく、複数の値を順次計算しながら、非同期処理を行ってそれを待機することも可能です。

Microsoftによるサンプルの改訂

Microsoft Codeにあるデモを書き直してみました。いずれも私のGitHubからダウンロード可能です。

基本的な考え方は、大規模なMemoryStream(20,000バイト配列)を作成して、コレクションあるいは今回のシナリオであるメモリストリームの各要素に対して、非同期に反復処理を行なうというものです。それぞれの処理では、配列から8Kをプルします。

(1)で大きなバイト配列を作成してダミー値を埋め込み、(2)checksumという値を宣言しています。このチェックサムを使用して、計算の合計が正しいことを確認します。配列とチェックサムはメモリ上に生成され、(3)で示すようなタプルで返されます。

(4)AsEnumarble(正確にはAsAsyncEnumarble)は、非同期ストリームを8kbブロック((6)のBufferSize = 8000)でシミュレートするための拡張メソッドです。

通常ならばIAsyncEnumerableを継承する必要はありませんが、Microsoftの上記の例では、(5)で見るようなデモの簡略化のために継承を定義しています。

(7)の“foreach”では、非同期メモリストリームからデータを8kbのチャンクでプルします。プル処理は、コンシューマ(foreachコードブロック)によるデータ受信の準備ができたタイミングで順次実行されて、プロデューサ(メモリストリーム配列)からデータをとり出します。最終的にイテレーションが終了すると、(8)で示すように’c’とチェックサムの比較が行われて、一致すれば“Checksums match!”が書き込まれます。

Microsoft Demoの出力ウィンドウ:

[画像をクリックすると拡大します]

 

要約

ここまでは、複数の値を非同期に生成する計算処理の記述に使用可能な、優れた非同期プル手法である非同期ストリームについて議論してきました。

非同期ストリームの背景にあるコンセプトは、非同期プルモデルです。シーケンスの次の要素を要求することで、結果として応答が得られます。これは、コンシューマの状態とは無関係に値を生成する、IObservable<T>のプッシュモデルとは異なるものです。非同期ストリームは、コンシューマが次のデータを処理する準備ができていない場合など、コンシューマがコントロール可能な非同期データソースを表現する、洗練された手段を提供します。Webアプリケーションやデータベースからのレコード読み取りなどがその例です。

この記事では、非同期列挙を生成し、それを外部の非同期シーケンスライブラリで処理する方法を紹介しました。さらに、Webからのダウンロードにおいて、この機能がいかにメリットのあるものなのか、という概念も紹介しました。そして最後に、Microsoft Build Demo Code(2018年5月7~9日にワシントン州シアトルで開催)に基づいた、非同期ストリームの新たな構文と、その完全な例を見てきました。

著者について

Bassam AlugiliはSTRATEC AGのシニアソフトウェアスペシャリストで、データベースの専門家です。STRATECは、完全自動化分析システム、ラボラトリデータ管理ソフトウェア、スマート消耗品(smart consumable)に関する世界有数のパートナです。

この記事に星をつける

おすすめ度
スタイル

BT