2014年1月24日金曜日

3台構成のAkka Clusterを簡単に試してみる。

3台くらいのLinuxサーバで、Akka Clusterをちらっと試してみるメモです。

akka (http://akka.io/) は、分散システムを作るためのツールキット(ライブラリと小さいプログラム)です。分散は、ロバストとかスケールとかのために。かけ声は、Let it crash (http://letitcrash.com/)。(1台のサーバでちょっとためす例は、こっちを。)

akka 2.2.3 + Java 7(OpenJDK) + SBT(scala 2.10)を使います。
OSは、ここでは、CentOS6かUbuntu12.04ですが、Javaさえ動けば、わりとなんでもよいはずです。

3台(4台)のクラスタを動かします。サンプルのプログラムは、モンテカルロ法での円周率の計算です。重めになるかなと思ってBigIntegerにしてみましたが、πの精度は目指していません。計算をスタートさせるノード(10.0.0.4)から、クラスタに適当にたくさんの計算ジョブ(ノード数より多いジョブ)をばらまきます。ばらまきはrouterに任せます。各ノードは、計算が出来上がったら、最初のノードに結果を返します。最初のノードは、返された結果を集計して円周率を表示します。返される結果が増えると、次第に円周率っぽくなっていく、という感じ。(絵は4台ですが、10.0.0.3は省略してもかまいません。)


0. 準備

0-1. Linuxサーバーを3つ以上用意します。

ほぼ最小インストールのLinuxを準備します。必要なのは、ネットワークと作業用のssh serverくらいです。
(ファイアウォールはtcpのポート2551を許可、SELinuxはOFF)

クラスタの構成は、3台または4台で、サーバーIP:利用ポートは、以下です。
・10.0.0.1:2551を、seedノードとして。
・10.0.0.2:2551を、seedノードとして。
・10.0.0.3:2551を、普通のノードとして。
・10.0.0.4:2551を、計算スタートをさせるノードとして。4台が面倒なら、10.0.0.3と10.0.0.4を兼用で、合計3台でも。

あとは、普通のノードは、お好みで、10.0.0.5, ...と、いくつでも。

seedは、AkkaのClusterの中の特別なノードですが、プログラムからみるとseedかどうかはあまり関係ないはずです。とりあえずは、きっかけのために2個ぐらいあればいい、程度かと。

0-2. すべてのサーバーにJava 7 (OpenJDK) を入れます。

CentOS6では、
yum -y install java-1.7.0-openjdk-devel

Ubuntu12.04では、
apt-get -y install openjdk-7-jdk

ここでは全サーバにJDKを入れていますが、開発機以外はJREだけでいいのかも。

0-3. プログラム開発用サーバー1台だけに、SBT(scala)を入れます。

SBTのパッッケージをダウンロードします。
パッケージは、ここ(http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html)の、 RPM package(CentOS)やDEB package(Ubuntu)のリンクから。
(RPM: http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm)
(DEB: http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.deb)

CentOS6では、sbt.rpmをインストール
rpm -ivh sbt.rpm
Ubuntu12.04では、sbt.debをインストール
dpkg -i sbt.deb

そして、CentOSでもUbuntuでも、確認と初期設定をかねて、
sbt version
とし、実行結果は例えば、
...(省略: 一度目だけ時間がかかります)...
[info] 0.1-SNAPSHOT 

1. 開発機上で、プログラムを作成して、配布用にjarパッケージ化

1-1. プログラムやSBTプロジェクト設定ファイルを作成

SBTのしきたりにあわせて、プロジェクトのディレクトリを作り、4つのファイルを作成します。
ここでは、~/work内で作業します。

ディレクトリ作成
mkdir ~/work
cd ~/work
mkdir project
mkdir -p src/main/scala
mkdir -p src/main/resources

ファイル作成(4つ)
・build.sbt (SBTプロジェクトの設定。空白もこのままじゃないと駄目だそうです。)
import AssemblyKeys._

assemblySettings

name := "HeavyPiClusterProject"

version := "1.0"
     
scalaVersion := "2.10.3"
     
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
     
libraryDependencies +=
    "com.typesafe.akka" %% "akka-actor" % "2.2.3"

libraryDependencies +=
    "com.typesafe.akka" %% "akka-cluster" % "2.2.3"

・project/assembly.sbt (SBTのjarパッケージプラグインの設定)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")

・src/main/resources/application.conf (akkaの設定、主にCluster関連)
akka {
    actor {
        provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
        log-remote-lifecycle-events = off
        netty.tcp {
            hostname = "127.0.0.1"
            port = 0
        }
    }
     
    cluster {
        seed-nodes = [
            "akka.tcp://HeavyPiCluster@10.0.0.1:2551",
            "akka.tcp://HeavyPiCluster@10.0.0.2:2551"]
     
        auto-down = on
    }
}

・src/main/scala/HeavyPiCluster.scala (akkaのプログラム)
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope

import java.math.BigInteger
import java.util.Random
 
object HeavyPiCluster {
    def main(args: Array[String]): Unit = {
        if (args.nonEmpty){
            System.setProperty("akka.remote.netty.tcp.hostname", args(0))
            System.setProperty("akka.remote.netty.tcp.port", args(1))
        }
 
        val system = ActorSystem("HeavyPiCluster")
        val clusterListener = system.actorOf(Props[HeavyPiListener],
          name = "clusterListener")
 
        Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])

        if(args.length > 2){
            Thread.sleep(3000)

            val router = system.actorOf(Props[HeavyPi].withRouter(
                ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
                totalInstances = 100, maxInstancesPerNode = 3,
                allowLocalRoutees = false, useRole = None))),
                name = "clusterrouter")

            (1 to 100).foreach{ i =>
                router.tell(ConsistentHashableEnvelope(HeavyPi.Request(1000), i), clusterListener)
            }
        }
    }
}

