k-d-Bäume mit Apache Spark und Scala

Es ist für Firmen nützlich zu wissen, wie sich die Kunden verteilen: ob sie weit verteilt sind oder ob sie sich alle in der gleichen Region aufhalten. Man kann dann z. B. die Kunden informieren, welche anderen Kunden in der Nähe sind, wie bei sozialen Netzwerken, oder man kann als Firma die Standorte der Filialen optimieren. Hierzu werden geometrische Datenstrukturen benötigt, die orthogonale Bereichsabfragen erlauben, wie z. B. k-d-Bäume.

Im Stanford Network Analysis Project (SNAP) habe ich geeignete Testdaten gefunden. Die Testdaten von Gowalla mit dem Namen loc-gowalla_totalCheckins.txt.gz haben das folgende Format:

[user]  [check-in time]       [latitude]    [longitude]    [location id]
196514  2010-07-24T13:45:06Z  53.3648119    -2.2723465833  145064
196514  2010-07-24T13:44:58Z  53.360511233  -2.276369017   1275991
196514  2010-07-24T13:44:46Z  53.3653895945 -2.2754087046  376497
196514  2010-07-24T13:44:38Z  53.3663709833 -2.2700764333  98503

Interessant für mich hier waren die Längen- und die Breitengrade. Man könnte diese Daten natürlich auch künstlich generieren, aber das wäre nicht so "echt" und die Ergebnisse auch nicht so interessant.

Das Ziel der Anwendung war, für jeden Benutzer zu ermitteln, wie viele andere Benutzer in seiner Nachbarschaft sind, z. B. in einem 5km^2 großen Bereich. Die Verarbeitung soll auf einem Cluster laufen können, damit der Prozess später skaliert. Die Daten wurden daher auch in HDFS gespeichert. Ich habe die Anwendung bisher auf einem virtuellen Cluster mit 3 Knoten / Spark-Workern getestet.

Weitere Anforderungen:

  1. k-d-Bäume in Scala
  2. Verarbeitung der Daten eingebettet in Apache Spark, aber "naiv", d.h. nicht optmiert.
  3. Auswertung mit R

Der vollständige Code ist bei GitHub verfügbar.

k-d-Bäume in Scala

Ein k-d-Baum ist eine Verallgemeinerung des eindimensionalen Suchbaums. Die Erklärungen bei Wikipedia sind gut, also erspare ich mir diese hier.

Ein KdTree bietet die Möglichkeit einer rangQuery, einer Bereichsabfrage.

sealed trait KdTree[+T] {
    def rangeQuery(range: Range): List[(Point2, T)]
}

Die drei induktiven Fälle können einfach als case-Klassen implementiert werden.

class Nil[T] extends KdTree[T] with Serializable {
    override def rangeQuery(range: Range): List[(Point2, T)]
        = List()
}
object Nil extends Nil[Nothing]

