An example of replacing actors with akka-stream
What does Akka Streams actually offer over actors?
The Akka Streams API is a higher-level API that builds on top of the Akka actor model and implements many of the things that I have just addressed, including batching messages, rate-limiting requests, and handling flow control.
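The example communicates with the browser over a WebSocket (akka-http), writes the incoming messages to a database, and then returns an acknowledgement to the browser: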
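import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.duration._

// Database, InsertMessage, and the implicit ActorSystem / materializer are not shown here.
val database = new Database()

val measurementsWebSocketService = Flow[Message]
  // Normalize strict and streamed text frames to a Future[String]
  .collect {
    case TextMessage.Strict(text) => Future.successful(text)
    case TextMessage.Streamed(textStream) =>
      textStream.runFold("")(_ + _).flatMap(Future.successful)
  }
  .mapAsync(1)(identity)
  .map(InsertMessage.parse)
  // Emit a batch once 1000 messages have arrived or 1 second has passed
  .groupedWithin(1000, 1.second)
  // Allow at most 10 bulk inserts in flight at once
  .mapAsync(10)(database.bulkInsertAsync)
  // Acknowledge the last message of each batch
  .map(messages => InsertMessage.ack(messages.last))

val route = path("measurements") {
  get {
    handleWebSocketMessages(measurementsWebSocketService)
  }
}

val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)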
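The batching stages can be tried in isolation. The following is only a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer). `BatchingSketch` and its `bulkInsertAsync` are hypothetical stand-ins for the real database call, and the integer source replaces the WebSocket messages.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object BatchingSketch {
  // Hypothetical stand-in for database.bulkInsertAsync:
  // it does no I/O and simply reports the size of the batch it received.
  def bulkInsertAsync(batch: Seq[Int]): Future[Int] =
    Future.successful(batch.size)

  // Runs the stream to completion and returns the size of every batch.
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, so the implicit system also provides a materializer.
    implicit val system: ActorSystem = ActorSystem("batching-sketch")
    try {
      val sizes = Source(1 to 2500)
        // Emit a batch after 1000 elements or 1 second, whichever comes first
        .groupedWithin(1000, 1.second)
        // Keep at most 10 "inserts" in flight; results stay in upstream order
        .mapAsync(10)(bulkInsertAsync)
        .runWith(Sink.seq)
      Await.result(sizes, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

No batch can exceed 1000 elements, and the batch sizes always add up to 2500. The exact split depends on timing, because a batch is also emitted when the one-second window elapses or the source completes.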
Implementing the same WebSocket-to-database service with an actor takes far more complex code, along with a pile of vars:
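import akka.actor.Actor
import scala.concurrent.duration._

// Database, InsertMessage, Insert, and Decrement are not shown here.
class DatabaseActor extends Actor {
  // ExecutionContext for the scheduler and the Future callback
  import context.dispatcher

  val database = new Database()

  var messages: Seq[String] = Nil // buffered messages, newest first
  var count = 0                   // number of buffered messages
  var flush = true                // whether the next timer tick should flush
  var outstanding = 0             // bulk inserts currently in flight

  override def preStart(): Unit = {
    context.system.scheduler.scheduleOnce(1.second) {
      self ! Insert
    }
  }

  def receive: Receive = {
    case InsertMessage(message) =>
      messages = message +: messages
      count += 1
      if (count >= 1000) {
        insert()
        flush = false
      }
    case Insert =>
      if (flush) insert() else flush = true
      context.system.scheduler.scheduleOnce(1.second) {
        self ! Insert
      }
    case Decrement =>
      outstanding -= 1
      if (count >= 1000) {
        insert()
        flush = false
      }
  }

  private def insert(): Unit = {
    // Write only when something is buffered and fewer than 10 inserts are in flight
    if (count > 0 && outstanding < 10) {
      outstanding += 1
      val (batch, remaining) = messages.splitAt(1000)
      messages = remaining
      count = remaining.size
      database.bulkInsertAsync(batch) andThen {
        case _ => self ! Decrement
      }
    }
  }
}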
Thanks to the authors below!
Akka Streams: A Motivating Example
SoftwareMill blog: Replacing Akka actors with Akka streams