Πώς να κάνετε μια απλή εφαρμογή με το Akka Cluster

Εάν διαβάσατε την προηγούμενη ιστορία μου για το Scalachain, πιθανότατα έχετε παρατηρήσει ότι απέχει πολύ από το να είναι ένα κατανεμημένο σύστημα. Δεν διαθέτει όλες τις δυνατότητες για να λειτουργεί σωστά με άλλους κόμβους. Προσθέστε σε αυτό ότι ένα blockchain που αποτελείται από έναν μόνο κόμβο είναι άχρηστο. Για αυτόν τον λόγο αποφάσισα ότι είναι καιρός να εργαστώ για το θέμα.

Επειδή το Scalachain τροφοδοτείται από την Akka, γιατί να μην εκμεταλλευτείτε την ευκαιρία να παίξετε με το Akka Cluster; Δημιούργησα ένα απλό έργο για να παίξω λίγο με τον Akka Cluster και σε αυτήν την ιστορία θα μοιραστώ τις γνώσεις μου. Θα δημιουργήσουμε ένα σύμπλεγμα τριών κόμβων, χρησιμοποιώντας το Cluster Aware Routers για να εξισορροπήσουμε το φορτίο μεταξύ τους. Τα πάντα θα εκτελούνται σε κοντέινερ Docker και θα χρησιμοποιήσουμε σύνθετη σύνδεση για εύκολη ανάπτυξη.

Εντάξει, ας κυλήσουμε! ;

Γρήγορη εισαγωγή στο Akka Cluster

Το Akka Cluster παρέχει μεγάλη υποστήριξη στη δημιουργία κατανεμημένων εφαρμογών. Η καλύτερη περίπτωση χρήσης είναι όταν έχετε έναν κόμβο που θέλετε να αναπαραγάγετε N φορές σε κατανεμημένο περιβάλλον. Αυτό σημαίνει ότι όλοι οι κόμβοι N είναι ομότιμοι με τον ίδιο κωδικό. Το Akka Cluster σάς δίνει τη δυνατότητα ανακάλυψης μελών στο ίδιο σύμπλεγμα. Χρησιμοποιώντας Cluster Aware Routers είναι δυνατή η εξισορρόπηση των μηνυμάτων μεταξύ των ηθοποιών σε διαφορετικούς κόμβους. Είναι επίσης δυνατό να επιλέξετε την πολιτική εξισορρόπησης, κάνοντας την εξισορρόπηση φορτίου ένα κομμάτι κέικ!

Στην πραγματικότητα μπορείτε να επιλέξετε μεταξύ δύο τύπων δρομολογητών:

Ομαδικός δρομολογητής - Οι ηθοποιοί που στέλνουν τα μηνύματα στους - καλούμενους δρομολογητές - καθορίζονται χρησιμοποιώντας τη διαδρομή του ηθοποιού. Οι δρομολογητές μοιράζονται τις διαδρομές που δημιουργούνται στο σύμπλεγμα. Σε αυτό το παράδειγμα θα χρησιμοποιήσουμε ένα Group Router.

Pool Router - Οι δρομολογητές δημιουργούνται και αναπτύσσονται από τον δρομολογητή, οπότε είναι τα παιδιά του στην ιεραρχία των ηθοποιών. Οι διαδρομές δεν κοινοποιούνται μεταξύ δρομολογητών. Αυτό είναι ιδανικό για ένα σενάριο πρωτεύοντος-ρεπλίκα, όπου κάθε δρομολογητής είναι ο κύριος και δρομολογεί τα αντίγραφα.

Αυτή είναι μόνο η κορυφή του παγόβουνου, γι 'αυτό σας καλώ να διαβάσετε την επίσημη τεκμηρίωση για περισσότερες πληροφορίες.

Ένα σύμπλεγμα για μαθηματικούς υπολογισμούς

Ας φανταστούμε ένα σενάριο περίπτωσης χρήσης. Ας υποθέσουμε ότι σχεδιάζουμε ένα σύστημα για την εκτέλεση μαθηματικών υπολογισμών κατόπιν αιτήματος. Το σύστημα αναπτύσσεται στο διαδίκτυο, οπότε χρειάζεται ένα REST API για να λαμβάνει τα αιτήματα υπολογισμού. Ένας εσωτερικός επεξεργαστής χειρίζεται αυτά τα αιτήματα, εκτελώντας τον υπολογισμό και επιστρέφοντας το αποτέλεσμα.

