AKKA
동시성 Actor 모델의 현대적 구현체
Scala 기본 문법을 안다는 전제하에 실용적인 측면으로 기술.
기초
Reactive System
Resilient회복력 : 오류/예외 같은 장애가 발생하더라도 항상 응답하여 회복
Elastic탄력성 : Traffic 크기에 상관없이 할당된 자원을 조절해 부하를 서비스할 수 있게 항상 응답성을 유지
Message Driven메시지 기반 : 시스템의 구성 요소가 서로 느슨하게 결합되어 있으며 비동기 메시지 전달을 사용해 통신하고, 행동을 취함으로써 메시지에 반응.
Responsive응답성 : 위 세 속성을 만족하는 시스템을 “응답성이 있는 시스템”이라 한다.
Actor
연산
모든 연산은 비동기적!!
create (생성)
send (송신)
become (상태변화)
supervise (감독)
속성
State : Actor는 내부 상태가 있고, 이 상태는 Message를 처리함에 따라 순차적으로 변한다. (mutable)
Behavior : Actor는 송신된 Message에 행위를 적용함으로써 반응한다.
Communication : Actor는 다른 Actor에게 Message를 송신하거나 수신함으로써 서로 통신한다.
Mailbox : Mailbox는 Actor가 메시지를 가져오고 처리하는 Message Queue다.
기본 Actor System Project 생성
환경 설정
Command Line
Scala의 Ant/Maven/Gradle와 비교될 수 있음. 직접으로 프로젝트 생성, 배포 관여.
Create templates
Fat Jar
https://github.com/sbt/sbt-assembly
# Build (Fat Jar 생성)
## build.sbt에
## addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.6")
## 추가 필요
### 생성된 jar 위치 : target/scala-[version]/*.jar
sbt assembly
Scala Binaries (선택 사항)
# 실행
scala
scala> 1
res0: Int = 1
# 종료
scala> :quit
## 혹은
scala> :q
IDE
- Intellij에 기본적으로 SBT 내장. Intellij만 설치하면 된다.
REPL 실행 방법
왼쪽 Project Tree에서 아무 Class에 오른쪽으로 누르고 “Run Scala Console”.
Project 생성
아래 과정은 console에서 수행할 경우 필요한 작업. Intellij로 sbt 프로젝트 생성하면 더 쉽게 만들 수 있다.
Project를 생성할 Directory 생성
생성한 Directory로 이동해서 “sbt” 실행
sbt 콘솔에서 아래 과정을 수행하면 “build.sbt” Build 설정 파일과 Class 파일이 놓일 “target” Directory가 생성된다.
set name := "hello-akka"
set version := "0.1"
set scalaVersion := "2.12.6"
session save
exit
build.sbt를 열고 아래 내용을 추가한다.
// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14"
src/main/scala/ 밑에 프로젝트 이름의 패키지와 클래스를 작성.
package com.practice.ex01
import akka.actor.ActorSystem
object HelloAkkaActorSystem extends App{
val actorSystem = ActorSystem("HelloAkka")
println(actorSystem)
// akka://HelloAkka
}
console에서
sbt "runMain com.practice.ex01.HelloAkkaActorSystem"
기본적인 Ask-Tell 예제
scala에서 ask는 “?”, tell은 “!” method를 사용한다.
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Actor
object HelloAkkaActorSystem extends App {
val actorSystem = ActorSystem("HelloAkka")
val actor = actorSystem.actorOf(Props(classOf[SumActor]), "sumActor")
// ask를 사용하려면 timeout을 반드시 명시해야 함.
implicit val timeout = Timeout(10 seconds)
val future = actor ? (1 to 10).toArray
val result = Await.result(future, 10 seconds)
println(s"result : $result")
}
class SumActor extends Actor {
override def receive: Receive = {
case arr: Array[Int] => sender ! arr.sum
case _ => sender ! "Error!"
}
}
Actor간 통신 (tell을 이용한 결과 수신)
ask는 기본적으로 blocking. 그러므로 비슷하게 tell을 이용하여 서로 주고 받는 callback 형식으로 구현하므로 더 경제적.
QueryActor에서 Start로 연산 요청. 연산할 Actor인 SumActor와 값을 전달 받음.
SumActor에서 전달 받은 값을 연산하고 sender(QueryActor)의 Complete로 전달
QueryActor의 Complete에서 연산 결과 수신.
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import com.practice.ex01.Message.Start
object HelloAkkaActorSystem extends App {
val actorSystem = ActorSystem("HelloAkka")
val queryActor = actorSystem.actorOf(Props(classOf[QueryActor]), "queryActor")
val sumActor = actorSystem.actorOf(Props(classOf[SumActor]), "sumActor")
queryActor ! Start(sumActor, (1 to 10).toArray)
}
package com.practice.ex01
import akka.actor.ActorRef
object Message {
case class Start(actorRef: ActorRef, arr: Array[Int])
case class Progress(arr: Array[Int])
case class Complete(result: Int)
}
package com.practice.ex01
import akka.actor.Actor
import com.practice.ex01.Message._
class QueryActor extends Actor{
override def receive: Receive = {
case Start(actorRef, arr) =>
println("Start!")
actorRef ! Progress(arr)
case Complete(result) =>
println(s"Complete! $result")
case _ => println("QueryActor - WTHIGO?")
}
}
package com.practice.ex01
import akka.actor.Actor
import com.practice.ex01.Message._
class SumActor extends Actor {
override def receive: Receive = {
case Progress(arr) =>
println("Progress!")
sender ! Complete(arr.sum)
case _ => sender ! "SumActor - WTHIGO?"
}
}
사용자 정의 Mailbox 만들기
Actor가 메시지를 가져오는 방법을 제어할 때 사용.
Mailbox 정의
package com.practice.ex01
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{MailboxType, MessageQueue, ProducesMessageQueue}
import com.typesafe.config.Config
class MyCustomMailbox extends MailboxType with ProducesMessageQueue[MyMessageQueue] {
def this(setting: ActorSystem.Settings, config: Config) = this()
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
println("MyCustomMailbox is created!")
new MyMessageQueue()
}
}
package com.practice.ex01
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorRef
import akka.dispatch.{Envelope, MessageQueue}
class MyMessageQueue extends MessageQueue{
private final val queue = new ConcurrentLinkedQueue[Envelope]()
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
println("enqueue!")
if(handle.sender.path.name == "MyActor") {
handle.sender ! "I know you!"
queue.offer(handle)
}
else
handle.sender ! "Who are you?"
}
override def dequeue(): Envelope = queue.poll
override def numberOfMessages: Int = queue.size
override def hasMessages: Boolean = !queue.isEmpty
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
}
custom-dispatcher {
mailbox-requirement = "com.practice.ex01.MyMessageQueue"
}
akka.actor.mailbox.requirements {
"com.practice.ex01.MyMessageQueue" = custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "com.practice.ex01.MyCustomMailbox"
}
Actor 정의
package com.practice.ex01
import akka.actor.Actor
class MySpecialActor extends Actor {
override def receive: Receive = {
case msg: String => println(s"MySpecialActor's msg : $msg")
}
}
package com.practice.ex01
import akka.actor.{Actor, ActorRef}
class MyActor extends Actor {
override def receive: Receive = {
case (msg: String, actorRef: ActorRef) => actorRef ! msg
case msg => println(s"MyActor's msg? $msg")
}
}
Client
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
object Client extends App {
val actorSystem = ActorSystem("HelloAkka")
// application.conf에 기술된 custom-dispatcher을 읽고
// 사용자 정의 Mailbox(MyCustomMailBox)가 먼저 생성되고,
// 그 안에 create() method로 message queue가 생성된다.
val mySpecialActor = actorSystem.actorOf(Props[MySpecialActor]
.withDispatcher("custom-dispatcher"))
val unknownActor = actorSystem.actorOf(Props[MyActor], "abracadabra")
val knownActor = actorSystem.actorOf(Props[MyActor], "MyActor")
unknownActor ! ("hello", mySpecialActor)
knownActor ! ("hello", mySpecialActor)
// 비동기적이므로 아래 결과는 바뀔 수 있음.
/*
MyCustomMailbox is created!
enqueue!
enqueue!
MyActor's msg? Who are you?
MyActor's msg? I know you!
MySpecialActor's msg : hello
*/
}
수신 메시지의 우선순위 정의
우선순위에 따라 특정 메시지를 먼저 처리하고 싶을 때 사용. 설정은 위와 비슷하므로 상세한 설명은 생략.
Mailbox 정의
package com.practice.ex01
import akka.actor.ActorSystem
import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox}
import com.typesafe.config.Config
class MyPriorityMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedPriorityMailbox (
PriorityGenerator {
// int 값이 작을수록 우선순위가 높다.
case x: Int => 1
case x: String => 0
case _ => 3
}
)
my-priority-mailbox {
mailbox-type = "com.practice.ex01.MyPriorityMailbox"
}
Actor 정의
package com.practice.ex01
import akka.actor.Actor
class MyPriorityActor extends Actor{
override def receive: Receive = {
case x: Int => println(x)
case x: String => println(x)
case x => println(x)
}
}
Client
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
object Client extends App {
val actorSystem = ActorSystem("HelloAkka")
val myPriorityActor = actorSystem.actorOf(Props[MyPriorityActor]
.withDispatcher("my-priority-mailbox"))
Array('LowestPriority, 123, "Top Priority!", 1.23).foreach(myPriorityActor ! _)
/* 결과적으로 String, int 순으로 우선순위가 높음. */
/*
Top Priority!
123
1.23
'LowestPriority
*/
}
때와 상관없이 항상 우선권을 갖는 메일박스 만들기
항상 우선권을 가질 Message 정의
package com.practice.ex01
import akka.dispatch.ControlMessage
case object MyControlMessage extends ControlMessage
Mailbox 정의
control-aware-mailbox {
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
}
Actor 정의
package com.practice.ex01
import akka.actor.Actor
class Logger extends Actor{
override def receive: Receive = {
case MyControlMessage => println("Control message first!")
case x => println(x)
}
}
Client
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
object Client extends App {
val actorSystem = ActorSystem("HelloAkka")
val loggerActor = actorSystem.actorOf(Props[Logger]
.withDispatcher("control-aware-mailbox"))
loggerActor ! "Hello,"
loggerActor ! "world!"
loggerActor ! MyControlMessage // 우선적으로 처리
/*
Control message first!
Hello,
world!
*/
}
Runtime중에 Actor 행위 바꾸기
Message 정의
package com.practice.ex01
object StateMessage {
case class IntState()
case class StringState()
}
Actor 정의
package com.practice.ex01
import akka.actor.Actor
import com.practice.ex01.StateMessage.{IntState, StringState}
class StateChangingActor extends Actor {
override def receive: Receive = {
case StringState => context.become(isStateString)
case IntState => context.become(isStateInt)
}
def isStateString: Receive = {
case msg : String => println(s"$msg")
case IntState => context.become(isStateInt)
}
def isStateInt: Receive = {
case msg : Int => println(s"$msg")
case StringState => context.become(isStateString)
}
}
Client
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import com.practice.ex01.StateMessage.{IntState, StringState}
object Client extends App {
val actorSystem = ActorSystem("HelloAkka")
val stateChangingActor = actorSystem.actorOf(Props[StateChangingActor])
stateChangingActor ! StringState
stateChangingActor ! "Hello, world!"
stateChangingActor ! IntState
stateChangingActor ! 123
stateChangingActor ! "abracadabra" // 현재 state가 IntState이기 때문에 무시됨.
/*
Hello, world!
123
*/
}
Actor 중지시키기
https://stackoverflow.com/questions/13847963/akka-kill-vs-stop-vs-poison-pill
PoisonPill : Mailbox에 대기중인 메시지를 모두 처리 정지
Kill : ActorKilledException를 발생시킴. 이에 대한 구체적인 동작은 supervisor mechanism을 따른다. 기본값은 Actor를 중지. 그리고 Mailbox가 유지되므로 실패를 발생시킨 메시지를 제외한 나머지 메시지는 그대로 남는다.
context.stop(actorRef) : Actor를 순차적으로 정지시킬때 사용. Bottom-up(자식, 부모, 액터 시스템) 순서로 정지. 현재 처리중인 Message 완료후 다른 메시지는 모두 무시하고 정지시킨다.
package com.practice.ex01
import akka.actor.Actor
class ShutdownActor extends Actor {
override def receive: Receive = {
case msg: String => println(s"$msg")
case Stop => context.stop(self)
}
}
package com.practice.ex01
import akka.actor.{ActorSystem, Kill, PoisonPill, Props}
object Client extends App {
val actorSystem = ActorSystem("HelloAkka")
val shutdownActor1 = actorSystem.actorOf(Props[ShutdownActor])
shutdownActor1 ! "hello1"
shutdownActor1 ! PoisonPill
shutdownActor1 ! "Are you breathing?"
val shutdownActor2 = actorSystem.actorOf(Props[ShutdownActor])
shutdownActor2 ! "hello2"
shutdownActor2 ! Kill
shutdownActor2 ! "Are you breathing?"
val shutdownActor3 = actorSystem.actorOf(Props[ShutdownActor])
shutdownActor3 ! "hello3"
shutdownActor3 ! Stop
shutdownActor3 ! "Are you breathing?"
/*
hello2
hello1
hello3
scala> [INFO] [08/06/2018 00:25:30.860] [HelloAkka-akka.actor.default-dispatcher-2] [akka://HelloAkka/user/$c] Message [java.lang.String] without sender to Actor[akka://HelloAkka/user/$c#303943664] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://HelloAkka/user/$c#303943664]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [08/06/2018 00:25:30.861] [HelloAkka-akka.actor.default-dispatcher-2] [akka://HelloAkka/user/$a] Message [java.lang.String] without sender to Actor[akka://HelloAkka/user/$a#-1903145472] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://HelloAkka/user/$a#-1903145472]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[ERROR] [08/06/2018 00:25:30.862] [HelloAkka-akka.actor.default-dispatcher-5] [akka://HelloAkka/user/$b] Kill (akka.actor.ActorKilledException: Kill)
[INFO] [08/06/2018 00:25:30.863] [HelloAkka-akka.actor.default-dispatcher-2] [akka://HelloAkka/user/$b] Message [java.lang.String] without sender to Actor[akka://HelloAkka/user/$b#14529698] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://HelloAkka/user/$b#14529698]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
*/
}
Fault tolerant system
장애 발생시 정지되는 것이 아닌 처리량이 감소한 가동으로 항상 반응성을 유지하며, 완전 가동중과 비교하여 정책적으로 더 혹은 덜 가동되는 시스템.
System design
염두할 사항
시스템을 컴포넌트로 나누기 : “어떤 기능에 대한 책임” 단위로 나눈다. 이 때, 연쇄적 장애 발생시 각 컴포넌트는 간섭이 없는 독립적인 형태로 서로 영향이 없어야 한다. 특히 Core 컴포넌트는 더더욱 장애 발생으로 부터 독립적이어야 한다.
Backup : 장애 발생에 대비. 고가용성.
구축 방법
Duplication : 컴포넌트 여러개 실행
Replication : 같은 컴포넌트를 여러개 준비, 모두에게 요청 후 송신하고, 그 중 하나 선택
Isolation : Multi process. 다른 컴포넌트의 실패에 영향을 받지 않게 하려는 목적
Delegation : 장애 발생시 정상 실행중인 컴포넌트에게 넘기려는 목적.
Actor Life cycle
Supervison and Monitoring
Strategy
OneForOneStrategy : Default. 고장난 자식 Actor만 Restart/Resume 혹은 상위 Actor에게 보고. 이는 각 자식 Actor들이 각각 영향을 주지 않은, 독립적일 때 유용.
AllForOneStrategy : 한 Supervisor 액터 시스템 아래에 있는 모든 자식 Actor 전략 적용. 이는 특정 자식 Actor들이 종속적인 관계일 때 유용.
Monitoring
특정 서비스가 살아있는 지 지속적으로 확인할 때 필요.
// 감시
context.watch(childActor: ActorRef)
// 감시 해제
context.unwatch(childActor: ActorRef)
기본적인 예제
아래와 같은 Tree 구조로 운영한 뒤 Strategy, watch(감시, 모니터링)를 이용한다.
부모 액터의 자식 액터 생성
Master - Slave. 자식 액터의 장애를 부모가 처리하는 구조.
Message 정의
package com.practice.ex01
object Message {
case object CreateChild
case class Greet(msg: String)
}
자식, 부모 Actor 정의
package com.practice.ex01
import akka.actor.Actor
import com.practice.ex01.Message.Greet
class ChildActor extends Actor {
override def receive: Receive = {
case Greet(msg) => println(s"parent : ${self.path.parent} // me : ${self.path} // msg : ${msg}")
}
}
package com.practice.ex01
import akka.actor.{Actor, Props}
import com.practice.ex01.Message.{CreateChild, Greet}
class ParentActor extends Actor{
override def receive: Receive = {
case CreateChild =>
val child = context.actorOf(Props[ChildActor], "child")
child ! Greet("Hello, child!")
}
}
Client
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import com.practice.ex01.Message.CreateChild
object Client extends App {
val actorSystem = ActorSystem("Supervision")
val parent = actorSystem.actorOf(Props[ParentActor], "parent")
parent ! CreateChild
// parent : akka://Supervision/user/parent // me : akka://Supervision/user/parent/child // msg : Hello, child!
}
Routing
Pool의 종류
SmallestMailboxPool
메시지 수가 가장 적은 액터에 메시지 전달. 즉, 가장 덜 바쁜 액터에게 메시지 전달.
class RoutingActor extends Actor {
override def receive: Receive = {
case msg: String => sender ! s"I am ${self.path.name}, I received $msg"
case _ => println(s"I don't understand the message")
}
}
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import akka.routing.SmallestMailboxPool
object Client extends App {
val actorSystem = ActorSystem("Supervision");
val router = actorSystem.actorOf(SmallestMailboxPool(5).props(Props[SmallestMailboxActor]))
1 to 5 foreach(i => router ! s"Hello $i")
/*
I am $a
I am $a
I am $d
I am $b
I am $c
*/
}
BalancingPool
BalancingPool vs SmallestMailboxPool
RoundRobinPool
RandomPool
BroadcastPool
하나의 같은 메시지를 모든 액터에게 전달. 모든 액터에게 같은 작업을 하도록 일반적인 명령을 보내고 싶을 때 사용.
Specially Handled Messages
ScatterGatherFirstCompletedPool
같은 라우터를 공유하는 액터들 중에게 같은 메시지를 모두 보내고(Broadcast),
가장 먼저 작업을 마친 액터를 기다려 응답을 송신(ask) 후 나머지 액터 응답을 버림. 여러 서버 중 가장 빠르게 반응하는 서버에 작업을 보내는 상황에서 사용.
package com.practice.ex01
import akka.actor.Actor
class RoutingActor extends Actor {
override def receive: Receive = {
case msg: String => sender ! s"I am ${self.path.name}, I received $msg"
case _ => println(s"I don't understand the message")
}
}
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.routing.ScatterGatherFirstCompletedPool
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object Client extends App {
val duration = 10 seconds
implicit val timeout = Timeout(duration)
val actorSystem = ActorSystem("Hello-Akka");
val router = actorSystem
.actorOf(ScatterGatherFirstCompletedPool(5, within = duration)
.props(Props[RoutingActor]))
val future: Future[Any] = router ? "hello"
val response: String = Await.result(future.mapTo[String], duration)
println(response)
// I am $a, I received hello
}
TailChoppingPool
무작위로 고른 액터에게 메시지를 보내고, interval에서 지정한 약간의 지연 후 남은 액터들 중에 무작위로 고른 액터에게 송신하는 일을 계속한다. 첫 번째 응답이 수신되기를 기다리고, 이를 본래 송신자에게 전달, 나머지 응답은 버림.
package com.practice.ex01
import akka.actor.Actor
class RoutingActor extends Actor {
override def receive: Receive = {
case msg: String => sender ! s"I am ${self.path.name}, I received $msg"
case _ => println(s"I don't understand the message")
}
}
package com.practice.ex01
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.routing.TailChoppingPool
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object Client extends App {
val duration = 10 seconds
implicit val timeout = Timeout(duration)
val actorSystem = ActorSystem("Hello-Akka");
val router = actorSystem
.actorOf(TailChoppingPool(5, within = duration, interval = 20 millis)
.props(Props[RoutingActor]))
val future: Future[Any] = router ? "hello!"
val response: String = Await.result(future.mapTo[String], duration)
println(response)
}
ConsistentHashingPool
말그대로 Hashing.
Consistent Hashing 사용.
같은 키를 가지는 메시지를 항상 같은 액터에게 전달.
package com.practice.ex01
import akka.routing.ConsistentHashingRouter.ConsistentHashable
object Message {
final case class Create(key: String, value: String)
final case class Get(key: String) extends ConsistentHashable {
override def consistentHashKey: Any = key
}
final case class Remove(key: String)
}
package com.practice.ex01
import akka.actor.Actor
import com.practice.ex01.Message._
class Cache extends Actor {
var cache = Map.empty[String, String]
override def receive: Receive = {
case Create(key, value) =>
println(s"${self.path.name} : Create $key/$value")
cache += (key -> value)
case Get(key) =>
println(s"${self.path.name} : Get $key")
context.sender ! cache.get(key)
case Remove(key) =>
println(s"${self.path.name} : Remove $key")
cache -= key
}
}
package com.practice.ex01
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashableEnvelope}
import akka.util.Timeout
import com.practice.ex01.Message._
import scala.concurrent.Await
import scala.concurrent.duration._
object Client extends App {
def hashMapping: ConsistentHashMapping = {
case Remove(key) ⇒ key
}
val actorSystem = ActorSystem("Hello-Akka")
val cache: ActorRef =
actorSystem.actorOf(
ConsistentHashingPool(10, hashMapping = hashMapping).
props(Props[Cache]), name = "cache")
// 생성
cache ! ConsistentHashableEnvelope(message = Create("hello", "HELLO"), hashKey = "hello")
cache ! ConsistentHashableEnvelope(message = Create("hi", "HI"), hashKey = "hi")
// Get 응답 받기, 삭제등 시험
val duration = 10 seconds
implicit val timeout = Timeout(duration)
val future1 = cache ? Get("hello")
val response1 = Await.result(future1.mapTo[Any], duration)
println(response1)
val future2 = cache ? Get("hi")
val response2 = Await.result(future2.mapTo[Any], duration)
println(response2)
cache ! Remove("hi")
val future3 = cache ? Get("hi")
val response3 = Await.result(future3.mapTo[Any], duration)
println(response3)
}
$d : Create hello/HELLO
$e : Create hi/HI
$d : Get hello
Some(HELLO)
$e : Get hi
Some(HI)
$e : Remove hi
$e : Get hi
None
Dynamically Resizable Pool
https://doc.akka.io/docs/akka/2.5/routing.html#specially-handled-messages
https://doc.akka.io/docs/akka/2.1.2/scala/routing.html
package com.practice.ex01
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.{DefaultResizer, ScatterGatherFirstCompletedPool}
import akka.util.Timeout
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
// Message
case class FibonacciNumber(nbr: Int)
// Actor
class FibonacciActor extends Actor {
def receive = {
case FibonacciNumber(nbr) => sender ! fibonacci(nbr)
case _ => new IllegalArgumentException
}
private def fibonacci(n: Int): Int = {
@tailrec
def fib(n: Int, b: Int, a: Int): Int = n match {
case 0 => a
case _ => fib(n - 1, a + b, b)
}
fib(n, 1, 0)
}
}
// Client
object Client extends App {
val duration = 10 seconds
implicit val timeout = Timeout(duration)
val akkaSystem = ActorSystem("Hello-Akka")
val resizer = DefaultResizer(lowerBound = 4, upperBound = 15)
val router = akkaSystem.actorOf(
ScatterGatherFirstCompletedPool(5, within = duration, resizer = Some(resizer))
.props(Props[FibonacciActor]))
val future: Future[Any] = router ? FibonacciNumber(20)
val response: Int = Await.result(future.mapTo[Int], duration)
println(response)
//6765
}
Futures and Agents