從頭學習大數據培訓課程 scala 對象與函數式編程(五)scala 基礎 5 Actor
Actor
為什麼要學習actor?我們現在學的Scala Actor是Scala 2.10.x版本及以前版本的ActorScala在2.11.x版本中將Akka加入其中,作為默認的Actor,老版本的Actor已經廢棄我們學習Actor的目的就是為了學習Akka做鋪墊
什麼是Actor?
Actor是消息並發模型Scala中的Actor能夠實現並行編程的強大功能,它是基於事件模型的並發機制。Scala是運用消息(message)的發送、接收來實現多線程的。使用Scala能夠更容易地實現多線程應用的開發。Java並發編程與Scala Actor編程的區別
Scala的Actor類似於Java中的多線程編程,但是不同的是,Scala的Actor提供的模型與多線程有所不同。Actor方法執行順序
1.調用start()方法啟動Actor2.執行act()方法3.向Actor發送消息發送消息的方式
! 發送非同步消息,沒有返回值!? 發送同步消息,等待返回值!! 發送非同步消息,返回值是Future[Any]Akka簡介
Spark的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基於Actor並發模型實現Akka具有高可靠、高性能、可擴展等特點,使用Akka可以輕鬆實現分散式RPC功能。
Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的對象,Actor之間可以通過交換消息的方式進行通信每個Actor都有自己的收件箱(MailBox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正確地並發程序和並行系統,Actor具有如下特性:1.提供了一種高級抽象,能夠簡化在並發(Concurrency)/並行(Parallelism)應用場景下的編程開發2.提供了非同步非阻塞的、高性能的事件驅動編程模型3.超級輕量級事件處理(每GB堆內存幾百萬Actor)ActorSystem
在Akka中,ActorSystem是一個重量級的結構他需要分配多個線程,所以在實際應用中,ActorSystem通常是一個單例對象我們可以使用這個ActorSystem創建很多Actor
Actor
在Akka中,Actor負責通信,在Actor中有一些重要的生命周期方法1.preStart()方法:該方法在Actor對象構造方法執行後執行,整 個Actor生命周期中僅執行一次2.receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收消息,會被反覆執行掌握的內容
1.創建Actor2.Actor的消息接受和發送3.用Actor並發編程實現WordCountactor的簡單實現import scala.actors.Actorobject ActorDemo1 extends Actor{override def act(): Unit = { for(i <- 1 to 10){ println(s"actor1:$i")Thread.sleep(1000) } } }object ActorDemo2 extends Actor{override def act(): Unit = { for(i <- 1 to 10){ println(s"actor2:$i")Thread.sleep(2000) } } }object ActorTest{def main(args: Array[String]): Unit = {ActorDemo1.start()ActorDemo2.start() } }
actor通信
class ActorDemo3 extends Actor {override def act(): Unit = { while(true){ receive({ case "start" => {println("starting...")sender ! "started" } case AsynMsg(id,msg) => { println(s"id:$id,AsynMsg:$msg")Thread.sleep(2000) sender ! ReplyMeg(10,"success") } case SyncMsg(id,msg) => { println(s"id:$id,AsynMsg:$msg")Thread.sleep(2000)sender ! ReplyMeg(20,"success") } }) } } }object ActorDemo3{def main(args: Array[String]): Unit = {val demo = new ActorDemo3demo.start() //非同步發送消息,沒有返回值demo ! AsynMsg(1,"hainiu1") println("沒有返回值的非同步消息發送完成") //同步發送消息,線程等待返回值 val res: Any = demo !? SyncMsg(2,"hainiu2") println("有返回值的同步消息發送完成") println(res) //非同步發送消息,有返回值,返回類型是Future[Any]val res2: Future[Any] = demo !! AsynMsg(3,"hainiu3")Thread.sleep(3000) println("有返回值的非同步消息發送完成") if(res2.isSet){val value = res2.apply() println(value) }else{ print("None") } //發送string的同步有返回值,當actor中沒有寫相應的返回邏輯的時候 !? 方法會一直阻塞val res3: Any = demo !? "start" println(res3) } } case class AsynMsg(id:Int,msg:String) case class SyncMsg(id:Int,msg:String) //參數的長度最多是23個 case class ReplyMeg(id:Int,msg:String)
用actor實現wordcount
import scala.actors.{Actor, Future}import scala.collection.mutable.ListBufferimport scala.io.Sourceobject ActorWordCount {def main(args: Array[String]): Unit = {val files: Array[String] = Array("C:\tmp\input\111.txt", "C:\tmp\input\111.txt", "C:\tmp\input\111.txt") //存放接收到的每個actor處理的結果數據val replys: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]] //存放有actor返回結果的Future數據val contens: ListBuffer[Map[String, Int]] = new ListBuffer[Map[String, Int]] for (file <- files) { // val lines:List[String] = Source.fromFile(file).getLines().toList // val words:List[String] = lines.flatMap(_.split(" ")) // val res: Map[String, Int] = words.map((_,1)).groupBy(_._1).mapValues(_.size)val task = new Tasktask.start() //非同步發送有返回值val res: Future[Any] = task !! file//把每個文件的結果數據存儲到ListBufferreplys += res//List(Map("a"->1,"b"->2),Map("a",1)) } while (replys.size > 0) { //過濾每個Futrue對象,如果沒有數據就過濾掉val dones: ListBuffer[Future[Any]] = replys.filter(_.isSet) for (done <- dones) {contens += done.apply().asInstanceOf[Map[String, Int]]replys -= done} } //List(Map("a"->1,"b"->2),Map("c",1)) //List(Map("a"->1,"b"->2,("c",1))) flatten //map("a" -> List(("a",1),("a",2))) groupBy //map("a" -> 3) mapValues(_.foldLeft(0)(_ + _._2))val result: Map[String, Int] = contens.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) println(result) } } class Task extends Actor {override def act(): Unit = { receive({ case file: String => {val lines: List[String] = Source.fromFile(file).getLines().toListval words: List[String] = lines.flatMap(_.split(" "))val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) //非同步發送結果數據sender ! res} }) } }
版權聲明:原創作品,允許轉載,轉載時務必以超鏈接的形式表明出處和作者信息。否則將追究法律責任。來自海牛學院-青牛
推薦閱讀:
※MaxCompute 存儲優化技巧
※大數據實時日活計算之Bloom Filter
※阿里巴巴大數據之路
※R語言和大數據
※如何利用八爪魚,實現餐飲大數據(以辰智商圈秀為例)