現在もともとPHPで作られていた集計業務システムをScalaへ置き換えています。
チームが成熟したのを機に、まずバッチ処理をAkka Streamsに移行し始めました。
また、Akka HTTPを導入してPHPが持っていたAPIのサービスも一部移行を始めています。
Akka Streamsを導入してどう変わったのか、ハマったところ等を紹介します。
事業KPIを集計して可視化する
出社時に誰もがブラウザから見られるようになった
** 集計時間の増加 -> 間に合わない**
このままでは出社時に見られなくなってしまう
どうしたらいいんだ
短縮できそうな箇所を洗い出し
選んだのは「データの取り込み処理」
集計に必要な社内の基幹システムのデータの取り込みを短縮する
逐次処理をしていた箇所を、並行で行うことで短縮を図る
ScalikeJDBC + Skinny ORMを利用してスクラッチで開発
さらなる改善と時間短縮を求めて
Scala + Akka勉強会を週一で開催
既存の処理とAkka Streamsを繋げるために下記のスライドを参考に、シンプルな制約を策定
import akka.scaladsl.Flow
trait BaseFlow[Command, Event] {
def toFlow: Flow[Command, Event, NotUsed]
}
必要なRepository、Service、FlowはScaldiを利用
Scaldiを採用したのは、Skinnyとscaldi/scaldi-akkaがあったから
import SomethingProtocol._
class DoSomethingBusinessLogic(
repository: SomeRepository
) extends BaseFlow[DoSomething, DoneSomething] {
override def toFlow: Flow[DoSomething, DoneSomething, NotUsed] = {
Flow[DoSomething]
.map { cmd => process(cmd.arg1, cmd.arg2) }
.map(toEvent)
}
def process(arg1: String, arg2: Int): String = { ... }
def toEvent(result: String): DoneSomething = { ... }
}
設計の基本は、以下の通り
Flow
の中でしか利用しない固有のFlow
は、無理やりclassにはしないFlow
の責務と境界を考えるFlow.fromGraph
でFlowShape
を作ってまとめるいくつか設計のパターンが見えてくる
例えば
Flow[List[Int]]
.flatMapConcat { list =>
Source(list)
}
val broadcast = Broadcast[DoneSomething](2)
val accepted = broadcast.out(0).mapConcat {
case a: Accepted => a :: Nil
case _ => Nil
}
val rejected = broadcast.out(1).mapConcat {
case r: Rejected => r :: Nil
case _ => Nil
}
ZipWith[Int, String, Seq[String]] { (i: Int, s: String) =>
(1 to i).map(n => s * n)
}
mapAsync
やasync
へExecutionContext
とは別のを用意するExecutionContext
を用意する今のところ自作の GraphStage を作る必要性に迫られていない
が、今後に備えて勉強しておく必要はあると思われる
GraphStage を作るときに参考にしたい
他のFlow
のEvent
からCommand
へ変換するのは、Protocol
の役割
変換は、Flow
またはUseCase
のprivateなメソッドで行うか、Protocol
objectにpublicなメソッドで行う
object SomethingProtocol {
case class DoSomething(arg1: String, args2: Int)
case object DoneSomething
def toDoSomethingCommand(event: OtherContextEvent): DoSomething = {
DoSomething(event.result, 100)
}
}
現在、PHPアプリケーションの一部をBFF(Backends for Frontends)として、Akka HTTPのAPIとやり取りを行うようにして稼働中
結果を返す必要の無いバッチ処理のキックやAPIサーバの死活監視等、直接影響が少ない箇所から導入中
将来的に、Angular等のフロントエンドアプリケーションからのリクエストは全てAkka HTTPのAPIサーバに任せるようにしたい
DBにアクセスする集計処理がメインなので、SQLの実行は必ず行う
ユニットテスト及びインテグレーションテストには、dockerを利用
sbt-flyway
を使用Akka Streamsのテストのソースは、困ったときに参考になった
https://github.com/akka/akka/tree/master/akka-stream-tests/src/test/scala/akka/stream/scaladsl
Flow同士を繋げたUseCaseのテストは、メッセージが適切に流れるかを確認するだけになった
it should "Inから何かしら処理をしてListが出来たのを一つづつOutへ流す処理、Outが0件の場合" in {
val flow = Flow[Int].flatMapConcat(_ => Source(List.empty[String]))
val (pub, sub) = TestSource.probe[Int]
.via(flow)
.toMat(TestSink.probe[String])(Keep.both)
.run()
sub.request(10)
pub.sendNext(1)
.sendNext(10)
.sendComplete() // OnCompleteのメッセージを渡すため
// sub.expectNext("1") // これだとテストが失敗する
sub.expectComplete() // OnCompleteのメッセージを受け取ったことを確認する
}
最初に移行するのは、移行しやすい規模の処理なので、大きな処理に立ち向かう場合、FlowとUseCaseの粒度を見極めるのが難しかった
-> issueへリファクタリングのやり方等示唆を記載して、早めに対応する
Flow
の下流にSink.head
をセットして、Future
を返すRunnableGraph
を用意する
scala.util.Failure
をInternalServerError
なHttpResponse
を返してくれる