vendredi 1 juin 2012

Akka : à la découverte des acteurs

Akka est un framework permettant de gérer des problématiques de parallélisation et d'asynchronisme, basé sur un système d'acteurs. Les acteurs sont des objets capables de communiquer entre eux par messages pour échanger des ordres d’exécution et des informations sur leur état.

Dans cet article nous allons étudier un cas assez simple : comment appliquer une transformation (une multiplication par deux) sur une liste d'entiers en parallélisant les calculs.


Pour ceci nous allons créer un système avec plusieurs acteurs :
  • Un master qui va dispatcher le travail puis récupérer les résultats
  • Des workers qui vont traiter individuellement les transformations d'entiers
Cette première portion de code va initialiser le système et créer les acteurs. On démarre le master en lui envoyant un message de type Start.
    
final int nbWorkers = 6;
List<Integer> numbers = Arrays.asList(1, 5, 15, 22, 66, 55);
ActorSystem system = ActorSystem.create("mySystem");
ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
  public UntypedActor create() {
    return new Master(nbWorkers, numbers);
  }
}), "master");
Start startMessage = new Start();
master.tell(startMessage);

Au moment de sa création, le master va déclarer un routeur capable de répartir les traitements demandés sur un nombre de nœuds définis par nos soins (ici on en a 6) .

public class Master extends UntypedActor {
  private List<Integer> numbers;
  private List transformedNumbers;
  private final ActorRef workerRouter;
  long start = System.currentTimeMillis();

  public Master(int nbWorkers, List<Integer> numbers) {
    transformedNumbers = new ArrayList<Integer>();
    this.numbers = numbers;
    //create the worker dispatcher
    workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter(new RoundRobinRouter(nbWorkers)), "workerRouter");
    System.out.println("master and router created");
  }

Voyons maintenant comment le master répond aux messages. Cet acteur reconnaît deux types de messages : Start et Response. Leur code est assez sommaire :

public class Start{}
public class Result{
    public Result(int value) {
      this.value = value;
    }
    public int value;
  }

