技術をかじる猫

適当に気になった技術や言語、思ったこと考えた事など。

Ubuntu 13.10 に Riak を入れてScalaから操作してみる

ぶっちゃけ公式読みながらサンプル書いて、ソース読んで、必要以上に難しそうに書いてる箇所を簡略化しただけ。

Riak

ほとんど純粋なKVSで、認証機構がないので、ほしければ自力でセキュリティを設定する必要がある。

まぁ IP Tables でいいんじゃないかと思う。

downloads に deb パッケージがあるので悩むことがない。ただし 64bit の dist しかないので、32bit でやるならビルドしかなさそう。

もっとも、今さら開発者で 32bit 環境はないと思うが、、、、

Ubuntu13.10 向けのビルドは現状存在していない(2013/11/21)ので、12 向けをインストール。途中で「パッケージ壊れてそう…」みたいな事を言われるが無視☆

sudo service riak start

でさっくり起動。

/etc/riak/app.config この辺を読むとポート 8098 で開いてるらしい。

開くと、リンクのリストが出るがその先はブラウザでは見れないらしい。どうもリクエストは HTTP だが、帰ってくるレスポンスは独自形式か何からしい。

JavaInterface 経由で弄ってみる

http://docs.basho.com/riak/latest/dev/taste-of-riak/java/

(当たり前のようにSBT入れてね)

build.sbt をざっくり

scalaVersion := "2.10.2"

libraryDependencies += "com.basho.riak" % "riak-client" % "1.4.2"

まぁScalaVersionはおまけ。JavaJarならScalaVersionには依存するまいよ。

object RiakTest extends App {
  import com.basho.riak.client.builders.RiakObjectBuilder
  import com.basho.riak.client.raw.http.HTTPClientAdapter
  val client = new HTTPClientAdapter("http://localhost:8098/riak")

  // save
  val obj    = RiakObjectBuilder.newBuilder("sampleBucket", "testKeyLowJava")
    .withContentType("test/plain")
    .withValue("TestJavaClientValue")
    .build()
  client.store(obj)

  // get
  val response = client.fetch("sampleBucket", "testKeyLowJava")
  println("Written : " + response.getRiakObjects()(0).getValueAsString)

  client.shutdown()
}

次は多分HighLevelClient

import com.basho.riak.client.RiakFactory

object RiakTest extends App {
  val client = RiakFactory.pbcClient()

  // store
  val bucket = client.fetchBucket("sampleBucket").execute()

  bucket.store("testKeyHighJava2", 128).execute()

  // get
  val result = bucket.fetch("testKeyHighJava2", Integer.TYPE).execute()
  println(s"Write : $result")

  client.shutdown()
}

バイナリシリアライズだけきちんとできればこの方が楽はできる。

というかさすがJava、ダサい

ちなみに POJO なオブジェクトなら、シリアライズは不要だそうな。

しかしScalaからPOJOとかアホの所業。

Scala

Scala は Riak Scala Client - Home を使おうと思う。

単純に公式の通りだと、resolver の設定が甘い(多分環境に設定してるんじゃないかと予想)ので、やたら落ちる。

scalaVersion := "2.10.2"

resolvers ++= Seq(
  "Sonatype OSS" at "https://oss.sonatype.org/content/repositories/releases",
  "spray" at "http://repo.spray.io/",
  "scct-github-repository" at "http://mtkopone.github.com/scct/maven-repo"
)

libraryDependencies += "com.scalapenos" %% "riak-scala-client" % "0.8.1.1"

エラーの出るライブラリを探して、そのリポジトリを見つけて、resolver に登録。

sbt のここで躓く人多いよね(私もここで躓いた)

まずは低レベルな設定取得

import scala.concurrent._
import scala.concurrent.duration._
import com.scalapenos.riak._

object RiakTest extends App {
  val client = RiakClient("localhost", 8098)
  val bucket = client.bucket("scalaBucket")

  // store value
  implicit def integerSerializer = new RiakSerializer[Int] {
    def serialize(s: Int): (String, ContentType) = (s.toString, ContentTypes.`text/plain`)
  }
  
  // RiakSerializer をきちんと作れば直接値を設定できる
  val writeFuture = bucket.store("SampleScalaValue1", 8192)
  
  // await for write finish
  Await.result(writeFuture, 10 seconds)

  val fetchFuture = bucket.fetch("SampleScalaValue1")
  val result = Await.result(fetchFuture, 10 seconds).getOrElse(throw new Exception).data

  println(s"GetResult: $result")
}

Serializer がデフォルトで String しか存在しないのは注意。デシリアライザも定義していないので、今回は文字列として取得している。

定義は

@implicitNotFound(msg = "Cannot find RiakDeserializer type class for ${T}")
trait RiakDeserializer[T] {
  /**
   * Deserializes from some raw data and a ContentType to a type T.
   * @throws RiakDeserializationException if the content could not be converted to an instance of T.
   */
  def deserialize(data: String, contentType: ContentType): T
}

後、自身で ActorSystem を作成して設定しない場合、終了する手段がなくなるので注意。

次は case class をマップする

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
import com.scalapenos.riak._
import spray.json.DefaultJsonProtocol._

case class User(name:String, age:Int)
object User {
  implicit val jsonFormat = jsonFormat2(User.apply)
}

case class Group(name:String, users:List[User])
object Group {
  implicit val jsonFormat = jsonFormat2(Group.apply)
}

object RiakTest extends App {
  val client = RiakClient("localhost", 8098)
  val bucket = client.bucket("scalaBucketUser")

  val group = Group("TestGroup", List(
    User("Azalea", 30),
    User("Sorikkc", 28)
  ))

  // store value
  val storeFuture = bucket.store(group.name, group)
  Await.result(storeFuture, 10 seconds)

  // fetch values
  val fetchFuture = bucket.fetch("TestGroup").map(_.map(_.as[Group]))
  val result = Await.result(fetchFuture, 10 seconds)

  println(s"Hoge : $result")
}

つーかあれ?実行速度遅くないか?(;´・ω・)

てーか何だこの有様。Scala用ライブラリ使った方が行数長いぞ(-_-;)

Case class マップはだいぶ短いが、純粋KVで扱うには使い勝手がよろしくない。