case class Leaf[T](val p: Point2, val value: T) extends KdTree[T] with Serializable {
    override def rangeQuery(range: Range): List[(Point2, T)]
        = if (range.inRange(p)) List((p, value)) else List()

case class Node[T](val dimension: Int,
    val median: Double, val ls: KdTree[T] = Nil, val es: KdTree[T] = Nil, val hs: KdTree[T] = Nil)
    extends KdTree[T] with Serializable {

    override def rangeQuery(r: Range): List[(Point2, T)] = {
        r.compareIth(dimension, median) match {
        case (-1,  _) => hs.rangeQuery(r)
        case ( 0,  _) => es.rangeQuery(r) ++ hs.rangeQuery(r)
        case ( 1, -1) => ls.rangeQuery(r) ++ es.rangeQuery(r) ++ hs.rangeQuery(r)
        case ( 1,  0) => ls.rangeQuery(r) ++ es.rangeQuery(r)
        case ( 1,  1) => ls.rangeQuery(r)
        }
    }
}

Das lässt sich mit Scala klar und deutlich ähnlich wie bei Haskell definieren. In meiner Diplomarbeit wird die Bereichssuche und die Erstellung des kd-Baums auch genauer beschrieben.

Verarbeitung mit Apache Spark

Von den eingelesenen Daten in fileSortedByUser werden nur die zum Zeitpunkt dt gültigen genommen. Dieses geschieht mit der Funktion filterToLatest. Dann gibt es zwei Stränge: einmal wird der k-d-Baum kdt erstellt und später wird für jeden Kunden eine Bereichsanfrage mit kdt.rangeQuery(rect) durchgeführt. In diesem Beispiel wird nur die Anzahl der Knoten im spezifizierten Bereich zurückgegeben. Wichtig ist hier anzumerken, dass es sich um den ersten Prototypen handelt, es wurden z. B. keine Spark-Optimierung wegen der Anzahl der Partitionen gemacht. Das kommt in einer späteren Iteration/Sprint.

val rdd: RDD[CheckIn] = filterToLatest(fileSortedByUser, dt)    // get all the latest checkins for dt
rdd.persist()

// build the KD tree
val ps: RDD[(Point2, CustomerId)]            = rdd.map { c => ( Point2(c.locX, c.locY), c.id ) }
val ps2: RDD[(Point2, Iterable[CustomerId])] = ps.groupByKey()
val ps3: RDD[(Point2, List[CustomerId])]     = ps2.mapValues { p => p.toList }
val ps4                                      = ps3.collect()
val kdt: KdTree[List[CustomerId]]            = KdTree.fromList(ps4)

// query for each customer in rdd
val ns : RDD[(CheckIn,Seq[(Point2, List[CustomerId])])] = rdd.map { c =>
    val loc                                  = Point2(c.locX, c.locY)
    val rect                                 = Haversine.neighborhood(loc, windowSizeInKm)
    val ps: Seq[(Point2, List[CustomerId])]  = kdt.rangeQuery(rect)
    (c, ps.filter { x => x._1 != loc })      // ignore the point at loc, this is the current row
}
// reduce to compact output format: (CustId, #Neighbours)
val ns3 = ns.map { x => (x._1.id, x._2.size )}
utils.write(dest, mkCSV("CustomerId", "number of neighbors", ns3.collect()))

Anschließend werden die Daten in einer CSV-Datei gespeichert. Anmerkung: Die Typinformationen sind nicht notwendig, ich finde sie beim Lesen von RDD-Transformationen aber praktisch.

Auswertung mit R

Mit Spark lassen sich auch schnell Summationen/Aggregationen/Reduktionen berechnen. Diese können in R einfach mit ggplot2 dargestellt werden.

Der Graph links zeigt die Anzahl von Check-Ins pro Tag und den gleitende Durchschnitt über 7-Tage (simple moving average). Der Graph in der Mitte stellt die Check-Ins pro Monat dar und der Graph Rechts die Check-Ins nach Uhrzeit.

Für das erste Diagramm funktioniert das beispielsweise folgendermaßen (die theme-Attribute sind vereinfacht dargestellt).

# read the csv file
a <- read.csv(file="sums_ymd.csv", header=TRUE, sep=";", colClasses=c("character", "numeric")))
a$yyyymmdd <- as.Date(a$yyyymmdd, format="%Y%m%d")
a$smoothed <- filter(a$value, rep(1/7, 7), sides=2)               # smooth a 7 day time window

# create a chart with an area and two lines
ggplot(a, aes(x=yyyymmdd, y=value)) +
    geom_area(fill=blue, alpha=.3) +
    geom_line(color=blue) +
    geom_line(aes(y=smoothed), color=red) +
    theme(
        panel.background=element_rect(fill=mk_color(blue, 0.1)),
        legend.position=c(0.1, 0.7)) +
    ggtitle("Number of check-ins per day in the loc-gowalla dataset") +
        xlab("Date") + ylab("Number of check-ins") +
    theme(
        axis.title.x=element_text(size=12, lineheight=.9, colour=red),
        axis.text.x=element_text(size=10, color=blue),
        axis.title.y=element_text(size=12, lineheight=.9, colour=red),
        axis.text.y=element_text(size=10, color=blue),
        plot.title=element_text(size=10, color=red)
)

Mit Hilfe des k-d-Baumes aber können auch Abfragen gestellt werden, die nicht trivial sind. Das folgende Histogramm zeigt z. B. die Anzahl der Nachbarn in einem 5km-Bereich für alle User. Mit Hilfe solcher Informationen können Dienste z. B. verbessert werden. Man könnte den Nutzern z. B. Kontaktanfragen senden oder zusätzliche Filialen eröffnen, etc.

Histogramm

Mit ggmap lassen sich auch Karten von Google oder OpenStreetMap verwenden. In folgender Karten bzw. Diagramm sind die "Hotspots" in Rot eingezeichnet: in der Nähe von Houston und San Francisco.

Diagram: X

Weitere Arbeiten

Das war der erste Prototyp, ein Proof-of-Concept. Die folgenden weiteren Arbeiten sind angedacht:

Der vollständige Code ist bei GitHub verfügbar.