mardi 14 mai 2013

Calcul distribué avec Spark

Spark est un outil de calcul distribué, permettant de lancer de gros calculs sur un cluster.

D'après ses développeurs, Spark est jusqu'à 100 fois plus rapide qu'Hadoop pour effectuer le même genre de taches.
Spark repose sur HDFS (Hadoop Distributed File System) mais les calculs se font en mémoire pour accélérer les traitements. Le but de cette accélération est de pouvoir effectuer des requêtes en temps réel sur de très gros volumes de données, par exemple sur les logs d'accès d'un site Internet. L'analyse des données en temps réel, le rêve quand on fait de la BI!

D'autres applications sont possibles comme le machine learning ou le data mining. Mais commençons par mettre les mains dans le cambouis!
Le coeur de Spark est écrit en Scala mais des API Java et Python sont également proposées pour interagir avec le framework. Un shell est fourni par défaut, il permet de manipuler facilement l'API Scala de Spark. Très pratique pour faire des tests!

Pour découvrir Spark, commençons par lire un fichier en local :
val textFile = sc.textFile("README.md")
Note : sc est un objet disponible par défaut dans le shell, il correspond au contexte Spark courant.



On veut maintenant compter les occurences de chaque mot dans ce fichier :
val wordCounts = textFile.flatMap(line => line.split(" ")) //on crée un tableau de mots
.map(word => (word, 1)) // on crée un tuple par mot pour compter les occurrences
.reduceByKey((a, b) => a + b) // on additionne les nombres d’occurrences en groupant par clé (par mot)

Un aspect vraiment sympa et puissant avec Spark, il est possible de manipuler les fichiers locaux ou les fichiers distribués sur HDFS exactement de la même manière. L'API masque complètement cet aspect.
On peut donc refaire la même chose mais en lisant le fichier sur HDFS comme ceci :
val textFile = sc.textFile("hdfs://masterHost:9000/data/README.md")

Modifions maintenant notre calcul, pour ne compter que les 9èmes mots de chaque ligne. Si la ligne contient moins de 9 mots, on veut afficher "smaller than 9". Pour ça on peut appeler la fonction lift sur notre tableau de mot (fonction standard dans l'API Scala), qui nous renverra une Option[String].
Le code devient alors :
val wordNineCOunts = textFile.map(line => line.split(" ")
.lift(9).getOrElse("smaller than 9")) // si la valeur n'est pas présente dans l'option, on écrit "smaller than 9"
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)

Pour afficher le résultat, on procède comme ceci :
wordNineCOunts.collect
On obtient alors :
Array((different,1), (to,2), (other,1), (no,1), (params`.,1), (its,2), (spark:/,1), (open,1), (`/usr/local/lib/libmesos.so`,1), (of,1), (programming,1), (CPUs.,1), (HDFS,1), (smaller than 9,40), (want,1), (with,1), (so.,1), (the,1), (that,2), (minimum,,1), (Simple,1), (documentation,1), (bin,1), (their,1), (this,1), (variable,1))...

Note : 'collect' permet de passer d'un objet Spark de type RDD (resilient distributed dataset) à un simple tableau de valeurs

On peut aussi trier les résultats sur le deuxième membre de chaque tuple, c'est à dire trier les mots par nombre d’occurrence :
wordNineCounts
.collect
.sortWith((a, b) => a._2 > b._2)
Array((smaller than 9,40), (to,2), (its,2), (that,2), (different,1), (other,1), (no,1), (params`.,1), (spark:/,1), (`/usr/local/lib/libmesos.so`,1), (of,1), (open,1), (programming,1), (CPUs.,1), (HDFS,1), (want,1), (with,1), (so.,1), (the,1), (minimum,,1), (Simple,1), (documentation,1), (bin,1), (their,1), (this,1), (variable,1))...

En conclusion, Spark est un très bon cas d'utilisation de Scala, peu verbeux pour manipuler les données et bien adapté pour les traitements en parallèle, notamment grâce à l'immutabilité poussée par le langage.
Juste pour vous faire une idée, regardez ce que donnerait la même chose en Java.

Pour aller plus loin, vous pouvez aussi regarder Shark, qui repose sur Spark et apporte la possibilité de faire des requêtes avec un langage "SQL like" sur vos données distribuées. Shark est compatible avec Apache Hive tout en permettant de profiter de la vitesse d’exécution de Spark.