Akka Streamsへ移行しとるんやけど

ちょっと聞いてや

@grimrose

@Scala関西 Summit2017

2017-09-08

今日話すこと

現在もともとPHPで作られていた集計業務システムをScalaへ置き換えています。

チームが成熟したのを機に、まずバッチ処理をAkka Streamsに移行し始めました。

また、Akka HTTPを導入してPHPが持っていたAPIのサービスも一部移行を始めています。

Akka Streamsを導入してどう変わったのか、ハマったところ等を紹介します。

お前、誰よ

  • よしだ
  • とある人材紹介会社
    • マーケティング部門のデータエンジニア(自称)
  • 好きなIDE: IntelliJ IDEA
  • Scala歴: 2年(2015~)
  • ScalaMatsuri 2016, 2017 スタッフ

普段の仕事

  • 事業KPIの可視化を行うためのWebアプリケーションの設計から運用
  • Scalaを使って日次や月次の集計バッチを実行するアプリケーションの作成
  • 帳票や外部APIなどからデータベースへデータを投入するツールの作成
  • 部門の日常業務を改善するためのちょっとしたツールの作成

移行の話をする前に

プロジェクトの目標

事業KPIを集計して可視化する

プロジェクトの概要

  • 始まりは2013年
  • もともとはExcelでやっていた
  • 「KPIのデータを出社してから見られるようにしたい」

プロジェクトの概要

必要なデータをかき集める(人力で)

  • 社内の基幹システムから
  • 外部サービスから
  • ぬくもりのあるExcelから

プロジェクトの概要

導入時(約4年前)に採用した技術

出社時に誰もがブラウザから見られるようになった

プロジェクトの課題

見られるようになったので、やりたいことが増えた

  • 見たい範囲の拡大

    • 集計業務の拡大
    • 取り込みが必要なデータ量の増加

集計時間の増加 -> 間に合わない

このままでは出社時に見られなくなってしまう

どうしたらいいんだ

短縮できそうな箇所を洗い出し

選んだのは「データの取り込み処理

集計に必要な社内の基幹システムのデータの取り込みを短縮する

逐次処理をしていた箇所を、並行で行うことで短縮を図る

  • 2015年、並行処理が書きやすいScalaを導入

Scalaの導入の前に

とあるマーケティング部隊でのembulkの活用事例

  • 当初は、embulkで解決しようとした
  • 並列でやるには別の仕組みが必要だったため、断念
    • 当時はまだdigdagが無い
    • liquidテンプレートもまだだった

Scalaの導入

ScalikeJDBC + Skinny ORMを利用してスクラッチで開発

  • ScalaのFutureを活用して、並行で処理するように
  • PHPで3~4時間 -> 1~2時間で完了するように

さらなる改善と時間短縮を求めて

そこで、Akka Streamsですよ

Akka Streams、どうでしょう

  • ストリーム処理ことはじめ ~ Akka Streams
  • グラフを知って理解するAkka Stream
  • Scala on Docker(AWS ECS)
注意
  • 初めからWebアプリとして作っているわけではない
  • 集計業務に特化している
  • 前提がWebアプリケーションとは異なっている
    • 例えば、対象となるデータは特定期間のみ
    • 一定時間のみ稼働
    • 複数の集計軸
    • 外部サービスからのETLなど

Akka Streams導入に向けて

チームのScalaの習熟度が上がってきた

Akka Streamsの導入に向けて、まずAkkaとは何かから学ぶことに

Scala + Akka勉強会を週一で開催

Akkaとは何かを学ぶなら

Akka in Action

  • Akka Streamsの章が増えた
  • この本を読めば、Akkaの基本となる考え方やAkkaのエコシステムについて知ることが出来る
  • 日本語版が出るらしい <- new!!

Akkaでのメッセージのパターンについて学ぶなら

Reactive Messaging Patterns with the Actor Model

通称: RMP本

Akka Streamsを導入する

既存の処理とAkka Streamsを繋げるために下記のスライドを参考に、シンプルな制約を策定

DDD + Clean Architecture + UCDOM Full版

最新DDDアーキテクチャとAkkaでの実装ヒントについて

import akka.scaladsl.Flow

