Marco Ordoñez

Event Sourcing con Akka y mongodb + Akka Http

Event sourcing se basa en el almacenamiento del estado de un sistema como una secuencia ordenada de eventos, la ventaja de esta técnica es que el sistema puede recuperar su estado repitiendo todos los eventos que ha recibido.

Akka persitance es el módulo de persistencia de datos de Akka, la idea detrás de este módulo es poder recuperar el estado de un actor en cualquier caso en el que haya sido reiniciado.

La demo que verán a continuación muestra lo simple que es implementar este tipo de actores en Akka.

A continuación, el actor que persiste los datos:

package pe.evolbit.actors

import akka.actor._  
import akka.persistence._  
import akka.event.Logging

case class AddFund(amount: Int)

trait FundEvent {  
  def amount:Int
}
case class AddFundEvent(amount:Int) extends FundEvent

case class FundState(totalBalance:Int = 0) {  
  def updateBalance(amount:Int) = copy(totalBalance + amount)
}

class FundActor extends PersistentActor {

  override def persistenceId = this.getClass.getName.toLowerCase()

  val log = Logging(context.system, this)

  var state = FundState()
  var messagesCounter = 0

  def updateState(event: FundEvent): Unit = {
    val amount = event match {
      case AddFundEvent(amount) => event.amount
    }
    log.info(s"current balance: ${state.totalBalance}")
    log.info(s"new amount: ${amount}")
    log.info(s"new balance: ${state.totalBalance + amount}")
    state = state.updateBalance(amount)

    messagesCounter = messagesCounter + 1
      if(messagesCounter > 5){
        saveSnapshot(state)
        messagesCounter = 0
    }
  }

  val receiveRecover: Receive = {
    case AddFundEvent(amount) =>
      val addFundEvent = AddFundEvent(amount)
      log.info(s"recover addFundEvent $addFundEvent")
      updateState(addFundEvent)

    case SnapshotOffer(_, snapshot: FundState) => 
      log.info(s"snapshot $snapshot")
      state = snapshot
  }

  val receiveCommand: Receive = {
    case AddFund(amount) =>
      val addFundEvent = AddFundEvent(amount)
      persist(addFundEvent) { event =>
        updateState(addFundEvent)
      }
      context.system.eventStream.publish(addFundEvent)

    case "getState" =>  sender() ! state
  }

}

Las partes más resaltantes del código anterior son las siguientes:

La función persist se encarga de persistir los datos de forma asíncrona.

persist(addFundEvent) { event =>  
  updateState(addFundEvent)
}

La función saveSnapshot se encarga de persistir los datos a modo de snapshot, esto nos sirve para evitar recibir muchos mensajes. Una vez creado un snapshot, se recibirá el snapshot y posibles futuros mensajes antes del siguiente snapshot.

if(messagesCounter > 5){  
    saveSnapshot(state)
    messagesCounter = 0
}

El método receiveRecover que recibe los mensajes persistidos y los snapshots que se hayan creado.

val receiveRecover: Receive = {  
    case AddFundEvent(amount) =>
      val addFundEvent = AddFundEvent(amount)
      log.info(s"recover addFundEvent $addFundEvent")
      updateState(addFundEvent)

    case SnapshotOffer(_, snapshot: FundState) => 
      log.info(s"snapshot $snapshot")
      state = snapshot
}

De esta forma podemos persistir mensajes y snapshots, al iniciar el actor este recibirá automáticamente todos los mensajes y snapshots almacenados y con esos datos podremos recuperar el estado del actor.

Un punto importante a resaltar es que al tener el estado en memoria, acceder al mismo es mucho más rápido que a través de una aplicación estándar con crud. Si bien esta implementación se puede replicar en otras tecnologías Akka hace que sea extremadamente fácil hacerlo.

Akka http es la mejora de Typesafe sobre el framework Spray, es bastante fácil de usar (como lo fue su antecesor).

Para iniciar un server en akka http se debe escribir la siguiente linea:

val bindingFuture = Http().bindAndHandle(routes, "0.0.0.0", 8080)  

El formato de rutas es intuitivo para escribir, por ejemplo:

path("add-funds" / IntNumber)  { fund =>  
    get {
       complete {
          persistentActor ! AddFund(fund)
          s"""New fund has been added $fund. 
            |Try to stop your app and open it again to recover your balance""".stripMargin
       }
    }
}

Lo más importante a resaltar es que todo el código visto en el ejemplo es completamente asíncrono (non-blocking).

Este ejemplo estuvo basado en la plantilla de persistencia de Akka de activator. Typesafe brinda a disposición de todos los desarrolladores plantillas para iniciarse con sus tecnologías.

El ejemplo completo se puede descargar desde Github:
https://github.com/evolbit/akka-http-with-persistance-mongodb

Si desean conocer más sobre las tecnologías de Typesafe pueden revisar las plantillas en:
http://www.typesafe.com/activator/templates

Si desean conocer más sobre Scala y las tecnologías de Typesafe pueden revisar la siguiente presentación de la comunidad de Scala de Perú:
https://speakerdeck.com/mordonez/conociendo-scala-y-las-tecnologias-de-typesafe

La comunidad de Scala Perú en Meetup la pueden encontrar en el siguiente enlace:
http://www.meetup.com/es/scalaperu/

Marco Ordonez