#!/usr/bin/env python # coding: utf-8 # # Akka Streamsへ移行しとるんやけど # # ちょっと聞いてや # # [@grimrose](https://twitter.com/grimrose) # # @[Scala関西 Summit2017](http://summit.scala-kansai.org/) # # 2017-09-08 # ### 今日話すこと # # 現在もともとPHPで作られていた集計業務システムをScalaへ置き換えています。 # # チームが成熟したのを機に、まずバッチ処理をAkka Streamsに移行し始めました。 # # また、Akka HTTPを導入してPHPが持っていたAPIのサービスも一部移行を始めています。 # # Akka Streamsを導入してどう変わったのか、ハマったところ等を紹介します。 # ### お前、誰よ # # * よしだ # * twitter: [@grimrose](https://twitter.com/grimrose) # * github: [grimrose](https://github.com/grimrose) # * とある人材紹介会社 # * マーケティング部門のデータエンジニア(自称) # * 好きなIDE: IntelliJ IDEA # * Scala歴: 2年(2015~) # * [ScalaMatsuri](scalamatsuri.org) 2016, 2017 スタッフ # ### 普段の仕事 # # * 事業KPIの可視化を行うためのWebアプリケーションの設計から運用 # * Scalaを使って日次や月次の集計バッチを実行するアプリケーションの作成 # * 帳票や外部APIなどからデータベースへデータを投入するツールの作成 # * 部門の日常業務を改善するためのちょっとしたツールの作成 # ## 移行の話をする前に # ### プロジェクトの目標 # # **事業KPIを集計して可視化する** # # ### プロジェクトの概要 # # * 始まりは2013年 # * もともとはExcelでやっていた # * 「KPIのデータを出社してから見られるようにしたい」 # # ### プロジェクトの概要 # # # 必要なデータをかき集める(人力で) # # * 社内の基幹システムから # * 外部サービスから # * ぬくもりのあるExcelから # # ### プロジェクトの概要 # # 導入時(約4年前)に採用した技術 # # * PHP 5.5 # * FuelPHP 1.7 # * MySQL 5.6 # * [TypeScript](https://www.typescriptlang.org/) # * [AngularJS](https://angularjs.org/) # * [Wijmo](http://wijmo.com/) # # **出社時に誰もがブラウザから見られるようになった** # ### プロジェクトの課題 # # # **見られるようになったので、やりたいことが増えた** # # * 見たい範囲の拡大 # # * 集計業務の拡大 # * 取り込みが必要なデータ量の増加 # # ** 集計時間の増加 -> 間に合わない** # **このままでは出社時に見られなくなってしまう** # **どうしたらいいんだ** # 短縮できそうな箇所を洗い出し # # 選んだのは「**データの取り込み処理**」 # # 集計に必要な社内の基幹システムのデータの取り込みを短縮する # # 逐次処理をしていた箇所を、並行で行うことで短縮を図る # # * 2015年、並行処理が書きやすい**Scala**を導入 # ### Scalaの導入の前に # # [とあるマーケティング部隊でのembulkの活用事例](https://gist.github.com/grimrose/1377fce36840b6ef7536d22dc63e7761) # # * 当初は、embulkで解決しようとした # * 並列でやるには別の仕組みが必要だったため、断念 # * 当時はまだdigdagが無い # * liquidテンプレートもまだだった # # ### Scalaの導入 # # [ScalikeJDBC](scalikejdbc.org) + [Skinny ORM](http://skinny-framework.org/documentation/orm.html)を利用してスクラッチで開発 # # * ScalaのFutureを活用して、並行で処理するように # * PHPで3~4時間 -> 1~2時間で完了するように # # **さらなる改善と時間短縮を求めて** # # そこで、Akka Streamsですよ # ## Akka Streams、どうでしょう # # * ストリーム処理ことはじめ ~ Akka Streams # * グラフを知って理解するAkka Stream # * Scala on Docker(AWS ECS) # #
# 注意 #
# # * 初めからWebアプリとして作っているわけではない # * 集計業務に特化している # * 前提がWebアプリケーションとは異なっている # * 例えば、対象となるデータは特定期間のみ # * 一定時間のみ稼働 # * 複数の集計軸 # * 外部サービスからのETLなど # # ### Akka Streams導入に向けて # # チームのScalaの習熟度が上がってきた # # Akka Streamsの導入に向けて、まずAkkaとは何かから学ぶことに # **Scala + Akka勉強会を週一で開催** # ### Akkaとは何かを学ぶなら # # [Akka in Action](https://www.manning.com/books/akka-in-action) # # * Akka Streamsの章が増えた # * この本を読めば、Akkaの基本となる考え方やAkkaのエコシステムについて知ることが出来る # * 日本語版が出るらしい <- **new!!** # ### Akkaでのメッセージのパターンについて学ぶなら # # [Reactive Messaging Patterns with the Actor Model](https://www.amazon.co.jp/Reactive-Messaging-Patterns-Actor-Model-ebook/dp/B011S8YC5G) # # 通称: RMP本 # # ### [akka-stream を始めるときに覚えておきたいこと](https://speakerdeck.com/tkawachi/akka-stream-woshi-merutokinijue-eteokitaikoto) # ### Akka Streamsを導入する # # 既存の処理とAkka Streamsを繋げるために下記のスライドを参考に、シンプルな制約を策定 # # [DDD + Clean Architecture + UCDOM Full版](https://speakerdeck.com/yoskhdia/ddd-plus-clean-architecture-plus-ucdom-fullban) # # [最新DDDアーキテクチャとAkkaでの実装ヒントについて](https://speakerdeck.com/j5ik2o/zui-xin-dddakitekutiyatoakkadefalseshi-zhuang-hintonituite) # # ```scala # import akka.scaladsl.Flow # # trait BaseFlow[Command, Event] { # # def toFlow: Flow[Command, Event, NotUsed] # # } # ``` # ### 少しづつ「部品」としてのFlowを作っていく # # この制約の基本 # # **Flow**は、**Command**を受け取って**Event**を返す # # * 既存の処理を分析し、「入力」「処理」「結果」を制約に則って**Flow**を作る # # ### Dependency Injection(依存性の注入) # # * 必要な**Repository**、**Service**、**Flow**は[Scaldi]を利用 # # * [Scaldi]を採用したのは、[Skinny](http://skinny-framework.org/documentation/dependency-injection.html)と[scaldi/scaldi-akka](https://github.com/scaldi/scaldi-akka)があったから # # [Scaldi]: http://scaldi.org/ # ```scala # import SomethingProtocol._ # # class DoSomethingBusinessLogic( # repository: SomeRepository # ) extends BaseFlow[DoSomething, DoneSomething] { # # override def toFlow: Flow[DoSomething, DoneSomething, NotUsed] = { # Flow[DoSomething] # .map { cmd => process(cmd.arg1, cmd.arg2) } # .map(toEvent) # } # # def process(arg1: String, arg2: Int): String = { ... } # def toEvent(result: String): DoneSomething = { ... } # } # ``` # 設計の基本は、以下の通り # # * built-inのメソッドを使う # * `Flow`の中でしか利用しない固有の`Flow`は、無理やりclassにはしない # * `Flow`の責務と境界を考える # * 業務ロジックの流れを`Flow.fromGraph`で`FlowShape`を作ってまとめる # # # ### GraphDSLを使う場合 # # [scalafmt](http://scalameta.org/scalafmt/#//format:off)やIntelliJ IDEAでフォーマットされないようにする # # ```scala # // @formatter:off # # // @formatter:on # ``` # **いくつか設計のパターンが見えてくる** # 例えば # * ListやSeqで流れてきたのを、一つづつ後ろへ流したい # # ```scala # Flow[List[Int]] # .flatMapConcat { list => # Source(list) # } # ``` # * 型に応じて流す先を変えたい # # ```scala # val broadcast = Broadcast[DoneSomething](2) # val accepted = broadcast.out(0).mapConcat { # case a: Accepted => a :: Nil # case _ => Nil # } # val rejected = broadcast.out(1).mapConcat { # case r: Rejected => r :: Nil # case _ => Nil # } # ``` # * 必要な要素をまとめて違う型で後ろに流したい # # ```scala # ZipWith[Int, String, Seq[String]] { (i: Int, s: String) => # (1 to i).map(n => s * n) # } # ``` # ### 非同期処理と並行処理 # # * Futureにしていた非同期処理は、`mapAsync`や`async`へ # * Fan-In、Fan-Outを活用して並行処理を行うことが多い # * Futureを使う場合は、Akka Streamsで使う`ExecutionContext`とは別のを用意する # * DBなどを扱うFlowは、blocking用の`ExecutionContext`を用意する # 今のところ自作の **GraphStage** を作る必要性に迫られていない # # が、今後に備えて勉強しておく必要はあると思われる # **GraphStage** を作るときに参考にしたい # # [Alpakka](http://developer.lightbend.com/docs/alpakka/current/) # # ### Flow同士をつなぐ # # 他の`Flow`の`Event`から`Command`へ変換するのは、`Protocol`の役割 # # 変換は、`Flow`または`UseCase`のprivateなメソッドで行うか、`Protocol` objectにpublicなメソッドで行う # # ```scala # object SomethingProtocol { # # case class DoSomething(arg1: String, args2: Int) # case object DoneSomething # # def toDoSomethingCommand(event: OtherContextEvent): DoSomething = { # DoSomething(event.result, 100) # } # } # ``` #
# 困ったときに参考になった #
# # 公式ドキュメント # # http://doc.akka.io/docs/akka/current/scala/stream/index.html # # # GitHub # # https://github.com/akka/akka # # ## Akka Streamsを使ったさらなる改善 # ### [Akka HTTP](http://doc.akka.io/docs/akka-http/current/scala/http/introduction.html)の導入 # ### Akka HTTPを利用したAPIサーバの開発 # # 導入する理由 # # * 現在[Angular](https://angular.io/) 4を試験的に導入しているため、今後はサーバーサイドにViewを持つ必要がなくなっていく # # * 既存のシステムのAPIは画面と密になっているため、他のシステムからAPIを呼ぶのが難しい # # * バッチ処理を呼び出すシステムがJSONを受け付けるようになると、AWS上での連携がしやすくなる # * バッチの処理IDを`202 Accepted`な`HttpResponse`で返すことで、非同期にすることができるようになる # ### Akka HTTPを利用したAPIサーバの開発 # # * 現在、PHPアプリケーションの一部をBFF(Backends for Frontends)として、Akka HTTPのAPIとやり取りを行うようにして稼働中 # # * 結果を返す必要の無いバッチ処理のキックやAPIサーバの死活監視等、直接影響が少ない箇所から導入中 # # * 将来的に、Angular等のフロントエンドアプリケーションからのリクエストは全てAkka HTTPのAPIサーバに任せるようにしたい # ## テストについて # ### テストについて # # DBにアクセスする集計処理がメインなので、SQLの実行は必ず行う # # * MySQL固有のクエリは、scalatestのtagを利用して、他のクエリのテストと分ける # * H2ではどうしてもテスト出来ないので、MySQL上で必ず実行する必要があるため # * 他のクエリは、H2を使用しても問題ないためユニットテストとして他のテストと合わせて実施 # * その他に、Redis等特定のミドルウェアの環境が必要なテストは、tagをつけて分ける # ### テストについて # # ユニットテスト及びインテグレーションテストには、dockerを利用 # # * docker-composeでテスト環境を集約 # * sbtをインストールしたCentOSのコンテナでcompileとtestの実行 # * その他にミドルウェアのMySQL, Redisのコンテナを使用 # * MySQLへのマイグレーションは、`sbt-flyway`を使用 # # ### テストについて # # # Akka Streamsのテストのソースは、困ったときに参考になった # # * 「こういう時どう書けばいいのかな?」がだいたい揃っている # # https://github.com/akka/akka/tree/master/akka-stream-tests/src/test/scala/akka/stream/scaladsl # # ### テストについて # # Akka StreamsのFlowを採用したメリット # # * 制約に則った部品のテストは、業務ロジックのテストに集中することが出来た # ### テストについて # # Flow同士を繋げたUseCaseのテストは、メッセージが適切に流れるかを確認するだけになった # # * In, Outのメッセージの中身を適切に返すMockにDIで差し替える # * 意図していないFlowにメッセージが来たら、failするようにする # * Protocolが適切に他のEventからCommandを生成できているかを確認 # * 他のFlowが出来てなくても、[akka-stream-testkit]を使うことでTDDも出来た # # [akka-stream-testkit]: http://doc.akka.io/docs/akka/current/scala/stream/stream-testkit.html # ### テストについて # # テストがしにくいと感じたら、設計を見直すきっかけになった # # * Flowの責務 # * Command及びEventで持つべきメッセージの中身 # # ### テストについて # # Akka Streamsのテストで一部困ったこと # # * `Flow`のOutがSeqなどのコレクションの場合、正しく流れてこないというのをテストする # # ```scala # it should "Inから何かしら処理をしてListが出来たのを一つづつOutへ流す処理、Outが0件の場合" in { # val flow = Flow[Int].flatMapConcat(_ => Source(List.empty[String])) # # val (pub, sub) = TestSource.probe[Int] # .via(flow) # .toMat(TestSink.probe[String])(Keep.both) # .run() # # sub.request(10) # # pub.sendNext(1) # .sendNext(10) # .sendComplete() // OnCompleteのメッセージを渡すため # # // sub.expectNext("1") // これだとテストが失敗する # sub.expectComplete() // OnCompleteのメッセージを受け取ったことを確認する # } # ``` # ## つまずいたこと # ### つまづいたこと # # 最初に移行するのは、移行しやすい規模の処理なので、大きな処理に立ち向かう場合、FlowとUseCaseの粒度を見極めるのが難しかった # # -> issueへリファクタリングのやり方等示唆を記載して、早めに対応する # ### つまづいたこと # # Akka HTTPで例外処理を行う # # -> 正しく`Directive`を使わないと、例外が起きたときに500な`HttpResponse`を返したいのに、`200 OK`を返してしまう # # ### Akka HTTPで例外処理を行う # # `Flow`の下流に`Sink.head`をセットして、`Future`を返す`RunnableGraph`を用意する # # * [onSuccess](http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/directives/future-directives/onSuccess.html#onsuccess)を使うと、`scala.util.Failure`を`InternalServerError`な`HttpResponse`を返してくれる # * 失敗時のハンドリングをしたいときは、[onComplete](http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/directives/future-directives/onComplete.html#oncomplete)を使う # # ### 今後について # # * PHPでやっていた処理を引き続きAkka Streams、Akka HTTPへ移行していく # * Query向けのAPIを[Sangria]に出来ないか検証する # * AWSへ移行していくために、[Alpakka]のAWSの各サービスのモジュールと組み合わせていく # # [Sangria]: http://sangria-graphql.org/ # [Alpakka]: http://developer.lightbend.com/docs/alpakka/current/ # ## まとめ # # * 集計処理の改善にScalaを活用している # * Akka Streamsは、業務処理の流れを**stream**として表現出来る # * Akka Streamsのテストは、テスティングフレームワークをはじめ充実している # * Akka Streamsは公式のドキュメントが充実してきているので、必ず読もう(英語だけど) # * Akka in Action の日本語版が出たら、買いましょう!