trait BaseFlow[Command, Event] {

  def toFlow: Flow[Command, Event, NotUsed]

}

少しづつ「部品」としてのFlowを作っていく

この制約の基本

Flowは、Commandを受け取ってEventを返す

  • 既存の処理を分析し、「入力」「処理」「結果」を制約に則ってFlowを作る

Dependency Injection(依存性の注入)

import SomethingProtocol._

class DoSomethingBusinessLogic(
  repository: SomeRepository
) extends BaseFlow[DoSomething, DoneSomething] {

  override def toFlow: Flow[DoSomething, DoneSomething, NotUsed] = {
    Flow[DoSomething]
      .map { cmd => process(cmd.arg1, cmd.arg2) }
      .map(toEvent)
  }

  def process(arg1: String, arg2: Int): String = { ... }
  def toEvent(result: String): DoneSomething = { ... }
}

設計の基本は、以下の通り

  • built-inのメソッドを使う
  • Flowの中でしか利用しない固有のFlowは、無理やりclassにはしない
  • Flowの責務と境界を考える
  • 業務ロジックの流れをFlow.fromGraphFlowShapeを作ってまとめる

GraphDSLを使う場合

scalafmtやIntelliJ IDEAでフォーマットされないようにする

// @formatter:off

// @formatter:on

いくつか設計のパターンが見えてくる

例えば

  • ListやSeqで流れてきたのを、一つづつ後ろへ流したい
Flow[List[Int]]
  .flatMapConcat { list =>
    Source(list)
  }
  • 型に応じて流す先を変えたい
val broadcast = Broadcast[DoneSomething](2)
val accepted = broadcast.out(0).mapConcat {
  case a: Accepted => a :: Nil
  case _           => Nil
}
val rejected = broadcast.out(1).mapConcat {
  case r: Rejected => r :: Nil
  case _           => Nil
}
  • 必要な要素をまとめて違う型で後ろに流したい
ZipWith[Int, String, Seq[String]] { (i: Int, s: String) =>
  (1 to i).map(n => s * n)
}

非同期処理と並行処理

  • Futureにしていた非同期処理は、mapAsyncasync
  • Fan-In、Fan-Outを活用して並行処理を行うことが多い
  • Futureを使う場合は、Akka Streamsで使うExecutionContextとは別のを用意する
  • DBなどを扱うFlowは、blocking用のExecutionContextを用意する

今のところ自作の GraphStage を作る必要性に迫られていない

が、今後に備えて勉強しておく必要はあると思われる

GraphStage を作るときに参考にしたい

Alpakka

Flow同士をつなぐ

他のFlowEventからCommandへ変換するのは、Protocolの役割

変換は、FlowまたはUseCaseのprivateなメソッドで行うか、Protocol objectにpublicなメソッドで行う

object SomethingProtocol {

  case class DoSomething(arg1: String, args2: Int)
  case object DoneSomething

  def toDoSomethingCommand(event: OtherContextEvent): DoSomething = {
    DoSomething(event.result, 100)
  }
}
困ったときに参考になった

公式ドキュメント

http://doc.akka.io/docs/akka/current/scala/stream/index.html

GitHub

https://github.com/akka/akka

Akka Streamsを使ったさらなる改善

Akka HTTPの導入

Akka HTTPを利用したAPIサーバの開発

導入する理由

  • 現在Angular 4を試験的に導入しているため、今後はサーバーサイドにViewを持つ必要がなくなっていく

  • 既存のシステムのAPIは画面と密になっているため、他のシステムからAPIを呼ぶのが難しい

  • バッチ処理を呼び出すシステムがJSONを受け付けるようになると、AWS上での連携がしやすくなる

  • バッチの処理IDを202 AcceptedHttpResponseで返すことで、非同期にすることができるようになる

Akka HTTPを利用したAPIサーバの開発

  • 現在、PHPアプリケーションの一部をBFF(Backends for Frontends)として、Akka HTTPのAPIとやり取りを行うようにして稼働中

  • 結果を返す必要の無いバッチ処理のキックやAPIサーバの死活監視等、直接影響が少ない箇所から導入中

  • 将来的に、Angular等のフロントエンドアプリケーションからのリクエストは全てAkka HTTPのAPIサーバに任せるようにしたい