Αυτήν τη στιγμή ο επεξεργαστής μπορεί να υπολογίσει μόνο τον αριθμό Fibonacci. Αποφασίζουμε να χρησιμοποιήσουμε ένα σύμπλεγμα κόμβων για να κατανείμουμε το φορτίο μεταξύ των κόμβων και να βελτιώσουμε την απόδοση. Το Akka Cluster θα χειριστεί τη δυναμική του συμπλέγματος και την εξισορρόπηση φορτίου μεταξύ κόμβων. Εντάξει, ακούγεται καλό!

Ιεραρχία ηθοποιού

Πρώτα πράγματα πρώτα: πρέπει να ορίσουμε την ιεραρχία των ηθοποιών μας. Το σύστημα μπορεί να χωριστεί σε τρία λειτουργικά μέρη: την επιχειρησιακή λογική , τη διαχείριση συμπλεγμάτων και τον ίδιο τον κόμβο . Υπάρχει επίσης ο διακομιστής, αλλά δεν είναι ηθοποιός και θα το δουλέψουμε αργότερα.

Επαγγελματική λογική

Η εφαρμογή πρέπει να κάνει μαθηματικούς υπολογισμούς. Μπορούμε να ορίσουμε έναν απλό Processorπαράγοντα για τη διαχείριση όλων των υπολογιστικών εργασιών. Κάθε υπολογισμός που υποστηρίζουμε μπορεί να εφαρμοστεί σε έναν συγκεκριμένο ηθοποιό, που θα είναι παιδί του Processor. Με αυτόν τον τρόπο η εφαρμογή είναι αρθρωτή και ευκολότερη για επέκταση και συντήρηση. Αυτή τη στιγμή το μόνο παιδί Processorθα είναι ο ProcessorFibonacciηθοποιός. Υποθέτω ότι μπορείτε να μαντέψετε ποια είναι η αποστολή της. Αυτό πρέπει να είναι αρκετό για να ξεκινήσει.

Διαχείριση συμπλέγματος

Για τη διαχείριση του συμπλέγματος χρειαζόμαστε ένα ClusterManager. Ακούγεται απλό, έτσι; Αυτός ο ηθοποιός χειρίζεται όλα όσα σχετίζονται με το σύμπλεγμα, όπως την επιστροφή των μελών του όταν τους ρωτήθηκε. Θα ήταν χρήσιμο να καταγράψετε τι συμβαίνει μέσα στο σύμπλεγμα, οπότε ορίζουμε έναν ClusterListenerηθοποιό. Αυτό είναι παιδί του ClusterManager, και εγγράφεται σε συμβάντα συμπλέγματος που τα καταγράφουν.

Κόμβος

Ο Nodeηθοποιός είναι η ρίζα της ιεραρχίας μας. Είναι το σημείο εισόδου του συστήματός μας που επικοινωνεί με το API. Τα Processorκαι ClusterManagerείναι τα παιδιά του, μαζί με τον ProcessorRouterηθοποιό. Αυτός είναι ο εξισορροπητής φορτίου του συστήματος, κατανέμοντας το φορτίο μεταξύ Processors. Θα το διαμορφώσουμε ως Cluster Aware Router, έτσι ώστε όλοι ProcessorRouterνα μπορούν να στέλνουν μηνύματα Processorσε κάθε κόμβο.

Υλοποίηση ηθοποιού

Ώρα να εφαρμόσουμε τους ηθοποιούς μας! Πρώτα εφαρμόζουμε τους παράγοντες που σχετίζονται με την επιχειρηματική λογική του συστήματος. Συνεχίζουμε λοιπόν τους ηθοποιούς για τη διαχείριση συμπλέγματος και τον αρχικό παράγοντα ( Node) στο τέλος.

Επεξεργαστής Fibonacci

