技術をかじる猫

適当に気になった技術や言語、思ったこと考えた事など。

昨日の続き

昨日 お勧め映画検索のデータ準備 - 謎言語使いの徒然 の続きで、実際に人の組み合わせをやってみたわけ、、、、だがっ!

総勢 6040 人。組み合わせ数 18,237,780 ユーザIDを Long として保存したので 4 バイト。72,951,120 MB のデータ容量。

まぁこの位ははいんだろーとタカ括ったら、大量に OutOfMemory しまくって最後に落ちた。

いや待て、何にメモリ食ってるんだよよ、、、72MB なんぞ普通にメモリ確保してんだろーとか思う(加えてユーザ間で相関とったらデータは削除してるし、身に覚えがない)

ということで、ログを吐きまくってみたところ、人によっては 400 件とかアホなレビューをしていた。なんてこったい、、、、そりゃOutOfMemoryも出るわ!

メモリ 8 G に CPU(3.9GHz) 4 つ与えてVM再実行。加えて、JVMも fork 設定して走らせたらまともに動いた。ってーか sbt はデフォルトで 1G しか設定されてないのか、、、、そりゃヒープで落ちるわ!

fork in run := true

javaOptions in run += "-Xmx6G"

sbt run してみた。

前回の case class を引き継ぎつつ、コードは

package net.white_azalea

import net.white_azalea.models.{Rating, User, DB, Movie}
import com.novus.salat._
import com.novus.salat.global._
import com.mongodb.casbah.Imports._
import akka.actor.{Props, ActorSystem, ActorLogging, Actor}
import akka.routing.RoundRobinRouter

object Util {
  def findCommonItem(left:Map[Long, Int], right:Map[Long, Int]):Iterable[(Int, Int)] = {
    left.filter(kv => {
      val (key, _) = kv
      right.contains(key)
    }).map(kv => {
      val (key, value) = kv
      (value, right(key))
    })
  }

  def correlationCoefficient(left:Map[Long, Int], right:Map[Long, Int]) = {
    val commonValues = findCommonItem(left, right).toList
    if (commonValues.isEmpty) {
      0.0
    } else {
      // 全ての嗜好を合計
      val leftSum  = commonValues.map(_._1).sum.toDouble
      val rightSum = commonValues.map(_._2).sum.toDouble

      // 平方和
      val leftSqSum  = commonValues.map(v => Math.pow(v._1, 2.0)).sum
      val rightSqSum = commonValues.map(v => Math.pow(v._2, 2.0)).sum

      // 積
      val pSum = commonValues.map(lr => lr._1 * lr._2).sum

      // 統計量の計算
      val n   = commonValues.length.toDouble
      def num = pSum.toDouble - (leftSum * rightSum / n)
      def den = Math.sqrt((leftSqSum - Math.pow(leftSum, 2) / n) * (rightSqSum - Math.pow(rightSum, 2) / n))

      if (den == 0.0) {
        0.0
      } else {
        num / den
      }
    }
  }
}

class ExecActor extends Actor with ActorLogging {
  val mongo = DB.get
  def ratings = DB.getCollection("ratings")

  def setUserCorrelation(lid:Long, rid:Long, result:Double) {
    if (result != 0.0) {
      val status = mongo("user_relate").insert(MongoDBObject(
        "lid" -> lid, "rid" -> rid, "correlation" -> result
      ))
      log.info(status.toString)
    }
  }

  def getRatings(id: Long) = ratings
    .find(MongoDBObject("uid" -> id)).map(v => grater[Rating].asObject(v)).map(v => v.mid -> v.rating)

  def exec(lid:Long, rid:Long) {
    val left   = getRatings(lid).toMap
    val right  = getRatings(rid).toMap

    if (left.isEmpty || right.isEmpty) {
      log.info(s"[EMPTY] Left($lid) is ${left.isEmpty}, Right($rid) is ${right.isEmpty}")
    } else {
      def result = Util.correlationCoefficient(left, right)
      log.info(s"[FINISH] $lid, $rid = $result")
      setUserCorrelation(lid, rid, result)
    }
  }

  def receive = {
    case (lid:Long, rid:Long) => {
      exec(lid, rid)
    }
    case v => log.info(s"Other message received ${v.toString}")
  }
}

object Runner extends App {
  val system = ActorSystem()
  val actorRef = system.actorOf(Props[ExecActor].withRouter(new RoundRobinRouter(8)))

  val MAX_USER_COUNT = DB.get("users").count()
  (1L to (MAX_USER_COUNT - 1L)).foreach(lid => {
    (lid to MAX_USER_COUNT).foreach(rid => {
      actorRef ! (lid, rid)
    })
  })
}

どうせ読み込みと計算で1CPUを2スレッドでスケジューリングされないかなーと期待しつつ走らせた。メモリは 3G 超えないくらいを行ったり来たり。

しかし、このユーザ間の総当たりはひどいw

一応これでも4GHzCPU4コアに、メモリ8G投入して、JVMでは 6GB 8スレッドで実行し始めたが、終わる気配がない。

10分超えても一人目が終わらないとかなかなかに果てしないですね。

結果、一人目と、その他全員相関を取ると、約18分待たされる事になった。これは酷い。

目的は「おすすめの映画を選ぶ」ということだ。つまり、「似ている人を探して、そのお勧めを得る」という方法とは別に、「お気に入りの映画に似た映画を探せばよい」とも取れる。

何でこんなことをいうかというと、なんせ映画のレコード数は 3000 行台なのだ。この時点で計算回数が一気に減る。

ということで先ほどパラメータを、検索対象を mid (映画ID)にして相関に使うIDを uid (ユーザID)で置き換えて実行。

勿論、計算に用いるサンプル数は大きく変わるが、1回の相関計算にかかる時間はそう大差ないことが分かった。

というかこちらの方がメモリ消費が少ない。およそ1.5GBを切ってる。結果的には、最初の映画が他の映画すべてと比較を終えるのに、約 13 分程度で済んでいる。

統計でも言えることだけど、こういうものの効率とかは発想の世界っぽい。