テストについて

テストについて

DBにアクセスする集計処理がメインなので、SQLの実行は必ず行う

  • MySQL固有のクエリは、scalatestのtagを利用して、他のクエリのテストと分ける
  • H2ではどうしてもテスト出来ないので、MySQL上で必ず実行する必要があるため
  • 他のクエリは、H2を使用しても問題ないためユニットテストとして他のテストと合わせて実施
  • その他に、Redis等特定のミドルウェアの環境が必要なテストは、tagをつけて分ける

テストについて

ユニットテスト及びインテグレーションテストには、dockerを利用

  • docker-composeでテスト環境を集約
  • sbtをインストールしたCentOSのコンテナでcompileとtestの実行
  • その他にミドルウェアのMySQL, Redisのコンテナを使用
  • MySQLへのマイグレーションは、sbt-flywayを使用

テストについて

Akka Streamsのテストのソースは、困ったときに参考になった

  • 「こういう時どう書けばいいのかな?」がだいたい揃っている

https://github.com/akka/akka/tree/master/akka-stream-tests/src/test/scala/akka/stream/scaladsl

テストについて

Akka StreamsのFlowを採用したメリット

  • 制約に則った部品のテストは、業務ロジックのテストに集中することが出来た

テストについて

Flow同士を繋げたUseCaseのテストは、メッセージが適切に流れるかを確認するだけになった

  • In, Outのメッセージの中身を適切に返すMockにDIで差し替える
  • 意図していないFlowにメッセージが来たら、failするようにする
  • Protocolが適切に他のEventからCommandを生成できているかを確認
  • 他のFlowが出来てなくても、akka-stream-testkitを使うことでTDDも出来た

テストについて

テストがしにくいと感じたら、設計を見直すきっかけになった

  • Flowの責務
  • Command及びEventで持つべきメッセージの中身

テストについて

Akka Streamsのテストで一部困ったこと

  • FlowのOutがSeqなどのコレクションの場合、正しく流れてこないというのをテストする
it should "Inから何かしら処理をしてListが出来たのを一つづつOutへ流す処理、Outが0件の場合" in {
    val flow = Flow[Int].flatMapConcat(_ => Source(List.empty[String]))

    val (pub, sub) = TestSource.probe[Int]
      .via(flow)
      .toMat(TestSink.probe[String])(Keep.both)
      .run()

    sub.request(10)

    pub.sendNext(1)
      .sendNext(10)
      .sendComplete() // OnCompleteのメッセージを渡すため

    // sub.expectNext("1") // これだとテストが失敗する
    sub.expectComplete() // OnCompleteのメッセージを受け取ったことを確認する
  }

つまずいたこと

つまづいたこと

最初に移行するのは、移行しやすい規模の処理なので、大きな処理に立ち向かう場合、FlowとUseCaseの粒度を見極めるのが難しかった

-> issueへリファクタリングのやり方等示唆を記載して、早めに対応する

つまづいたこと

Akka HTTPで例外処理を行う

-> 正しくDirectiveを使わないと、例外が起きたときに500なHttpResponseを返したいのに、200 OKを返してしまう

Akka HTTPで例外処理を行う

Flowの下流にSink.headをセットして、Futureを返すRunnableGraphを用意する

  • onSuccessを使うと、scala.util.FailureInternalServerErrorHttpResponseを返してくれる
  • 失敗時のハンドリングをしたいときは、onCompleteを使う

今後について

  • PHPでやっていた処理を引き続きAkka Streams、Akka HTTPへ移行していく
  • Query向けのAPIをSangriaに出来ないか検証する
  • AWSへ移行していくために、AlpakkaのAWSの各サービスのモジュールと組み合わせていく

まとめ

  • 集計処理の改善にScalaを活用している
  • Akka Streamsは、業務処理の流れをstreamとして表現出来る
  • Akka Streamsのテストは、テスティングフレームワークをはじめ充実している
  • Akka Streamsは公式のドキュメントが充実してきているので、必ず読もう(英語だけど)
  • Akka in Action の日本語版が出たら、買いましょう!