Αυτός ο ηθοποιός εκτελεί τον υπολογισμό του αριθμού Fibonacci. Λαμβάνει ένα Computeμήνυμα που περιέχει τον αριθμό για τον υπολογισμό και την αναφορά του ηθοποιού για να απαντήσει. Η αναφορά είναι σημαντική, καθώς μπορεί να υπάρχουν διαφορετικοί αιτούντες ηθοποιοί. Θυμηθείτε ότι εργαζόμαστε σε ένα κατανεμημένο περιβάλλον!

Μόλις ληφθεί το Computeμήνυμα, η fibonacciσυνάρτηση υπολογίζει το αποτέλεσμα. Τυλίγουμε σε ένα ProcessorResponseαντικείμενο για να παρέχουμε πληροφορίες σχετικά με τον κόμβο που εκτέλεσε τον υπολογισμό. Αυτό θα είναι χρήσιμο αργότερα για να δείτε την πολιτική του round-robin σε δράση.

Το αποτέλεσμα στη συνέχεια αποστέλλεται στον ηθοποιό στον οποίο πρέπει να απαντήσουμε. Πανεύκολο.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Επεξεργαστής

Ο Processorηθοποιός διαχειρίζεται τους συγκεκριμένους υποεπεξεργαστές, όπως ο Fibonacci. Πρέπει να δημιουργεί τους υποεπεξεργαστές και να τους διαβιβάζει. Αυτή τη στιγμή έχουμε μόνο ένα υπο-επεξεργαστή, έτσι ώστε το Processorλαμβάνει ένα είδος μηνύματος: ComputeFibonacci. Αυτό το μήνυμα περιέχει τον αριθμό Fibonacci για υπολογισμό. Μόλις ληφθεί, ο αριθμός για τον υπολογισμό αποστέλλεται σε ένα FibonacciProcessor, μαζί με την αναφορά του sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Θα θέλαμε να καταγράψουμε χρήσιμες πληροφορίες για το τι συμβαίνει στο σύμπλεγμα. Αυτό θα μπορούσε να μας βοηθήσει να διορθώσουμε το σύστημα αν χρειαστεί. Αυτός είναι ο σκοπός του ClusterListenerηθοποιού. Πριν ξεκινήσει, εγγράφεται στα μηνύματα συμβάντων του συμπλέγματος. Οι αντιδρά ηθοποιός σε μηνύματα αρέσει MemberUp, UnreachableMemberή MemberRemoved, καταγραφή την αντίστοιχη εκδήλωση. Όταν ClusterListenerσταματήσει, διαγράφεται από τα συμβάντα συμπλέγματος.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

Ο ηθοποιός υπεύθυνος για τη διαχείριση του συμπλέγματος είναι ClusterManager. Δημιουργεί τον ClusterListenerηθοποιό και παρέχει τη λίστα των μελών του συμπλέγματος κατόπιν αιτήματος. Θα μπορούσε να επεκταθεί για να προσθέσει περισσότερες λειτουργίες, αλλά αυτή τη στιγμή αρκεί.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

Επεξεργαστής δρομολογητής

Η εξισορρόπηση φορτίου μεταξύ των επεξεργαστών αντιμετωπίζεται από το ProcessorRouter. Δημιουργήθηκε από τον Nodeηθοποιό, αλλά αυτή τη φορά όλες οι απαιτούμενες πληροφορίες παρέχονται στη διαμόρφωση του συστήματος.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Ας αναλύσουμε το σχετικό μέρος του application.confαρχείου.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

Το πρώτο πράγμα είναι να καθορίσετε τη διαδρομή προς τον δρομολογητή, δηλαδή /node/processorRouter. Μέσα σε αυτήν την ιδιότητα μπορούμε να διαμορφώσουμε τη συμπεριφορά του δρομολογητή:

  • router: αυτή είναι η πολιτική για την εξισορρόπηση φορτίου των μηνυμάτων. Επέλεξα το round-robin-group, αλλά υπάρχουν πολλά άλλα.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

Μπορείτε να βρείτε την πλήρη εφαρμογή στο repo GitHub μου. Μη διστάσετε να συνεισφέρετε ή να παίξετε με αυτό που θέλετε! ;

Τα λέμε! ;