class HeavyPiListener extends Actor with ActorLogging {
    var totalreq = 0
    var totaliter = 0
    var totalp = 0
    def receive = {
        case state: CurrentClusterState =>
            log.info("Current members: {}", state.members.mkString(", "))
        case MemberUp(member) =>
            log.info("Member is Up: {}", member.address)
        case UnreachableMember(member) =>
            log.info("Member detected as unreachable: {}", member)
        case MemberRemoved(member, previousStatus) =>
            log.info("Member is Removed: {} after {}",
            member.address, previousStatus)
        case HeavyPi.Response(i, p) =>
            totalreq += 1
            totaliter += i
            totalp += p
            val pi = 4D * totalp / totaliter
            log.info("Actor Response {}, pi={}", totaliter, pi)
        case _: ClusterDomainEvent => // ignore
    }
}

object HeavyPi {
    case class Request(iter: Integer)
    case class Response(iter: Integer, p: Integer)
}
     
class HeavyPi extends Actor with akka.actor.ActorLogging {
    val b = 256
    val m = new BigInteger("2").pow(b).subtract(new BigInteger("1"))
    val m2 = m.pow(2)

    def receive = {
        case HeavyPi.Request(iter) =>
            var p = 0

            (1 to iter).foreach{ i =>
                val rnd = new Random()
                val x = new BigInteger(b, rnd)
                val y = new BigInteger(b, rnd)
                val r2 = x.multiply(x).add(y.multiply(y))
                if(r2.compareTo(m2) <= 0) p += 1
            }
            sender ! HeavyPi.Response(iter, p)
    }
}

1-2. プログラムをコンパイルして、jarパッケージを作成し、各サーバーに配布

コンパイルしてjarファイルを作成。
sbt assembly
結果例
...(長いので省略)...
[info] SHA-1: f53f2a9fc0c122bfcea1620de9a2fdfb892403ca
[info] Packaging /home/test/work/target/scala-2.10/HeavyPiClusterProject-assembly-1.0.jar ...
[info] Done packaging.
[success] Total time: 30 s, completed 2014/01/01 20:12:34

できあがりは、HeavyPiClusterProject-assembly-1.0.jarです。

そして、jarパッケージファイルを各サーバーにコピーします。一応、コマンド例は、
scp target/scala-2.10/HeavyPiClusterProject-assembly-1.0.jar 10.0.0.2:/tmp/

2. 実行

一台目(seedノード)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.1 2551

二台目(seedノード)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.2 2551

三台目(普通のノード。同様に、いくつでも増やせます。0台でも。...4台目サーバーが無い人は、ここを省略。)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.3 2551

このあたりで、各コンソールの表示を見ると、以下の感じの行が見えるはずです。
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-4] [akka://HeavyPiCluster/user/clusterListener] Member is Up: akka.tcp://HeavyPiCluster@10.0.0.1:2551
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-14] [akka://HeavyPiCluster/user/clusterListener] Member is Up: akka.tcp://HeavyPiCluster@10.0.0.2:2551
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-12] [akka://HeavyPiCluster/user/clusterListener] Member is Up: akka.tcp://HeavyPiCluster@10.0.0.3:2551
この場合、この時点では、3台構成のAkkaのクラスターが動作していることになります。

四台目のノードで、π計算を開始。(routerを作って、各ノードに計算要求のmessageを投入。)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.4 2551 pi

ここで、少しすると、各ノードから結果がかえってきて、以下のようになります。
 ...(省略)...
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-2] [akka://HeavyPiCluster/user/clusterListener] Actor Response 98000, pi=3.1386938775510203
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-2] [akka://HeavyPiCluster/user/clusterListener] Actor Response 99000, pi=3.1387878787878787
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-16] [akka://HeavyPiCluster/user/clusterListener] Actor Response 100000, pi=3.13968
--
以上

0 件のコメント:

コメントを投稿