= AKKA = 동시성 Actor 모델의 현대적 구현체 * [[https://developer.lightbend.com/guides/akka-quickstart-java/index.html#benefits-of-using-the-actor-model | Actor 모델 사용의 이점]] Scala 기본 문법을 안다는 전제하에 실용적인 측면으로 기술. {{tag>Framework CQRS Concurrency Parallelism}} = 기초 = == Reactive System == * Resilient회복력 : 오류/예외 같은 장애가 발생하더라도 항상 응답하여 회복 * Elastic탄력성 : Traffic 크기에 상관없이 할당된 자원을 조절해 부하를 서비스할 수 있게 항상 응답성을 유지 * Message Driven메시지 기반 : 시스템의 구성 요소가 서로 **느슨하게 결합**되어 있으며 **비동기** 메시지 전달을 사용해 통신하고, 행동을 취함으로써 메시지에 반응. * Responsive응답성 : 위 세 속성을 만족하는 시스템을 "응답성이 있는 시스템"이라 한다. == Actor == === 연산 === 모든 연산은 **비동기**적!! * create (생성) * send (송신) * become (상태변화) * supervise (감독) === 속성 === * State : Actor는 내부 상태가 있고, 이 상태는 Message를 처리함에 따라 순차적으로 변한다. (mutable) * Behavior : Actor는 송신된 Message에 행위를 적용함으로써 반응한다. * Communication : Actor는 다른 Actor에게 Message를 송신하거나 수신함으로써 서로 통신한다. * Mailbox : Mailbox는 Actor가 메시지를 가져오고 처리하는 **Message Queue**다. == 기본 Actor System Project 생성 == https://doc.akka.io/docs/akka/current/actors.html?language=scala {{section>[language:scala]#[환경 설정]}} === Project 생성 === 아래 과정은 console에서 수행할 경우 필요한 작업. Intellij로 sbt 프로젝트 생성하면 더 쉽게 만들 수 있다. # Project를 생성할 Directory 생성 # 생성한 Directory로 이동해서 "sbt" 실행 # sbt 콘솔에서 아래 과정을 수행하면 "build.sbt" Build 설정 파일과 Class 파일이 놓일 "target" Directory가 생성된다. set name := "hello-akka" set version := "0.1" set scalaVersion := "2.12.6" session save exit # build.sbt를 열고 아래 내용을 추가한다. // https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14" # src/main/scala/ 밑에 프로젝트 이름의 패키지와 클래스를 작성. package com.practice.ex01 import akka.actor.ActorSystem object HelloAkkaActorSystem extends App{ val actorSystem = ActorSystem("HelloAkka") println(actorSystem) // akka://HelloAkka } # console에서 sbt "runMain com.practice.ex01.HelloAkkaActorSystem" ==== 기본적인 Ask-Tell 예제 ==== scala에서 ask는 "?", tell은 "!" method를 사용한다. package com.practice.ex01 import akka.actor.{ActorSystem, Props} import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.Actor object HelloAkkaActorSystem extends App { val actorSystem = ActorSystem("HelloAkka") val actor = actorSystem.actorOf(Props(classOf[SumActor]), "sumActor") // ask를 사용하려면 timeout을 반드시 명시해야 함. implicit val timeout = Timeout(10 seconds) val future = actor ? (1 to 10).toArray val result = Await.result(future, 10 seconds) println(s"result : $result") } class SumActor extends Actor { override def receive: Receive = { case arr: Array[Int] => sender ! arr.sum case _ => sender ! "Error!" } } ==== Actor간 통신 (tell을 이용한 결과 수신) ==== ask는 기본적으로 blocking. 그러므로 비슷하게 tell을 이용하여 서로 주고 받는 callback 형식으로 구현하므로 더 경제적. # QueryActor에서 Start로 연산 요청. 연산할 Actor인 SumActor와 값을 전달 받음. # SumActor에서 전달 받은 값을 연산하고 sender(QueryActor)의 Complete로 전달 # QueryActor의 Complete에서 연산 결과 수신. package com.practice.ex01 import akka.actor.{ActorSystem, Props} import com.practice.ex01.Message.Start object HelloAkkaActorSystem extends App { val actorSystem = ActorSystem("HelloAkka") val queryActor = actorSystem.actorOf(Props(classOf[QueryActor]), "queryActor") val sumActor = actorSystem.actorOf(Props(classOf[SumActor]), "sumActor") queryActor ! Start(sumActor, (1 to 10).toArray) } package com.practice.ex01 import akka.actor.ActorRef object Message { case class Start(actorRef: ActorRef, arr: Array[Int]) case class Progress(arr: Array[Int]) case class Complete(result: Int) } package com.practice.ex01 import akka.actor.Actor import com.practice.ex01.Message._ class QueryActor extends Actor{ override def receive: Receive = { case Start(actorRef, arr) => println("Start!") actorRef ! Progress(arr) case Complete(result) => println(s"Complete! $result") case _ => println("QueryActor - WTHIGO?") } } package com.practice.ex01 import akka.actor.Actor import com.practice.ex01.Message._ class SumActor extends Actor { override def receive: Receive = { case Progress(arr) => println("Progress!") sender ! Complete(arr.sum) case _ => sender ! "SumActor - WTHIGO?" } } ==== 사용자 정의 Mailbox 만들기 ==== Actor가 메시지를 가져오는 방법을 제어할 때 사용. ===== Mailbox 정의 ===== package com.practice.ex01 import akka.actor.{ActorRef, ActorSystem} import akka.dispatch.{MailboxType, MessageQueue, ProducesMessageQueue} import com.typesafe.config.Config class MyCustomMailbox extends MailboxType with ProducesMessageQueue[MyMessageQueue] { def this(setting: ActorSystem.Settings, config: Config) = this() override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { println("MyCustomMailbox is created!") new MyMessageQueue() } } package com.practice.ex01 import java.util.concurrent.ConcurrentLinkedQueue import akka.actor.ActorRef import akka.dispatch.{Envelope, MessageQueue} class MyMessageQueue extends MessageQueue{ private final val queue = new ConcurrentLinkedQueue[Envelope]() override def enqueue(receiver: ActorRef, handle: Envelope): Unit = { println("enqueue!") if(handle.sender.path.name == "MyActor") { handle.sender ! "I know you!" queue.offer(handle) } else handle.sender ! "Who are you?" } override def dequeue(): Envelope = queue.poll override def numberOfMessages: Int = queue.size override def hasMessages: Boolean = !queue.isEmpty override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { while (hasMessages) { deadLetters.enqueue(owner, dequeue()) } } } custom-dispatcher { mailbox-requirement = "com.practice.ex01.MyMessageQueue" } akka.actor.mailbox.requirements { "com.practice.ex01.MyMessageQueue" = custom-dispatcher-mailbox } custom-dispatcher-mailbox { mailbox-type = "com.practice.ex01.MyCustomMailbox" } ===== Actor 정의 ===== package com.practice.ex01 import akka.actor.Actor class MySpecialActor extends Actor { override def receive: Receive = { case msg: String => println(s"MySpecialActor's msg : $msg") } } package com.practice.ex01 import akka.actor.{Actor, ActorRef} class MyActor extends Actor { override def receive: Receive = { case (msg: String, actorRef: ActorRef) => actorRef ! msg case msg => println(s"MyActor's msg? $msg") } } ===== Client ===== package com.practice.ex01 import akka.actor.{ActorSystem, Props} object Client extends App { val actorSystem = ActorSystem("HelloAkka") // application.conf에 기술된 custom-dispatcher을 읽고 // 사용자 정의 Mailbox(MyCustomMailBox)가 먼저 생성되고, // 그 안에 create() method로 message queue가 생성된다. val mySpecialActor = actorSystem.actorOf(Props[MySpecialActor] .withDispatcher("custom-dispatcher")) val unknownActor = actorSystem.actorOf(Props[MyActor], "abracadabra") val knownActor = actorSystem.actorOf(Props[MyActor], "MyActor") unknownActor ! ("hello", mySpecialActor) knownActor ! ("hello", mySpecialActor) // 비동기적이므로 아래 결과는 바뀔 수 있음. /* MyCustomMailbox is created! enqueue! enqueue! MyActor's msg? Who are you? MyActor's msg? I know you! MySpecialActor's msg : hello */ } ==== 수신 메시지의 우선순위 정의 ==== 우선순위에 따라 특정 메시지를 먼저 처리하고 싶을 때 사용. 설정은 위와 비슷하므로 상세한 설명은 생략. ===== Mailbox 정의 ===== package com.practice.ex01 import akka.actor.ActorSystem import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox} import com.typesafe.config.Config class MyPriorityMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox ( PriorityGenerator { // int 값이 작을수록 우선순위가 높다. case x: Int => 1 case x: String => 0 case _ => 3 } ) my-priority-mailbox { mailbox-type = "com.practice.ex01.MyPriorityMailbox" } ===== Actor 정의 ===== package com.practice.ex01 import akka.actor.Actor class MyPriorityActor extends Actor{ override def receive: Receive = { case x: Int => println(x) case x: String => println(x) case x => println(x) } } ===== Client ===== package com.practice.ex01 import akka.actor.{ActorSystem, Props} object Client extends App { val actorSystem = ActorSystem("HelloAkka") val myPriorityActor = actorSystem.actorOf(Props[MyPriorityActor] .withDispatcher("my-priority-mailbox")) Array('LowestPriority, 123, "Top Priority!", 1.23).foreach(myPriorityActor ! _) /* 결과적으로 String, int 순으로 우선순위가 높음. */ /* Top Priority! 123 1.23 'LowestPriority */ } ==== 때와 상관없이 항상 우선권을 갖는 메일박스 만들기 ==== ===== 항상 우선권을 가질 Message 정의 ===== package com.practice.ex01 import akka.dispatch.ControlMessage case object MyControlMessage extends ControlMessage ===== Mailbox 정의 ===== control-aware-mailbox { mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox" } ===== Actor 정의 ===== package com.practice.ex01 import akka.actor.Actor class Logger extends Actor{ override def receive: Receive = { case MyControlMessage => println("Control message first!") case x => println(x) } } ===== Client ===== package com.practice.ex01 import akka.actor.{ActorSystem, Props} object Client extends App { val actorSystem = ActorSystem("HelloAkka") val loggerActor = actorSystem.actorOf(Props[Logger] .withDispatcher("control-aware-mailbox")) loggerActor ! "Hello," loggerActor ! "world!" loggerActor ! MyControlMessage // 우선적으로 처리 /* Control message first! Hello, world! */ } ==== Runtime중에 Actor 행위 바꾸기 ==== become/unbecome으로 구현. ===== Message 정의 ===== package com.practice.ex01 object StateMessage { case class IntState() case class StringState() } ===== Actor 정의 ===== package com.practice.ex01 import akka.actor.Actor import com.practice.ex01.StateMessage.{IntState, StringState} class StateChangingActor extends Actor { override def receive: Receive = { case StringState => context.become(isStateString) case IntState => context.become(isStateInt) } def isStateString: Receive = { case msg : String => println(s"$msg") case IntState => context.become(isStateInt) } def isStateInt: Receive = { case msg : Int => println(s"$msg") case StringState => context.become(isStateString) } } ===== Client ===== package com.practice.ex01 import akka.actor.{ActorSystem, Props} import com.practice.ex01.StateMessage.{IntState, StringState} object Client extends App { val actorSystem = ActorSystem("HelloAkka") val stateChangingActor = actorSystem.actorOf(Props[StateChangingActor]) stateChangingActor ! StringState stateChangingActor ! "Hello, world!" stateChangingActor ! IntState stateChangingActor ! 123 stateChangingActor ! "abracadabra" // 현재 state가 IntState이기 때문에 무시됨. /* Hello, world! 123 */ } ==== Actor 중지시키기 ==== https://stackoverflow.com/questions/13847963/akka-kill-vs-stop-vs-poison-pill * PoisonPill : Mailbox에 대기중인 메시지를 모두 처리 정지 * Kill : ActorKilledException를 발생시킴. 이에 대한 구체적인 동작은 supervisor mechanism을 따른다. 기본값은 Actor를 중지. 그리고 Mailbox가 유지되므로 실패를 발생시킨 메시지를 제외한 나머지 메시지는 그대로 남는다. * context.stop(actorRef) : Actor를 순차적으로 정지시킬때 사용. Bottom-up(자식, 부모, 액터 시스템) 순서로 정지. 현재 처리중인 Message 완료후 다른 메시지는 모두 무시하고 정지시킨다. package com.practice.ex01 import akka.actor.Actor class ShutdownActor extends Actor { override def receive: Receive = { case msg: String => println(s"$msg") case Stop => context.stop(self) } } package com.practice.ex01 import akka.actor.{ActorSystem, Kill, PoisonPill, Props} object Client extends App { val actorSystem = ActorSystem("HelloAkka") val shutdownActor1 = actorSystem.actorOf(Props[ShutdownActor]) shutdownActor1 ! "hello1" shutdownActor1 ! PoisonPill shutdownActor1 ! "Are you breathing?" val shutdownActor2 = actorSystem.actorOf(Props[ShutdownActor]) shutdownActor2 ! "hello2" shutdownActor2 ! Kill shutdownActor2 ! "Are you breathing?" val shutdownActor3 = actorSystem.actorOf(Props[ShutdownActor]) shutdownActor3 ! "hello3" shutdownActor3 ! Stop shutdownActor3 ! "Are you breathing?" /* hello2 hello1 hello3 scala> [INFO] [08/06/2018 00:25:30.860] [HelloAkka-akka.actor.default-dispatcher-2] [akka://HelloAkka/user/$c] Message [java.lang.String] without sender to Actor[akka://HelloAkka/user/$c#303943664] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://HelloAkka/user/$c#303943664]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [08/06/2018 00:25:30.861] [HelloAkka-akka.actor.default-dispatcher-2] [akka://HelloAkka/user/$a] Message [java.lang.String] without sender to Actor[akka://HelloAkka/user/$a#-1903145472] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://HelloAkka/user/$a#-1903145472]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [ERROR] [08/06/2018 00:25:30.862] [HelloAkka-akka.actor.default-dispatcher-5] [akka://HelloAkka/user/$b] Kill (akka.actor.ActorKilledException: Kill) [INFO] [08/06/2018 00:25:30.863] [HelloAkka-akka.actor.default-dispatcher-2] [akka://HelloAkka/user/$b] Message [java.lang.String] without sender to Actor[akka://HelloAkka/user/$b#14529698] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://HelloAkka/user/$b#14529698]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. */ } == Fault tolerant system == 장애 발생시 정지되는 것이 아닌 처리량이 감소한 가동으로 항상 반응성을 유지하며, 완전 가동중과 비교하여 정책적으로 더 혹은 덜 가동되는 시스템. === System design === ==== 염두할 사항 ==== * 시스템을 컴포넌트로 나누기 : "어떤 기능에 대한 책임" 단위로 나눈다. 이 때, 연쇄적 장애 발생시 각 컴포넌트는 간섭이 없는 독립적인 형태로 서로 영향이 없어야 한다. 특히 Core 컴포넌트는 더더욱 장애 발생으로 부터 독립적이어야 한다. * Backup : 장애 발생에 대비. 고가용성. ==== 구축 방법 ==== * Duplication : 컴포넌트 여러개 실행 * Replication : 같은 컴포넌트를 여러개 준비, 모두에게 요청 후 송신하고, 그 중 하나 선택 * Isolation : Multi process. 다른 컴포넌트의 실패에 영향을 받지 않게 하려는 목적 * Delegation : 장애 발생시 정상 실행중인 컴포넌트에게 넘기려는 목적. === Actor Life cycle === https://doc.akka.io/docs/akka/2.5/actors.html#actor-lifecycle === Supervison and Monitoring === https://doc.akka.io/docs/akka/2.5/general/supervision.html ==== Strategy ==== * OneForOneStrategy : Default. 고장난 자식 Actor만 Restart/Resume 혹은 상위 Actor에게 보고. 이는 각 자식 Actor들이 각각 영향을 주지 않은, 독립적일 때 유용. * AllForOneStrategy : 한 Supervisor 액터 시스템 아래에 있는 모든 자식 Actor 전략 적용. 이는 특정 자식 Actor들이 종속적인 관계일 때 유용. ==== Monitoring ==== 특정 서비스가 살아있는 지 지속적으로 확인할 때 필요. // 감시 context.watch(childActor: ActorRef) // 감시 해제 context.unwatch(childActor: ActorRef) === 기본적인 예제 === 아래와 같은 Tree 구조로 운영한 뒤 Strategy, watch(감시, 모니터링)를 이용한다. ==== 부모 액터의 자식 액터 생성 ==== Master - Slave. 자식 액터의 장애를 부모가 처리하는 구조. ===== Message 정의 ===== package com.practice.ex01 object Message { case object CreateChild case class Greet(msg: String) } ===== 자식, 부모 Actor 정의 ===== package com.practice.ex01 import akka.actor.Actor import com.practice.ex01.Message.Greet class ChildActor extends Actor { override def receive: Receive = { case Greet(msg) => println(s"parent : ${self.path.parent} // me : ${self.path} // msg : ${msg}") } } package com.practice.ex01 import akka.actor.{Actor, Props} import com.practice.ex01.Message.{CreateChild, Greet} class ParentActor extends Actor{ override def receive: Receive = { case CreateChild => val child = context.actorOf(Props[ChildActor], "child") child ! Greet("Hello, child!") } } ===== Client ===== package com.practice.ex01 import akka.actor.{ActorSystem, Props} import com.practice.ex01.Message.CreateChild object Client extends App { val actorSystem = ActorSystem("Supervision") val parent = actorSystem.actorOf(Props[ParentActor], "parent") parent ! CreateChild // parent : akka://Supervision/user/parent // me : akka://Supervision/user/parent/child // msg : Hello, child! } == Routing == https://doc.akka.io/docs/akka/2.5/routing.html 언제 사용하는가? * 같은 형태의 액터들 중에서 가장 부하가 적은 액터에 메시지를 보내고 싶을 때 * 한 액터에 메시지를 Round-robin으로, 즉, Loop로 모든 액터에 메시지를 하나씩 보내고 싶을 때 * 그룹 내 모든 액터에 하나의 메시지를 보내고 싶을 때 * 액터 사이의 작업을 어떤 메커니즘으로 재분배하고 싶을 때 === Pool의 종류 === ==== SmallestMailboxPool ==== 메시지 수가 가장 적은 액터에 메시지 전달. 즉, 가장 덜 바쁜 액터에게 메시지 전달. class RoutingActor extends Actor { override def receive: Receive = { case msg: String => sender ! s"I am ${self.path.name}, I received $msg" case _ => println(s"I don't understand the message") } } package com.practice.ex01 import akka.actor.{ActorSystem, Props} import akka.routing.SmallestMailboxPool object Client extends App { val actorSystem = ActorSystem("Supervision"); val router = actorSystem.actorOf(SmallestMailboxPool(5).props(Props[SmallestMailboxActor])) 1 to 5 foreach(i => router ! s"Hello $i") /* I am $a I am $a I am $d I am $b I am $c */ } ==== BalancingPool ==== 액터 사이의 작업을 재분배. ====== BalancingPool vs SmallestMailboxPool ====== https://groups.google.com/forum/#!msg/akka-user/pymtrCZBduk/rwXKIdGUGvEJ https://doc.akka.io/docs/akka/2.5/routing.html#balancing-pool * BalancingPool은 각 라우팅되는 대상이 완전히 구분된 정체성을 가진 속성을 가지지 않는다. 대상들은 다름 이름들을 가지지만, 대부분 그 대상들은 결국 올바른 액터들에게 말하지 않는다. * 그렇기 때문에 BalacingPool은 Broadcast Messages에 사용하면 안된다. * SmallestMailboxPool은 BancingPool과 다른 이점으로 메시지 큐 구현에 제한을 두지 않는다. * SmallestMailboxPool이 더 낫다. 가장 덜 바쁜 액터에게 보내지는 것을 항상 알기 때문. ==== RoundRobinPool ==== 각 액터에게 하나씩 메시지 전달. ==== RandomPool ==== 말그대로 무작위. ==== BroadcastPool ==== **하나**의 같은 메시지를 모든 액터에게 전달. 모든 액터에게 같은 작업을 하도록 일반적인 명령을 보내고 싶을 때 사용. ==== Specially Handled Messages ==== https://doc.akka.io/docs/akka/2.5/routing.html#specially-handled-messages * Broadcast * PoisonPill * Kill * GetRoutees, Addroutee, RemoveRoutee, AdjustPoolSize 액터를 관리하는 데 사용. ==== ScatterGatherFirstCompletedPool ==== 같은 라우터를 공유하는 액터들 중에게 같은 메시지를 모두 보내고(Broadcast), 가장 먼저 작업을 마친 액터를 **기다려 응답을 송신**(ask) 후 나머지 액터 응답을 버림. 여러 서버 중 가장 빠르게 반응하는 서버에 작업을 보내는 상황에서 사용. package com.practice.ex01 import akka.actor.Actor class RoutingActor extends Actor { override def receive: Receive = { case msg: String => sender ! s"I am ${self.path.name}, I received $msg" case _ => println(s"I don't understand the message") } } package com.practice.ex01 import akka.actor.{ActorSystem, Props} import akka.pattern.ask import akka.routing.ScatterGatherFirstCompletedPool import akka.util.Timeout import scala.concurrent.duration._ import scala.concurrent.{Await, Future} object Client extends App { val duration = 10 seconds implicit val timeout = Timeout(duration) val actorSystem = ActorSystem("Hello-Akka"); val router = actorSystem .actorOf(ScatterGatherFirstCompletedPool(5, within = duration) .props(Props[RoutingActor])) val future: Future[Any] = router ? "hello" val response: String = Await.result(future.mapTo[String], duration) println(response) // I am $a, I received hello } ==== TailChoppingPool ==== 무작위로 고른 액터에게 메시지를 보내고, interval에서 지정한 약간의 지연 후 남은 액터들 중에 무작위로 고른 액터에게 송신하는 일을 계속한다. 첫 번째 응답이 수신되기를 기다리고, 이를 본래 송신자에게 전달, 나머지 응답은 버림. package com.practice.ex01 import akka.actor.Actor class RoutingActor extends Actor { override def receive: Receive = { case msg: String => sender ! s"I am ${self.path.name}, I received $msg" case _ => println(s"I don't understand the message") } } package com.practice.ex01 import akka.actor.{ActorSystem, Props} import akka.pattern.ask import akka.routing.TailChoppingPool import akka.util.Timeout import scala.concurrent.duration._ import scala.concurrent.{Await, Future} object Client extends App { val duration = 10 seconds implicit val timeout = Timeout(duration) val actorSystem = ActorSystem("Hello-Akka"); val router = actorSystem .actorOf(TailChoppingPool(5, within = duration, interval = 20 millis) .props(Props[RoutingActor])) val future: Future[Any] = router ? "hello!" val response: String = Await.result(future.mapTo[String], duration) println(response) } ==== ConsistentHashingPool ==== 말그대로 Hashing. [[https://en.wikipedia.org/wiki/Consistent_hashing|Consistent Hashing]] 사용. 같은 키를 가지는 메시지를 항상 같은 액터에게 전달. package com.practice.ex01 import akka.routing.ConsistentHashingRouter.ConsistentHashable object Message { final case class Create(key: String, value: String) final case class Get(key: String) extends ConsistentHashable { override def consistentHashKey: Any = key } final case class Remove(key: String) } package com.practice.ex01 import akka.actor.Actor import com.practice.ex01.Message._ class Cache extends Actor { var cache = Map.empty[String, String] override def receive: Receive = { case Create(key, value) => println(s"${self.path.name} : Create $key/$value") cache += (key -> value) case Get(key) => println(s"${self.path.name} : Get $key") context.sender ! cache.get(key) case Remove(key) => println(s"${self.path.name} : Remove $key") cache -= key } } package com.practice.ex01 import akka.actor.{ActorRef, ActorSystem, Props} import akka.pattern.ask import akka.routing.ConsistentHashingPool import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashableEnvelope} import akka.util.Timeout import com.practice.ex01.Message._ import scala.concurrent.Await import scala.concurrent.duration._ object Client extends App { def hashMapping: ConsistentHashMapping = { case Remove(key) ⇒ key } val actorSystem = ActorSystem("Hello-Akka") val cache: ActorRef = actorSystem.actorOf( ConsistentHashingPool(10, hashMapping = hashMapping). props(Props[Cache]), name = "cache") // 생성 cache ! ConsistentHashableEnvelope(message = Create("hello", "HELLO"), hashKey = "hello") cache ! ConsistentHashableEnvelope(message = Create("hi", "HI"), hashKey = "hi") // Get 응답 받기, 삭제등 시험 val duration = 10 seconds implicit val timeout = Timeout(duration) val future1 = cache ? Get("hello") val response1 = Await.result(future1.mapTo[Any], duration) println(response1) val future2 = cache ? Get("hi") val response2 = Await.result(future2.mapTo[Any], duration) println(response2) cache ! Remove("hi") val future3 = cache ? Get("hi") val response3 = Await.result(future3.mapTo[Any], duration) println(response3) } $d : Create hello/HELLO $e : Create hi/HI $d : Get hello Some(HELLO) $e : Get hi Some(HI) $e : Remove hi $e : Get hi None ==== Dynamically Resizable Pool ==== https://doc.akka.io/docs/akka/2.5/routing.html#specially-handled-messages https://doc.akka.io/docs/akka/2.1.2/scala/routing.html package com.practice.ex01 import akka.actor.{Actor, ActorSystem, Props} import akka.pattern.ask import akka.routing.{DefaultResizer, ScatterGatherFirstCompletedPool} import akka.util.Timeout import scala.annotation.tailrec import scala.concurrent.duration._ import scala.concurrent.{Await, Future} // Message case class FibonacciNumber(nbr: Int) // Actor class FibonacciActor extends Actor { def receive = { case FibonacciNumber(nbr) => sender ! fibonacci(nbr) case _ => new IllegalArgumentException } private def fibonacci(n: Int): Int = { @tailrec def fib(n: Int, b: Int, a: Int): Int = n match { case 0 => a case _ => fib(n - 1, a + b, b) } fib(n, 1, 0) } } // Client object Client extends App { val duration = 10 seconds implicit val timeout = Timeout(duration) val akkaSystem = ActorSystem("Hello-Akka") val resizer = DefaultResizer(lowerBound = 4, upperBound = 15) val router = akkaSystem.actorOf( ScatterGatherFirstCompletedPool(5, within = duration, resizer = Some(resizer)) .props(Props[FibonacciActor])) val future: Future[Any] = router ? FibonacciNumber(20) val response: Int = Await.result(future.mapTo[Int], duration) println(response) //6765 } = Futures and Agents = * Future : **Concurrency** 혹은 **Pararell** 실행을 위해 다른 Thread에서 작업할 때 사용. Callback 및 다른 작업을 실행하기 위해 Thread Pool이 지원하는 ExcutionContext를 필요. * 단, Actor를 이용할 경우 ActorSystem 자체에서 제공하므로 명시적으로 필요하지 않다.