サンプルアプリケーションで学ぶ Apache Cassandraを使った Javaアプリケーションの作り方 JJUG CCC 2017 Fall 森下 雄貴 (DataStax Japan合同会社)
スピーカー 森下 雄貴 (yuki@datastax.com) Solution Architect (ときどきSoftware Developer) @ DataStax Apache Cassandraコミッター 2
• スケーラブルなJavaアプリケーション を作りたい! • Apache Cassandra™ を使ったサンプ ルアプリケーションがあることを知っ てもらう。 • アーキテクチャの参考に • フレームワークやドライバーの使 い方の参考に • データモデリングの参考に 3 今日の目標
アプリケーションの変遷 4 クライアント/ サーバー クラウド 1990年代 現在 Web 2000年代
それを支えるデータベース 5 スケールアウトするアプリケーション層 スケールアウトするデータ層
6
The Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance. 7
8 スケーラブルなアプリケーションを作らないといけない。 Apache Cassandraを使えばいけそうだ。 でも、参考になるソースコードないの?
あります! 9 http://killrvideo.com/
KillrVideo 10 https://killrvideo.github.io/
KillrVideo Apache Cassandra™ / DataStax Enterprise リファレンスアプリ ケーション – https://killrvideo.github.io – オープンソース (APLv2) – スケーラブルなマイクロサービスアーキテクチャ • サービスは Node.js/C#/Java の各言語で実装 – データベース層はDSE / Apache Cassandra • データモデリングのサンプル • ドライバーの利用サンプル 11
DataStax Enterprise Apache Cassandra を中核にした製品 • サポート • プロフェッショ ナルサービス • トレーニング • 開発用に無償で 利用可能
KillrVideo 各層をスケーラブルに(できるよう想定) 13 KillrVideo Web アプリケーション KillrVideo マイクロサービス Apache Cassandra / DataStax Enterprise
KillrVideo Java版デモ環境 https://github.com/killrvideo/killrvideo-java 14 KillrVideo Web アプリケーション KillrVideo マイクロサービス Apache Cassandra / DataStax Enterprise 1ノード etcd: サービスディスカバリー サンプルデータ生成アプリ
デモ環境を起動する killrvideo-java> ./lib/killrvideo-docker-common/create-environment.sh # もしくはWindowの場合、create-environment.ps1 # docker-composeに必要な環境変数を記述した.envファイルを作成する killrvideo-java> docker-compose up -d registrator # サービスレジストリの起動 killrvideo-java> docker-compose up –d dse # DataStax Enterpriseの起動 # 初回起動の場合は、スキーマの作成等を行うため時間がかかる # docker-compose logs --tail=10 –f dseでログを確認し、"DSE startup complete."がでるまで待ちましょう 15
デモ環境を起動する killrvideo-java> KILLRVIDEO_DOCKER_IP=172.xx.0.1 KILLRVIDEO_HOST_IP=xxx.xxx.xxx.xxx mvn spring-boot:run # 環境変数を設定してSpring Bootアプリケーションを実行 killrvideo-java> docker-compose up -d web # Web層の起動 # http://<KILLRVIDEO_HOST_IP>:3000 にアクセス killrvideo-java> docker-compose up –d generator # サンプルデータ生成アプリを起動 # しばらくするとデータが見れるようになるはず 16
Web 17
Web Node.jsで実装 (https://github.com/KillrVideo/killrvideo-web) クライアント – React + Redux + Falcor(データフェッチ) サーバー – Express – セッション管理にCassandraを利用 • デモではマイクロサービスと同一のクラスター • 実際に利用する場合は専用のクラスターを用意 – ユーザー認証はマイクロサービスに問い合わせ 18
Web もしJavaでやるとしたら… spring-session – 自前で SessionRepository を実装 – https://github.com/honorem/spring-session-cassandra 参考 Apache Shiro – 自前で SessionDAO を実装 – https://github.com/lhazlewood/shiro-cassandra-sample 参考 19
マイクロサービス 20
マイクロサービスの実装 - gRPCを利用 (https://grpc.io/) – プログラミング言語に非依存でサービスの記述 – 様々なプログラミング言語をサポート • Java / Python / C++ / Go / Ruby … • KillrVideoではJava / Node.js / C# でサービスを実装 - Apache Cassandraへのアクセスにドライバーの様々な機能を 利用 – ステートメントの非同期実行 – データマッパーの利用 – バッチ実行 - サービス間の連携にイベントバスを利用 - デモ用に一つのプロセスに複数のサービスを定義しているので、 GuavaのEventBusを利用 - プロセスを分ける場合はApache Kafkaなどを検討 21
gRPCとは? - Google の内部で利用していたRPCフレームワーク(Stubby) を もとにオープンソース - Protocol Buffer v3を利用し、プログラミング言語に非依存なイ ンターフェース定義 - HTTP/2をもとにした効率の良い通信と、双方向ストリーミン グの実現 - 認証やトレースなどもプラグイン可能 22
gRPCサービスの定義 (./lib/killrvideo-service-protos/src 以下) // Manages comments service CommentsService { // Add a new comment to a video rpc CommentOnVideo(CommentOnVideoRequest) returns (CommentOnVideoResponse); // Get comments made by a user rpc GetUserComments(GetUserCommentsRequest) returns (GetUserCommentsResponse); // Get comments made on a video rpc GetVideoComments(GetVideoCommentsRequest) returns (GetVideoCommentsResponse); } 23
gRPCサービスの定義 (./lib/killrvideo-service-protos/src 以下) // Get a page of comments made by a specific user message GetUserCommentsRequest { killrvideo.common.Uuid user_id = 1; int32 page_size = 2; killrvideo.common.TimeUuid starting_comment_id = 3; string paging_state = 16; } // Response when getting a page of comments made by a user message GetUserCommentsResponse { killrvideo.common.Uuid user_id = 1; repeated UserComment comments = 2; string paging_state = 3; } 24
スタブの自動生成 - Mavenプラグインを用いてIDL(インターフェース定義言語)か らJavaのスタブコードを自動生成 25 <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.5.0</version> <configuration> <protoSourceRoot>${basedir}/lib/killrvideo-service-protos/src</protoSourceRoot> <protocArtifact>com.google.protobuf:protoc:3.0.0:exe:${os.detected.classifier}</protocArtifact > <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc- java:0.14.0:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin>
サービスの実装 - target/generated-sources 以下に自動生成されたクライアント とサービスインターフェースのJavaコードが出力される - 基本はStreamObserverを利用した非同期呼び出し - クライアントは非同期実行、同期実行、Futureを利用する非同 期実行の3種類が生成される 26 // CommentServiceGrpc.java public static interface CommentsService { public void commentOnVideo( killrvideo.comments.CommentsServiceOuterClass.CommentOnVideoRequest request, io.grpc.stub.StreamObserver<killrvideo.comments.CommentsServiceOuterClass.CommentOnV ideoResponse> responseObserver); … }
SpringからgRPCサーバーを起動 KillrVideo-javaの場合: - コンポーネントの一つとしてGrpcServerを定義 – @PostConstructでサービスの登録とサーバー作成、スタート - そのままではすぐ終了してしまうので、ThreadFactoryを用い てデーモンスレッドを生成 – KillrVideoThreadFactoryクラス - JVMシャットダウンフックでサーバー停止 27
SpringからgRPCサーバーを起動 他の方法: - スレッドを立ち上げて Server#awaitTermination() を呼び出す - https://github.com/LogNet/grpc-spring-boot-starter を使う – やっていることは基本的に上と同じ 28
Apache Cassandraに接続する ドライバーのセットアップ – Apache Cassandra用 • DataStax Java driver for Apache Cassandra • https://github.com/datastax/java-driver • APLv2 – DataStax Enterprise用 • Java driver for DataStax Enterprise • https://github.com/datastax/java-driver-dse • ライセンスは独自(DSEにしか接続できない) • DSE固有の機能を使うためにJava Driverを拡張 – 外部認証、グラフデータベースなど 29
Apache Cassandraに接続する java-driver と java-driver-dse の基本的な使い方は一緒 接続するために使うクラスが違う – Cluster (java-driver) VS DseCluster (java-driver-dse) – Session (java-driver) VS DseSession (java-driver-dse) 30
Apache Cassandraに接続する - Apache Cassandra用ドライバーのダウンロード - Group ID: com.datastax.cassandra - Artifact IDs: - cassandra-driver-code - 本体 - cassandra-driver-mapping - オブジェクトマッパー (オプション) - cassandra-driver-extra - 追加の型マッピング (オプション) 31
Apache Cassandraに接続する DSEConfigurationクラスでクラスタへ接続し、セッションオブ ジェクトを作成 32 Builder clusterConfig = new Builder(); clusterConfig.addContactPoints(cassandraHosts) .withPort(cassandraPort) .withClusterName(CLUSTER_NAME); … DseCluster dseCluster = clusterConfig.build(); // java-driverの場合はClusterオブジェクト return dseCluster.connect(); // DseSessionオブジェクトを返す
Apache Cassandraに接続する クラスタービルダー([Dse]Cluster.Builder)の役割 – 最初に接続するノード(IPアドレスとポート)を指定 • このノードからクラスターの情報をドライバー側に得る – クラスタービルダーを通じて様々な設定を行う • 認証 • ロードバランス • リトライ • コネクションプーリング – 実際にステートメントを実行するセッションオブジェクト([Dse]Session) を生成 クラスターとセッションのオブジェクトは、アプリケーションに 一つあれば十分 33
Apache Cassandraに接続する コネクションプーリング – セッションオブジェクトがコネクションプールを持つ – 基本的にクラスター内の起動しているノードすべてに対してコネクション を確立 – 最近のCassandraは一つのクライアントコネクションで最大32kのリク エストを行うことができるため、デフォルトでコネクションプールの設定 は1 34 Cluster Session Pool Connection Request 1 n 1 n 1 n 1 32k
Cassandraの データモデリング 35
データモデリングの原則 データを知る クエリを知る 非正規化 – データをネストする – データを重複して持つ 36
なぜ? クエリがスキーマデザインを決める – クエリの変更はスキーマの変更を伴う可能性が高い Apache Cassandraが得意なアクセスパターンは限られる – 単一パーティションへのクエリー ◎ – 少数のパーティションへのクエリー 〇 – テーブルスキャン × – 複数テーブルへのアクセス × – Cassandraはクラスター運用が前提。テーブルスキャンは全ノードへの アクセスにつながる 37
例: コメントサービス コメントサービスの機能概要 – ユーザーが動画に書くコメントを登録 – 各動画に書かれたコメントを新着順に返す – あるユーザーが投稿したコメントの一覧を返す 38
コメントサービスのモデリング 39 Q2Q1 Q1: ユーザーID をもとに動画のコメントを 投稿順(新しいものを先) に取得 Q2: ビデオID をもとにユーザーからのコメントを 投稿順(新しいものを先) に取得 comments_by_user comments_by_video userId commentId videoId comment videoId commentId userId comment K C↑ K C↑
コメントサービスのモデリング CQL DDL 40 CREATE TABLE comments_by_video ( videoid uuid, commentid timeuuid, userid uuid, comment text, PRIMARY KEY (videoid, commentid) ) WITH CLUSTERING ORDER BY (commentid DESC); CREATE TABLE comments_by_user ( userid uuid, commentid timeuuid, videoid uuid, comment text, PRIMARY KEY (userid, commentid) ) WITH CLUSTERING ORDER BY (commentid DESC); Q1: SELECT commentid, videoid, comment FROM comments_by_user WHERE userid = ? Q2: SELECT commentid, userid, comment FROM comments_by_video WHERE videoid = ?
詳しくは RDB開発者のためのApache Cassandraデータモデリング入門 https://www.slideshare.net/yukim/rdbapache-cassandra 41
ドライバーの利用 42
バッチ登録 コメントを登録する – コメントが登録されるとふたつのテーブルに登録しなければならない • バッチ機能を利用してINSERT 43 PreparedStatement commentsByUserPrepared = dseSession.prepare( "INSERT INTO killrvideo.comments_by_user (userid, commentid, comment, videoid) VALUES (?, ?, ?, ?)" ).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); … BoundStatement bs1 = commentsByUserPrepared.bind(userId, commentId, comment, videoId); BoundStatement bs2 = commentsByVideoPrepared.bind(videoId, commentId, comment, userId); … final BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED); batchStatement.add(bs1); batchStatement.add(bs2); batchStatement.setDefaultTimestamp(now.getTime()); FutureUtils.buildCompletableFuture(dseSession.executeAsync(batchStatement)) …
クエリビルダーの利用 コメントを取得する – ステートメントの組み立て 44 PreparedStatement getVideoComments_startingPointPrepared = dseSession.prepare( QueryBuilder.select() .column("video_id") .column("comment_id") .column("user_id") .column("comment") .fcall("toTimestamp", QueryBuilder.column("comment_id")) .as("comment_timestamp") .from("killrvideo", "comments_by_video") .where(QueryBuilder.eq("video_id", QueryBuilder.bindMarker())) .and(QueryBuilder.lte("comment_id", QueryBuilder.bindMarker())) ).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
ページング コメントを取得する – 取得件数の指定とページング 45 final Optional<String> pagingState = Optional.ofNullable(request.getPagingState()) .filter(StringUtils::isNotBlank); … statement = getUserComments_startingPointPrepared.bind() .setUUID("userid", fromString(userId.getValue())) .setUUID("commentid", fromString(startingCommentId.getValue())); … statement.setFetchSize(request.getPageSize()); pagingState.ifPresent( x -> statement.setPagingState(PagingState.fromString(x))); … Optional.ofNullable(commentResult.getExecutionInfo().getPagingState()) .map(PagingState::toString) .ifPresent(builder::setPagingState);
実行結果の取得 コメントを取得する – クエリの実行結果の取得 – 自動ページング(デフォルトページサイズ: 5000) 46 FutureUtils.buildCompletableFuture(dseSession.executeAsync(statement)) .handle((commentResult, ex) -> { … int remaining = commentResult.getAvailableWithoutFetching(); for (Row row : commentResult) { CommentsByUser commentByUser = new CommentsByUser( row.getUUID("userid"), row.getUUID("commentid"), row.getUUID("videoid"), row.getString("comment") ); commentByUser.setDateOfComment( row.getTimestamp("comment_timestamp")); builder.addComments(commentByUser.toUserComment()); if (--remaining == 0) break; } …
データマッパー エンティティクラスの準備 47 @Table(keyspace = KEYSPACE, name = "video_ratings") public class VideoRating { @PartitionKey private UUID videoid; @Column(name = "rating_counter") private Long ratingCounter; @Column(name = "rating_total") private Long ratingTotal; … }
データマッパー マッパーの準備 48 // DSECongifuration クラス @Bean public MappingManager getMappingManager(DseSession session) { return new MappingManager(session); } // MappingConfiguration クラス @Bean public Mapper<VideoRating> videoRatingMapper() { return manager.mapper(VideoRating.class); }
データマッパー マッパーの利用 49 FutureUtils.buildCompletableFuture(videoRatingMapper.getAsync(videoId)) .handle((ratings, ex) -> { if (ex != null) { … responseObserver.onError(…); } else { if (ratings != null) { responseObserver.onNext((ratings.toRatingResponse())); } else { responseObserver.onNext(…); } responseObserver.onCompleted(); } return ratings; });
アプリケーションの実装 その他の実装サンプル – DSEの機能を利用して • グラフDBにデータを格納して、リアルタイムレコメンデーションを 行うグラフクエリを実行 • 全文検索インデックスを利用した検索(node.js版) 50
まとめ 51
まとめ https://killrvideo.github.io - マイクロサービスを利用したスケーラブルなアプリケーション を作るときに参考になるソースコードがあります! – アーキテクチャの参考に – フレームワークやドライバーの使い方の参考に – データモデリングの参考に なります。 52

サンプルアプリケーションで学ぶApache Cassandraを使ったJavaアプリケーションの作り方

Editor's Notes

  • #7 2009年よりApache Software Foundationにおいて開発されている分散データベース 現時点での最新バージョンはv3.11.1
  • #12 KillrVideoの説明 DSEをデータベースとして使っているが、Apache Cassandraがコア
  • #17 デモ
  • #39 コメント機能デモ