ここ数年間,Javaの世界は,リアクティブプログラミングの方向へ強く進んでいます。非ブロックAPIを使用したNodeJS開発の成功,潜在能力の高いマイクロサービスの爆発的人気,あるいは単にコンピューティングリソースをもっと効率的に使用したいという要求など理由はさまざまですが,多くの開発者が,実用的なプログラミングモデルとしてのリアクティブプログラミングに注目し始めています。
幸いにもJava開発者には,リアクティブなフレームワークと適切な使用方法に関しては,目移りするほど潤沢な選択肢が用意されています。リアクティブなコードを書く上で“間違った”方法というのは多くありませんが,そこに落とし穴があります — リアクティブコードを書く上でこれが“正しい”と言える方法も,実はほとんどないのです。
この記事では,リアクティブなコードをいかに書くべきかという点について,やや独断的な提案をしたいと思います。ご紹介する意見は,大規模なリアクティブAPIの開発に長年携わった経験を基にしているため,すべての読者に当てはまるものではないかも知れませんが,リアクティブの旅を始める上で何らかのヒントになればと思います。
この記事で取り上げた例は,すべてCloud Foundry Java Clientからのものです。このプロジェクトでは,リアクティブフレームワークとしてProject Reactorを採用しています。私たちがJava ClientでReactorを選んだのは,Springチームとの緊密な統合のためですが,ここで論じているコンセプトはすべて,RxJavaなど他のリアクティブフレームワークにも当てはまるものです。Cloud Foundyの知識がある程度あれば役に立ちますが,必須ではありません。例では自己説明的な命名法を用いていますので,それぞれの名前もリアクティブな概念を理解する上で一助となると思います。 |
リアクティブプログラミングそのものは記事の範囲を大きく越えるような広大なテーマですが,ここでは私たちの目的のために必要な範囲として,“従来の命令型のプログラミングスタイルよりも流暢(fluent)な方法でイベント駆動システムを定義する方法”と定義しておきたいと思います。目標は,従来の命令型によるロジックを,より理解と論証が容易な,非同期かつ非ブロック型の関数スタイルに移行することです。
さまざまなビヘイビア(behavior、スレッドやNIOコールバックなど)のために設計された命令型APIの多くは,適切かつ信頼性の高い使いやすさという観点を欠いています。そのため、この種のAPIを使用するケースの大部分では,依然としてアプリケーションコード側での明示的な管理が必要になります。リアクティブフレームワークでは,これらの問題を舞台裏で処理することが可能になるため,アプリケーション機能に重点を置いたコードの記述が可能になります。
リアクティブプログラミングを使うべきか?
リアクティブAPIを設計する場合,最初に疑問に思うのは,本当にリアクティブAPIが必要なのか,ということです。リアクティブAPIがすべてにおいて,絶対に正しい選択肢という訳ではありません。リアクティブAPIにも明白な欠点はあります(現時点で最も大きな問題はデバッグですが,フレームワークとIDEでこれに対処しようとしています)。逆に価値がデメリットを大幅に上回れば,リアクティブAPIを選択すべき,ということになります。この判断を下す上で,リアクティブプログラムが明らかに適しているパターンというものがいくつか存在します。
ネットワーキング
ネットワーク要求には本質的に(比較的)大きなレイテンシが関わるため,これらに対する応答の待機は,システム内のリソースを多大に消費することが少なくありません。リアクティブでないアプリケーションでは,このような待機要求はスレッドをブロックするため,スタックメモリを消費しながら,何もすることなく応答の到着を待つことになります。リモート側で障害やタイムアウトが発生した場合,提供されているAPIでは簡単に処理できないため,システム的および明示的に処理されないことが少なくありません。そして最後に,リモート呼び出しからのペイロードはサイズが不明であることが多いため,ヒープメモリの浪費につながります。リアクティブプログラミングと非ブロッキングIOと組み合わせは,それぞれに明確かつ明示的なAPIの提供を可能にすることで,これらの問題に対処します。
並行性の高い処理
ネットワーク要求や並列化可能なCPU集約型計算などといった,並列性の高い処理の調整にも適しています。スレッディングの明示的な管理も可能ですが,リアクティブフレームワークが特に優れているのは,スレッディング管理を自動的に行う場合です。.faltMap()などの演算子では,ビヘイビアを透過的に並列化して,利用可能なリソースを最大限に使用します。
大規模アプリケーション
接続ごとにひとつのスレッドを用意するサーブレットモデルは,長年にわたって私たちが愛用してきたものです。 しかしマイクロサービスでアプリケーションが大規模になり始める(単一のステートレスアプリケーションに対して,25,50,あるいは100インスタンス)と,CPU使用率がアイドルの場合でも接続負荷を処理する必要が生じてきました。非ブロッキングIOの選択と,その旨味を引き出すリアクティブプログラミングによってこの結び付きを克服し,リソースをより効率的に活用することが可能になります。これによるメリットには,しばしば驚くべきものがあります。8スレッドのNetty上に構築された同じアプリケーションの同じ負荷を処理するために,数百ないし数千のスレッドを備えたTomcat上で,複数のアプリケーションインスタンスを実行しなければならないことも少なくないのです。
ここに挙げた例が,リアクティブプログラミングが有効な場所のすべてだと考えるべきではありませんが,対象とするアプリケーションがこれらのカテゴリに適合しなければ,アプリケーションを無意味に複雑化するだけの可能性もある点に注意してください。
リアクティブAPIには何が期待できるのか?
最初の質問への回答で,自身のアプリケーションにとってリアクティブプログラミングが役に立つと判断したならば,APIの設計に入ります。まず最初は,リアクティブAPIが返すべきプリミティブが何かを決めるとよいでしょう。
Javaの世界におけるリアクティブフレームワークはすべて(Java 9の |
Project Reactorには,すべての基盤となる主要な型が2つあります。システムを流れる0からNの値を表すFlux<T>
型と, 0から1の値を表現するMono<T>
型です。Java Clientの内部では,単一要求,単一応答モデルに明確に対応するMono
をおもに使用しています。
Flux<Application> listApplications() {...}
Flux<String> listApplicationNames() {
return listApplications()
.map(Application::getName);
}
void printApplicationName() {
listApplicationNames()
.subscribe(System.out::println);
}
この例では,listApplications()
メソッドがネットワークコールを実行して,0からN個のApplication
インスタンスのFlux
を返します。その後で.map()
演算子を使って,各Application
をその名前のString
に変換しています。生成されたアプリケーション名のFlux
は,コンソールへの出力に使用されます。
Flux<Application> listApplications() {...}
Mono<List<String>> listApplicationNames() {
return listApplications()
.map(Application::getName)
.collectList();
}
Mono<Boolean> doesApplicationExist(String name) {
return listApplicationNames()
.map(names -> names.contains(name));
}
Mono
はFlux
のようなフローを持つことはしませんが,概念的には1アイテムのフローであるため,使用されている演算子の名前もほぼ同じです。この例では,アプリケーション名のFlux
にマッピングした後に,それらの名前をひとつのList
にまとめています。 そのList
を格納したMono
は,この例では,名前がその中に含まれているかどうかを示すBoolean
に変換することができます。直感的ではないかも知れませんが,対象とするものがアイテムのストリームではなく論理的なコレクションである場合には,この例のようにコレクションのMono
(Mono<List<String>>
のように)を返すのが一般的です。
命令型APIとは異なり,リアクティブでの戻り値としてvoid
は使用できません。すべてのメソッドは,Flux
かMono
のいずれかを返さなくてはならないのです。これは奇妙に思えるかも知れません(何も返さないビヘイビアは実際にありますから!)が,リアクティブフローの基本オペレーションの処理結果なのです。リアクティブAPI(flatMap().map()
など)を呼び出すコードを実行することで,データを流すフレームワークが構築されますが,その時点ではデータは変換されません。その終端で.subscribe()
がコールされた時に,データがフローを通じて移動し始めて,その過程のオペレーションによって変換されるのです。リアクティブプログラミングがラムダ上に構築され、常に値を返す理由は、この遅延処理にあります — .subscribe()
するものが常に存在しなければならないのです。
void delete(String id) {
this.restTemplate.delete(URI, id);
}
public void cleanup(String[] args) {
delete("test-id");
}
命令型でブロックを伴う上記の例では、ネットワークコールの実行が直ちに開始されて、応答を受信するまで戻らないため、void
を返すことが可能です。
Mono<Void> delete(String id) {
return this.httpClient.delete(URI, id);
}
public void cleanup(String[] args) {
CountDownLatch latch = new CountDownLatch(1);
delete("test-id")
.subscribe(n -> {}, Throwable::printStackTrace, () -> latch::countDown);
latch.await();
}
こちらのリアクティブな例では、delete()
が戻った後に.subscribe()
がコールされるまで、ネットワークコールは開始されません。コールするのはフレームワークであって、コールの結果それ自体ではないからです。このケースでは、void戻り型と等価な0個のアイテムを返すMono<Void>
を使うことで、応答を受け取った後でのみonComplete()
を通知することができます。
メソッドのスコープ
APIが何を返すべきかを決めたならば、次は各メソッド(APIと実装)が何をするかを見極める必要があります。Java Clientの開発を通じて、私たちは,コンパクトで再利用可能なメソッド設計の持つメリットに気付きました。このような設計にすることで、それぞれのメソッドを組み立てて、より規模の大きなオペレーションにすることが簡単になるとともに、並列処理や順次操作としてもフレキシブルに組み合わせることができます。複雑なフローをはるかに読みやすくする、という潜在的なメリットもあります。
Mono<ListApplicationsResponse> getPage(int page) {
return this.client.applicationsV2()
.list(ListApplicationsRequest.builder()
.page(page)
.build());
}
void getResources() {
getPage(1)
.flatMapMany(response -> Flux.range(2, response.getTotalPages() - 1)
.flatMap(page -> getPage(page))
.startWith(response))
.subscribe(System.out::println);
}
この例では、ページ分割APIの呼び出し方を示しています。最初のgetPage()
要求は,処理結果の先頭ページを取得するものです。先頭ページには、処理結果をすべて取り出す上で必要なページ総数が含まれています。getPage()
メソッドが小型で再利用可能、かつ副作用を持たないため、同じメソッドを使って、2ページ目からtotalPages
までを並列的に呼び出すことができるのです!
シーケンシャルおよびパラレルコーディネーション
今日では、重大なパフォーマンス改善のほとんどすべてが並行性に頼って実現されています。私たちはそれを知っていますが、いまだ多くのシステムがまったく並列性を持たないか、あるいは外部からの接続に関してのみ並列であるのが現状です。このような状況の多くは、高度な並列性を備えたシステムが難しく、エラーを起こしやすいという事実に遡ることができます。リアクティブプログラミングの大きなメリットのひとつは、オペレーション間の順次および並列関係の定義が可能で、フレームワークがリソースの最適な利用方法を決定してくれることです。
先程の例をもう一度見てください — 最初のgetPage()
呼び出しは、以降の各ページの呼び出しよりも順序として前に行われることが保証されています。さらに,このgetPage()
の順次呼び出しは.flatMapMany()
内で実行されるため、それぞれの実行をマルチスレッドで最適化し、結果を取りまとめ、発生したエラーを伝搬するのはフレームワークの役目になります。
条件付きロジック
命令型プログラミングとは異なり、リアクティブプログラミングではエラーも値とみなされます。つまり、エラーもフロー処理を通過するということです。コンシューマに渡すことも、それに基づいてビヘイビアを変更することも可能です。このビヘイビアの変更は、エラーの変換や、あるいはエラーに基づいた新たな結果の生成として現れます。
public Mono<AppStatsResponse> getApplication(GetAppRequest request) {
return client.applications()
.statistics(AppStatsRequest.builder()
.applicationId(request.id())
.build())
.onErrorResume(ExceptionUtils.statusCode(APP_STOPPED_ERROR),
t -> Mono.just(AppStatsResponse.builder().build()));
}
この例では、実行中のアプリケーションの統計情報(statistics)を取得する要求が行われます。すべてが期待通りに動作すれば、応答がコンシューマに返されますが,(特定のステータスコードによって)エラーを受信した場合には、空の応答が返されます。コンシューマはエラーを感知することなく、何事もなかったかのように、デフォルト値で処理を継続するのです。
前述のように、フローは何も送信せずに完了することが可能です。これはしばしば、null
を返すことと等価です(戻り型としてのvoid
はその特殊なケースです)。エラーの場合と同じく、結果を持たないこの完了も、コンシューマに渡したり、あるいはそれに基づいてビヘイビアを変更したりすることができます。
public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) {
return requestPrivateDomains(request.getId())
.switchIfEmpty(requestSharedDomains(request.getId()));
}
この例でのgetDomains()
は、2つのバケットのいずれかに存在するドメインを返します。最初にプライベートドメイン(PrivateDomains)が検索され、それが正常終了し、かつ結果が何もない場合には、共有ドメイン(SharedDomains)が検索されます。
public Mono<String> getDomainId(GetDomainIdRequest request) {
return getPrivateDomainId(request.getName())
.switchIfEmpty(getSharedDomainId(request.getName()))
.switchIfEmpty(ExceptionUtils.illegalState(
"Domain %s not found", request.getName()));
}
アイテムのないことがエラー状態を表すケースもあり得ます。こちらの例では、プライベートドメインと共有ドメインで見つからない場合には、IllegalStateException
が新たに生成されてコンシューマに渡されます。
エラーや空ではなく、値そのものに基づいて判断したい場合もあると思います。演算子を使ってこのロジックを実装することも可能ですが、必要以上に複雑になることが少なくありません。このような場合は,命令型の条件文を使用した方がよいでしょう。
public Mono<String> getDomainId(String domain, String organizationId) {
return Mono.just(domain)
.filter(d -> d == null)
.then(getSharedDomainIds()
.switchIfEmpty(getPrivateDomainIds(organizationId))
.next() // select first returned
.switchIfEmpty(ExceptionUtils.illegalState("Domain not found")))
.switchIfEmpty(getPrivateDomainId(domain, organizationId)
.switchIfEmpty(getSharedDomainId(domain))
.switchIfEmpty(
ExceptionUtils.illegalState("Domain %s not found", domain)));
}
この例では、指定された組織(階層型コンテナ)内から、指定された名称のドメインのidを返します。ただし例外がひとつあります — 引数domainがnull
の場合は、その組織のスコープに対応する共有ドメインあるいはプライベートドメインの先頭のidを返すのです。domainがnull
でなければ、明示的なドメイン名を検索して、そのidを返します。このコードが難しく思えてもがっかりしないでください — 私たちも同じですから!
public Mono<String> getDomainId(String domain, String organizationId) {
if (domain == null) {
return getSharedDomainIds()
.switchIfEmpty(getPrivateDomainIds(organizationId))
.next()
.switchIfEmpty(ExceptionUtils.illegalState("Domain not found"));
} else {
return getPrivateDomainId(domain, organizationId)
.switchIfEmpty(getSharedDomainId(domain))
.switchIfEmpty(
ExceptionUtils.illegalState("Domain %s not found", domain));
}
}
同等な例ですが、命令型の条件文を使用しています。こちらの方がずっと分かりやすい、そう思いませんか?
テスト
実際問題として、有用なフローのほとんどは非同期なものです。これがテスト時に問題となります。テストフレームワークでは同期処理が多用されていて、結果が非同期に返されるよりずっと前に、合格あるいは不合格が記録されてしまうからです。これを補うためには、結果が返るまでメインスレッドをブロックしておいて、その結果をメインスレッドに移して検証することが必要になります。
@Test
public void noLatch() {
Mono.just("alpha")
.subscribeOn(Schedulers.single())
.subscribe(s -> assertEquals("bravo", s));
}
この例では、メインでないスレッドの返すString
が期待に反してパスしてしまいます。明らかにパスしないはずのこのテストがパスしてしまう根本原因は、noLatch
メソッドがAssertionError
をスローせずに完了することにあります。
@Test
public void latch() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> actual = new AtomicReference<>();
Mono.just("alpha")
.subscribeOn(Schedulers.single())
.subscribe(actual::set, t -> latch.countDown(), latch::countDown);
latch.await();
assertEquals("bravo", actual.get());
}
この例は多少不格好ですが、CountDownLatch
を使って、フローが完了するまでlatch()
メソッドが戻らないようにしています。latchがリリースされると、メインスレッドでアサーションが実行され、AssertionError
がスローされてテストがフェールします。
このコードを見て、すべてのテストをこんな風に実装しなくてはならないのは勘弁してほしい、と思ったかも知れません — 私たちも同じです。幸いにもReactorには、テストの実装を容易にするStepVerifier
クラスが提供されています。
リアクティブデザインのテストに必要なのは、ブロッキングだけではありません。複数の値と予想されるエラーについて検証すると同時に、予期しないエラーに対してテストをフェールさせる必要のある場合がよくあります。StepVerifier
はこれらそれぞれに対処しています。
@Test
public void testMultipleValues() {
Flux.just("alpha", "bravo")
.as(StepVerifier::create)
.expectNext("alpha")
.expectNext("bravo")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
この例ではStepVerifier
を使用して、“alpha
”と“bravo
”が確実に送出された後にフローが完了することを期待しています。いずれかが送出されないか、他の要素が送出されるか、あるいはエラーが発生した場合、テストはフェールします。
@Test
public void shareFails() {
this.domains
.share(ShareDomainRequest.builder()
.domain("test-domain")
.organization("test-organization")
.build())
.as(StepVerifier::create)
.consumeErrorWith(t -> assertThat(t)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Private domain test-domain does not exist"))
.verify(Duration.ofSeconds(5));
}
この例はStepVerifier
の最も高度な機能を使用して、エラーの通知だけでなく、それがIllegalArgumentException
であることや、メッセージが所定の内容であることも検証しています。
CountDownLatch
リアクティブフレームワークで注意しなくてはならない重要な点のひとつは、それ自身のオペレーションとスレッドモデルのみが管理対象である,ということです。リアクティブプログラミングの実行環境の多くは、個々のスレッドよりも長く継続します(サーブレットコンテナのように)。このような環境下であれば、リアクティブプログラミングの非同期性が問題になることはありません。しかしながら、前述のテストの例のように、個々のスレッドよりも前にプロセスが終了する環境も一部にはあります。
public static void main(String[] args) {
Mono.just("alpha")
.delaySubscription(Duration.ofSeconds(1))
.subscribeOn(Schedulers.single())
.subscribe(System.out::println);
}
このmain()
メソッドはテストメソッドの場合と同じく,“alpha
”が送出される前に終了してしまいます。
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Mono.just("alpha")
.delaySubscription(Duration.ofSeconds(1))
.subscribeOn(Schedulers.single())
.subscribe(System.out::println, t -> latch.countDown(),
latch::countDown);
latch.await();
}
こちらもテストの例と同じで、CountDownLatch
を使用することにより、実行中のスレッドが何であるかに関わらず、フローが終了する前にメインスレッドが終了しないようにすることができます。
ブロッキングフロー
現在,そして今後しばらくの間は、リアクティブプログラミングでもブロッキングAPIを操作することがごく一般的に行われるでしょう。この2つを橋渡しするには、結果を待つ間ブロックする方法が適当です。しかしながら、このやり方でブロッキングAPIにブリッジしてしまうと、リソースの有効利用など、リアクティブプログラミングのメリットの一部が失われることになります。これを避けるには、自分のコードを可能なかぎり長くリアクティブに保っておいて、ブロッキングを最小に留めることが必要です。この考え方の論理的な結論として、リアクティブAPIをブロックすることは可能だが、ブロッキングAPIをリアクティブにすることはできない、という点にも注目すべきです。
Mono<User> requestUser(String name) {...}
User getUser(String name) {
return requestUser(name)
.block();
}
この例では、Mono
の単一の結果から命令型の戻り値への橋渡しに.block()
を使用しています。
Flux<User> requestUsers() {...}
List<User> listUsers() {
return requestUsers()
.collectList()
.block();
}
前の例と同じように、命令型の戻り値への橋渡しに.block()
が使用されていますが、その前にFlux
を単一のList
に集める(collectList)必要があります。
エラー処理
前述のように、エラーはシステムを流れる値です。 これはつまり、例外をキャッチする適切なポイントが存在しない,ということでもあります。そうではあっても、エラーはフローの一部として、あるいはサブスクライバ(subscriber)として処理しなければなりません。.subscribe()
メソッドには0個から3個のパラメータがあり,到着したアイテムや発生したエラー,フロー完了の処理が可能です。
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flux.concat(Mono.just("alpha"), Mono.error(new IllegalStateException()))
.subscribe(System.out::println, t -> {
t.printStackTrace();
latch.countDown();
}, latch::countDown);
latch.await();
}
この例では,値とエラーの両方がサブスクライバに渡されます。CountDownLatch
を使う場合には,onError()
あるいはonComplete
のいずれか一方のみがコールされる点を忘れてはなりません。したがって,エラーの場合もそうでない場合でも,ラッチを解放する必要があります。
構成可能なメソッド参照
ラムダに大きく依存しているプログラミングモデルが“コールバック地獄”に陥りがちなのは,想像に難くないと思います。しかしながら,少々の規律とメソッド参照さえあれば,それは避けられるのです。道理の分かったRuby開発者ならば,可読性に関しては小さな(1行でも!)プライベートメソッドに大きな価値がある,と教えてくれるでしょう。メソッドに適切な名前を付けて,メソッド参照構文を使用すれば,非常に読みやすいフローを作ることができます。
public Flux<ApplicationSummary> list() {
return Mono
.zip(this.cloudFoundryClient, this.spaceId)
.flatMap(function(DefaultApplications::requestSpaceSummary))
.flatMapMany(DefaultApplications::extractApplications)
.map(DefaultApplications::toApplicationSummary);
}
この例のフローは,非常に読みやすくなっています。Flux<ApplicationSummary>
を取得するには,まずcloudFoundaryClient
にspaceId
を渡します。これを使ってスペースサマリ(space summary)を要求し,そのスペースサマリからアプリケーションを抽出して,それぞれのアプリケーションをアプリケーションサマリにマップします。個々のオペレーションについては,それがどのように振る舞うのか分かりませんが,この時点では知る必要もありません。必要ならばIDEがそれぞれのメソッド参照をトラバースしてくれますが,このコードならば実装内容を理解するのにさほど苦労はしないでしょう。
ポイントフリースタイル
ここまで読んで,非常にコンパクトなスタイルが使用されていることに気づかれたかも知れません。この様式はポイントフリースタイル(Pointfree Style)と呼ばれるものです。その大きなメリットは,開発者がデータのシャッフル(低レベルな概念)ではなく,機能の構築(高レベルな概念)に専念できる点にあります。リアクティブなプログラムを書く上でこれが絶対条件だと言うつもりはありませんが,多くの人が(最終的には)このスタイルを好んでいるようです。
Mono<Void> deleteApplication(String name) {
return PaginationUtils
.requestClientV2Resources(page -> this.client.applicationsV2()
.list(ListApplicationsRequest.builder()
.name(name)
.page(page)
.build()))
.single()
.map(applicationResource -> applicationResource.getMetadata().getId())
.flatMap(applicationId -> this.client.applicationsV2()
.delete(DeleteApplicationRequest.builder()
.applicationId(applicationId)
.build()));
}
この例を見れば,変数が割り当てられている場所の大部分や結果が返される場所をイメージすることができて,従来の命令型コードのような印象を受けるのではないかと思います。しかしながら,これで可読性が高まったとは言いがたく,逆に括弧やセミコロン,等号,return文などが増えたことで,データの出所や行き先は明確になるものの,フロー自体の実際のポイントは分かりにくくなっています。
リアクティブプログラミングは巨大なテーマで,ほとんどすべての人たちが使い始めています。現時点では,リアクティブコードを書く上で,これは“間違い”だと言えるものはほとんどありませんが,それと同時に選択肢も多く,何から手を付けるべきなのか,多数の開発者を混乱させています。大規模プロジェクトでの経験から生まれた私たちの意見が,読者のみなさんのリアクティブな旅のお役に立てればと思います。この最先端技術をご自身で体験し,そこから得た発見でコミュニティに貢献することを期待しています。
Ben Hale氏はCloud FoundryのJava Experienceチームのリーダとして,Cloud Foundry上で動作するJavaアプリケーションに関わるエコシステムを担当しています。
Paul Harris氏はPivotalでCloud Foundry Java Clientの開発リーダを務め,Cloud Foundryのオーケストレーションと管理を行うJavaアプリケーションの開発に携わっています。