読者です 読者をやめる 読者になる 読者になる

謎言語使いの徒然

適当に気になった技術や言語を流すブログ。

続、Akka勉強する

scala 勉強

随時更新

Akka について勉強する。 - 謎言語使いの徒然 の続き

Akka2.2-SNAPSHOT で大分 Cluster に動きがあった。流石試験実装、、、、安定しない。 設定ファイルの中身を換えればまだ動きはするが、この際だから最初から考え直す。*1

まずサンプルを求めてアクセスしたものの、https://github.com/akka/akka は 10:55 時点でメンテ中(?)らしい。 仕方ないので http://doc.akka.io/docs/akka/2.2-M3/scala/cluster-usage.html の写経から理解を目指す。

sbt 設定を:

libraryDependencies ++= Seq(
  "org.scala-lang" % "scala-library" % "2.10.0" % "provided",
  "org.scala-lang" % "scala-compiler" % "2.10.0",
  "com.typesafe.akka" % "akka-actor_2.10" % "2.2-M3",
  "com.typesafe.akka" %% "akka-cluster-experimental" % "2.2-M3",
  "ch.qos.logback" % "logback-classic" % "1.0.9"
)

application.conf に:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"
    ]
    auto-down = on
  }
}

で、コードが:

package sample.cluster.simple

import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class SimpleClusterListener extends Actor with ActorLogging {
  def receive = {
    case state: CurrentClusterState ⇒
      log.info("Current members: {}", state.members)
    case MemberUp(member) ⇒
      log.info("Member is Up: {}", member)
    case UnreachableMember(member) ⇒
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member) ⇒
      log.info("Member is Removed: {}", member)
    case _: ClusterDomainEvent ⇒ // ignore
  }
}

object SimpleClusterApp extends App {
  if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))

  val system = ActorSystem("ClusterSystem")
  val ref    = system.actorOf(Props[SimpleClusterListener], name = "clusterListener")
  Cluster(system).subscribe(ref, classOf[ClusterDomainEvent])
}

SimpleClusterListener で受け取ってるものはすべてデフォルトで用意されている型でした。 デフォルトのイベント体系なのかもしれない。クラス名からおおよそ何のイベントかは把握できそう。

起動コマンドが公式で:

run-main sample.cluster.simple.SimpleClusterApp 2551

試してみると:

[info] Compiling 1 Scala source to C:\common\projects\Akka\RemoteCluster\target\scala-2.10\classes...
[info] Running sample.cluster.simple.SimpleClusterApp 2552
[INFO] [05/12/2013 11:13:54.796] [run-main] [Remoting] Starting remoting
[INFO] [05/12/2013 11:13:55.549] [run-main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [05/12/2013 11:13:55.644] [run-main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - is starting up...
[INFO] [05/12/2013 11:13:55.647] [run-main] [Cluster(akka://ClusterSystem)] Using a dedicated scheduler for cluster. Default scheduler can be used if configured with 'akka.scheduler.tick-duration' [100 ms] <=  'akka.cluster.scheduler.tick-duration' [33 ms].
[INFO] [05/12/2013 11:13:55.865] [run-main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/12/2013 11:13:55.866] [run-main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - has started up successfully
[INFO] [05/12/2013 11:13:55.928] [ClusterSystem-akka.actor.default-dispatcher-7] [MetricsCollector(akka://ClusterSystem)] Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [05/12/2013 11:13:55.950] [ClusterSystem-akka.actor.default-dispatcher-7]     [akka://ClusterSystem/system/cluster/metrics] Metrics collection has started successfully on node     [akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [05/12/2013 11:13:55.996] [ClusterSystem-akka.actor.default-dispatcher-7] [akka://ClusterSystem/user/clusterListener] Current members: TreeSet()

それっぽい。 また、2番目のノードを起動すると、上記っぽいのに加え

join:

> run-main sample.cluster.simple.SimpleClusterApp 2551

中略

[akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [05/12/2013 11:16:24.889] [ClusterSystem-akka.actor.default-dispatcher-8] [akka://ClusterSystem/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] from [Joining] to [Up]
[INFO] [05/12/2013 11:16:24.900] [ClusterSystem-akka.actor.default-dispatcher-19] [akka://ClusterSystem/user/clusterListener] Member is Up: {Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)
[INFO] [05/12/2013 11:16:29.267] [ClusterSystem-akka.actor.default-dispatcher-19] [akka://ClusterSystem/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [05/12/2013 11:16:29.840] [ClusterSystem-akka.actor.default-dispatcher-7] [akka://ClusterSystem/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] from [Joining] to [Up]
[INFO] [05/12/2013 11:16:29.844] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/user/clusterListener] Member is Up: {Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)

ノードに参加したことが分かる。

*1:といってもいつまで利用できるかは保障できない