Le premier est vide, il permet simplement de démarrer le master pour lui demander de dispatcher le travail de calcul envoyant chaque entier à un worker (comme nous l'avons vu plus haut). L'autre est un simple conteneur qui permet aux workers de renvoyer les résultats partiels du calcul.

Lé méthode "onReceive" que l'on définit dans le master reçoit les messages qui sont adressés à celui ci :
@Override
  public void onReceive(Object message) {
    if(message instanceof Start){
      System.out.println("master received a start event");
      //dispatch
      for (Integer number : numbers){
        // send to a worker
        workerRouter.tell(number, getSelf());
      }
    }
    else if(message instanceof Result){
      System.out.println("master received result event");
      Result result = (Result) message;
      //join
      transformedNumbers.add(result.value);
      if(transformedNumbers.size() == numbers.size()){
        Duration duration = Duration.create(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
        System.out.println(transformedNumbers.size() + " numbers computed in " + duration);
        //stop master and its workers
        getContext().stop(getSelf());
        getContext().system().shutdown();
      }
    }
  }

Une fois que le master a récupéré le nombre de réponses attendues, le travail est terminé et on peut afficher le résultat.
L'instruction "tell" permet de travailler de manière asynchrone : lorsque l'on demande à un worker de traiter un entier, on continue à exécuter le code pour traiter les éléments suivants sans attendre le résultat, qui sera communiqué au master à l'aide d'une notification.

Enfin, voici le code de la classe Worker :

public class Worker extends UntypedActor {

  @Override
  public void onReceive(Object message) {
    System.out.println("worker received an event");
    if(message instanceof  Integer){
      int number = (Integer) message;
      int transformedNumber = tranform(number);
      //send result to master
      getSender().tell(new Result(transformedNumber), self());
    }
  }

  private int tranform(int number){
    try {
      Thread.sleep(4000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return number *2;
  }
}

La méthode transform contient le code de la transformation. On introduit volontairement un "sleep" de 4 secondes pour allonger le traitement et mesurer plus facilement le gain de la parallélisation.

Premières observations

Définir plusieurs workers permet de répartir le traitement sur plusieurs threads en faisant abstraction de la programmation parallèle :

  • Avec un seul worker, transformer les 6 entiers demandés prend environ 24 secondes
  • Avec 4 workers : environ 8 secondes
  • Avec 6 workers : environ 4 secondes
Pour un nombre de workers supérieur ou égal au nombre d'éléments, le temps de calcul revient à peu près à passer une seule fois dans le sleep de 4 secondes : on n'attend pas la fin d'un calcul et du sleep associé pour passer aux itérations suivantes.
Dans notre cas comme il y a des phases d'attente on peut même aller à un nombre de workers supérieurs au nombre de coeurs de la machine (j'ai lancé ce code sur un dual-core).  Ceci peut également être pertinent lorsqu'on attend la réponse d'un webservice par exemple ou pour tout autre type d'entrées/sorties.

Quelques modifications

On aimerait pouvoir récupérer le résultat du traitement depuis une méthode appelante.
Cependant si celle ci n'est pas gérée par un acteur, on ne peut pas répondre directement depuis notre master à l'aide d'un "tell". On passe alors par l'API Future d'Akka.
Un future permet de lancer un traitement en parallèle (dans un thread séparé) puis de récupérer le résultat.
Par exemple :

public static void main(String[]args) throws Exception{
    ActorSystem system = ActorSystem.create("mySystem");

    Future<String> f1 = future(new Callable<String>() {
      public String call() throws Exception {
        Thread.sleep(4000);
        return "You String is ready !!";
      }
    }, system.dispatcher());
    Timeout timeout = new Timeout(Duration.parse("30 seconds"));
    String result = Await.result(f1, timeout.duration());
    System.out.println(result);
  }

Note : On utilise ici la classe Future d'Akka et non celle du JDK. Un des avantages de cette version est de permettre la récupération de la valeur dès qu'elle est disponible sans avoir à vérifier périodiquement que le traitement est terminé.

On peut utiliser le même mécanisme pour obtenir la réponse d'un acteur une fois ses taches terminées.
La méthode Patterns.ask permet de définir un acteur implicite auquel notre master pourra répondre à travers un objet de type Future :

Future future = Patterns.ask(master, startMessage, timeout);
List<Integer> result = (List<Integer>) Await.result(future, timeout.duration());

On peut ainsi imaginer une méthode qui appelle notre système d'acteurs et qui retourne le résultat :

public List<Integer> transform(List<Integer> numbers){
 ActorSystem system = ActorSystem.create("mySystem");
    ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
      public UntypedActor create() {
        System.out.println("create master");
        return new Master(nbWorkers, numbers);
      }
    }), "master");
    Start startMessage =  new Start();
    Timeout timeout = new Timeout(Duration.parse("30 seconds"));
    Future future = Patterns.ask(master, startMessage, timeout);
    List<Integer> result = (List<Integer>) Await.result(future, timeout.duration());
    system.shutdown();
 return result;
  }

Pour renvoyer le message depuis le master, on garde une référence de l'expéditeur initial (déclaré implicitement lors de l'appel de 'ask') pour pouvoir lui répondre plus tard.
Attention, appeler la méthode 'getSender' en fin de traitement renverrait le message au dernier expéditeur, dans ce cas ce serait un worker.
On sauvegarde donc cette référence dans le bloc "Start" du worker :

public class Master extends UntypedActor {
  
  private ActorRef initialSender;
 ...

if(message instanceof Start){
  this.initialSender = getSender();
 ... 
}

if(message instanceof Response){
if(transformedNumbers.size() == numbers.size()){
    reply();
  }
...
}

public void reply(){
    initialSender.tell(transformedNumbers);
    //stop master and its workers
    getContext().stop(getSelf());
  }
}

Note : Attention à l'utilisation de la classe Await.result qui est bloquante (d'où l'obligation d'utiliser un timeout). Ici cela ne pose pas de problème car elle intervient après avoir lancé tous les calculs en parallèle mais l'utiliser à un mauvais endroit aurait un impact sur les performances.

Pour terminer ce long article, Akka est un framework écrit avec le langage Scala, il propose donc naturellement une API Scala pour manipuler les acteurs. Scala permet d'écrire un code un peu plus élégant que Java pour ce genre de choses (par exemple en évitant les "cast"). Voici à quoi pourrait ressembler un worker en Scala :

class Worker extends Actor { 
    def receive = {
 case Int(number) => 
  sender ! number * 2
    }

Vous pouvez télécharger le code source des exemples ici.