Akka について勉強する。
終了まで随時更新
教科書はこれ
http://www.slideshare.net/scalaconfjp/scaling-software-with-akka
概要から読んでいく。
重要そうな箇所だけまずはメモ
Program at higher level
- 共有するステータス、ステータスの可視性、スレッド、ロック、同期、スレッドの通知など等を考えない
- 低レベルの並列動作*1はシンプルなワークフローに落ちる。プログラマはシステムのメッセージフローだけ考えればいい。
- 高レベルなCOUユーティリティ、低レイテンシ、高スループットとスケーラビリティを簡単に得られる
- エラーを検出し、回復するための実証済みの優れたモデルの提供
Distributable by Design
- Actor はロケーション透過性と配布可能なデザインをしている
- スケールアップ・ダウンが容易
- クラウドにおける完璧な構造
これらをAkkaが実現すると。
What is an Actor?
- コード構成を含むAkkaユニットをActorという
- Actorでは、並列的、スケーラブルでフォールトトレラントなアプリケーションを作成するのに役立ちます
- Java EE servlets と session beans のように、ビジネスロジックから様々な決定すべきポリシーを切り離す
- Actor は多くのJavaコミュニティにとって新しいものかもしれない、だが、電気通信システムと9 nines *2によって、長年にわたって実証されてきたコンセプト(Hewitt 1973)です
What can I use Actors for?(何に使うの?)
- a thread
- an object instance or component
- a callback or listener
- a singleton or service
- a router, load-balancer or pool
- a Java EE Session Bean or Message-Driven Bean
- an out-of-process service
- a Finite State Machine (FSM)
4 core Actor operations
- DEFINE
- CREATE
- SEND
- BECOME
- SUPERVISE
昨日寝ちゃったので続き
CREATE
説明いらんよね。
import akka.actor.{ActorLogging, Props, Actor, ActorSystem} import org.slf4j.LoggerFactory case class Greeting(who: String) class GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) => log.info("Hello " + who) } } object Sample extends App { val logger = LoggerFactory.getLogger("timeLogging") val system = ActorSystem("SampleSystem") val greeter = system.actorOf(Props[GreetingActor], name = "greeter") }
system.actorOf を使ってActorRef を得るが、階層構造を作る場合、Actor 内で context.actorOf で Actor を拾えばよい。
その階層構造はディレクトリ型の名前解決でアクセスできる。
SEND
Actor にメッセージを通知する。非同期かつブロックせずに動作可能だ。
greeter ! Greeting("White azalea")
ROUTERS
てかルータって何なのか説明がないのでどういうものなのか?
http://doc.akka.io/docs/akka/snapshot/java/routing.html
ルータは、メッセージを受信して、効率的にrouteesとして知られている他のアクターにルーティングするアクターである。
異なるルーティング戦略をアプリケーションのニーズに応じて使用することができる。
作ることもできるお。デフォだとこんなんあるぜよ
akka.routing.RoundRobinRouter
akka.routing.RandomRouter
akka.routing.SmallestMailboxRouter
akka.routing.BroadcastRouter
akka.routing.ScatterGatherFirstCompletedRouter
akka.routing.ConsistentHashingRouter
とな。まぁ名前の通りで安心しました。
val greeter = system.actorOf( Props[GreetingActor].withRouter(RoundRobinRouter(nrOfInstances = 5)), name = "greeter")
Router+Resizer
Resizer とは何ぞや?→さっきのURL
動的にサイズ変更可能なルータ
Dynamically Resizable Routers
rutouters はこ定数の routees やリサイズ戦略、routees数の調整を行うことができます。
All routers can be used with a fixed number of routees or with a resize strategy to adjust the number of routees dynamically.
val system = ActorSystem("SampleSystem") val resizer = DefaultResizer(lowerBound=2, upperBound = 15) val greeter = system.actorOf( Props[GreetingActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), name = "greeter")
もしくは設定ファイルからいける。
これはやったはず。
akka.actor.deployment { /greeter { router = round-robin resizer { lower-bound = 8 upper-bound = 15 } } }
実行してみる
[info] Compiling 1 Scala source to C:\common\projects\AkkaSample\target\scala-2.10\classes... [info] Running Sample [INFO] [04/07/2013 18:30:32.334] [SampleSystem-akka.actor.default-dispatcher-5] [akka://SampleSystem/user/greeter/$a] Hello alpha [INFO] [04/07/2013 18:30:32.353] [SampleSystem-akka.actor.default-dispatcher-1] [akka://SampleSystem/user/greeter/$b] Hello bravo [INFO] [04/07/2013 18:30:32.354] [SampleSystem-akka.actor.default-dispatcher-1] [akka://SampleSystem/user/greeter/$b] Hello delta [INFO] [04/07/2013 18:30:32.354] [SampleSystem-akka.actor.default-dispatcher-5] [akka://SampleSystem/user/greeter/$a] Hello cherie [INFO] [04/07/2013 18:30:32.355] [SampleSystem-akka.actor.default-dispatcher-1] [akka://SampleSystem/user/greeter/$b] Hello foxtrot [INFO] [04/07/2013 18:30:32.355] [SampleSystem-akka.actor.default-dispatcher-5] [akka://SampleSystem/user/greeter/$a] Hello echo [INFO] [04/07/2013 18:30:32.356] [SampleSystem-akka.actor.default-dispatcher-5] [akka://SampleSystem/user/greeter/$a] Hello golf
BECOME
- BECOME - 動的なアクターの再定義
- メッセージ受信による応答のトリガー
- In a type system analogy it is as if the object changed type - changed interface, protocol & implementation*3
- それらがメッセージに対して異なる反応を示します
- これらは動作がスタックされ、これはプッシュ、ポップすることができる。
クラッシュ耐性
Akka に関していえば Java/C/C# やそれ以外の手続き+オブジェクト指向な防御的プログラミングなんてしない。
てーかそのスレッド落ちたら終わりとかそういう話になってしまう。
クラッシュさせよーぜ!
SUPERVISE
- SUPERVISE はほかのアクターの落ちを管理する
- エラーハンドリングはこのスーパバイザーに一任される
- これが何を意味するかというと、Actorに何か問題が発生した場合、そのSupervisorに通知される
- これでエラーハンドリングをきれいに分離できるわけだ
SUPERVISE Actor
ひとつひとつのアクターは、デフォルトのスーパーバイザ戦略を持っています。これは通常は十分です。しかし、それは上書きすることができます。
import akka.actor.SupervisorStrategy._ override def supervisorStrategy = AllForOneStrategy(maxNrOfRetries = 10, withinTimeRange = Duration(1, "minute")) { case _: NullPointerException => Restart case _:Exception => Escalate }
同様に、Actor の preRestart, postRestart もあるでな。
リモートにデプロイすることもできますねん。
akka.actor{ provider = akka.remote.RemoteActorRefProvider deployment { /greeter { remote=akka://RemoteSystem@hostName:2552 } } }
デプロイはこれだけ。
使うほうは
val sample = system.actorFor("akka://SampleSystem@hostName:2552/path/to/actor")
クラスタリングも設定でいける
akka{ actor{ provider = "akka.cluster.ClusterActorRefProvider" deployment { /greeter { router = consistent-hashing nr-of-instances = 180 cluster { enabled=on max-nr-of-instances-per-node=3 allow-local-routers=on } } } } cluster { seed-nodes = { "akka://ClusterSystem1@host:2552", "akka://ClusterSystem2@host:2552", "akka://ClusterSystem3@host:2552" } auto-down = on } }