Μια βιβλιοθήκη ροής με υπερδύναμη: FS2 και λειτουργικό προγραμματισμό

Το Scala διαθέτει μια πολύ ειδική βιβλιοθήκη ροής που ονομάζεται FS2 (Functional Streams for Scala). Αυτή η βιβλιοθήκη ενσωματώνει όλα τα πλεονεκτήματα του λειτουργικού προγραμματισμού (FP). Με την κατανόηση των σχεδιαστικών του στόχων, θα έχετε έκθεση στις βασικές ιδέες που κάνουν το FP τόσο ελκυστικό.

Το FS2 έχει έναν κεντρικό τύπο: Stream[Effect,Output]

Μπορεί να λάβετε από αυτόν τον τύπο ότι είναι Streamκαι ότι εκπέμπει τιμές τύπου Output.

Το προφανές ερώτημα εδώ είναι τι είναι Effect; Ποια είναι η σχέση μεταξύ Effectκαι Output; Και ποια πλεονεκτήματα έχει το FS2 σε σχέση με άλλες βιβλιοθήκες ροής;

ΣΦΑΙΡΙΚΗ ΕΙΚΟΝΑ

Θα ξεκινήσω εξετάζοντας τα προβλήματα που λύνει το FS2. Στη συνέχεια, συγκρίνω Listκαι Streamμε πολλά παραδείγματα κώδικα. Μετά από αυτό, θα επικεντρωθώ στον τρόπο χρήσης Streamμε DB ή οποιοδήποτε άλλο IO. Εκεί λάμπει το FS2 και χρησιμοποιείται ο Effectτύπος. Μόλις καταλάβετε τι Effectείναι, τα πλεονεκτήματα του Λειτουργικού Προγραμματισμού θα πρέπει να είναι εμφανή σε εσάς.

Στο τέλος αυτής της ανάρτησης θα λάβετε τις απαντήσεις στις ακόλουθες ερωτήσεις:

  • Ποια προβλήματα μπορώ να λύσω με το FS2;
  • Τι μπορώ να κάνω με Streamαυτό Listδεν μπορεί;
  • Πώς μπορώ να τροφοδοτήσω δεδομένα από API / File / DB Stream;
  • Τι είναι αυτός ο Effectτύπος και πώς σχετίζεται με τον λειτουργικό προγραμματισμό;

Σημείωση: Ο κωδικός είναι στη Scala και πρέπει να είναι κατανοητός ακόμη και χωρίς προηγούμενη γνώση της σύνταξης.

Ποια προβλήματα μπορώ να λύσω με το FS2;

  1. Streaming I / O: Φόρτωση σταδιακά μεγάλων συνόλων δεδομένων που δεν θα χωρέσουν στη μνήμη και λειτουργούν σε αυτά χωρίς να φυσήξετε το σωρό σας.
  2. Ροή ελέγχου (δεν καλύπτεται): Μετακίνηση δεδομένων από ένα / περισσότερα DB / αρχεία / API σε άλλα με έναν ωραίο δηλωτικό τρόπο.
  3. Ταυτότητα (δεν καλύπτεται): Εκτελέστε παράλληλα διαφορετικές ροές και κάντε τις να επικοινωνούν μαζί. Για παράδειγμα, φόρτωση δεδομένων από πολλά αρχεία και επεξεργασία τους ταυτόχρονα σε αντίθεση με διαδοχικά. Μπορείτε να κάνετε κάποια προηγμένα πράγματα εδώ. Οι ροές μπορούν να επικοινωνούν μαζί κατά τη διάρκεια του σταδίου επεξεργασίας και όχι μόνο στο τέλος.

List εναντίον Stream

Listείναι η πιο γνωστή και χρησιμοποιημένη δομή δεδομένων. Για να έχουμε μια αίσθηση για το πώς διαφέρει από το FS2 Stream, θα περάσουμε από μερικές περιπτώσεις χρήσης. Θα δούμε πώς Streamμπορεί να λύσει προβλήματα που Listδεν μπορούν.

Τα δεδομένα σας είναι πολύ μεγάλα και δεν χωρά στη μνήμη

Ας υποθέσουμε ότι έχετε ένα πολύ μεγάλο αρχείο (40 GB) fahrenheit.txt. Το αρχείο έχει θερμοκρασία σε κάθε γραμμή και θέλετε να το μετατρέψετε σε celsius.txt.

Φόρτωση ενός μεγάλου αρχείου χρησιμοποιώντας List

import scala.io.Source val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList java.lang.OutOfMemoryError: Java heap space java.util.Arrays.copyOfRange(Arrays.java:3664) java.lang.String.(String.java:207) java.io.BufferedReader.readLine(BufferedReader.java:356) java.io.BufferedReader.readLine(BufferedReader.java:389)

Listαποτυγχάνει άσχημα επειδή, φυσικά, το αρχείο είναι πολύ μεγάλο για να χωρέσει στη μνήμη. Εάν είστε περίεργοι, μπορείτε να ελέγξετε την πλήρη λύση χρησιμοποιώντας Streamεδώ - αλλά το κάνετε αργότερα, διαβάστε :)

Όταν η Λίστα δεν θα κάνει ... Ροή στη διάσωση!

Ας πούμε ότι κατάφερα να διαβάσω το αρχείο μου και θέλω να το γράψω ξανά. Θα ήθελα να διατηρήσω τη δομή της γραμμής. Πρέπει να εισαγάγω έναν νέο χαρακτήρα \nμετά από κάθε θερμοκρασία.

Μπορώ να χρησιμοποιήσω το intersperseσυνδυασμό για να το κάνω αυτό

import fs2._ Stream(1,2,3,4).intersperse("\n").toList

Ένα άλλο ωραίο είναι zipWithNext

scala> Stream(1,2,3,4).zipWithNext.toList res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

Συνδυάζει διαδοχικά πράγματα μαζί, πολύ χρήσιμο εάν θέλετε να καταργήσετε διαδοχικά διπλότυπα.

Αυτά είναι μόνο μερικά από πολλά πολύ χρήσιμα, εδώ είναι η πλήρης λίστα.

Προφανώς Streamμπορεί να κάνει πολλά πράγματα που Listδεν μπορούν, αλλά το καλύτερο χαρακτηριστικό έρχεται στην επόμενη ενότητα, είναι όλα σχετικά με τον τρόπο χρήσης Streamστον πραγματικό κόσμο με DBs / Files / API ...

Πώς μπορώ να τροφοδοτήσω δεδομένα από API / File / DB Stream;

Ας πούμε για τώρα ότι αυτό είναι το πρόγραμμά μας

scala> Stream(1,2,3) res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

Τι σημαίνει αυτό Pure; Εδώ είναι το scaladoc από τον πηγαίο κώδικα:

/** * Indicates that a stream evaluates no effects. * * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`. */ type Pure[A] <: Nothing

Δεν σημαίνει κανένα εφέ, εντάξει…, αλλά τι είναι το αποτέλεσμα; και πιο συγκεκριμένα ποια είναι η επίδραση του προγράμματος μας Stream(1,2,3);

Αυτό το πρόγραμμα δεν έχει κυριολεκτικά καμία επίδραση στον κόσμο. Το μόνο αποτέλεσμα θα είναι να κάνει τη CPU σας να λειτουργεί και να καταναλώνει κάποια ισχύ !! Δεν επηρεάζει τον κόσμο γύρω σας.

Επηρεάζοντας τον κόσμο εννοώ ότι καταναλώνει οποιονδήποτε σημαντικό πόρο όπως ένα αρχείο, μια βάση δεδομένων ή παράγει οτιδήποτε όπως ένα αρχείο, ανεβάζοντας κάποια δεδομένα κάπου, γράφοντας στο τερματικό σας και ούτω καθεξής.

Πώς μπορώ να μετατρέψω μια Pureροή σε κάτι χρήσιμο;

Ας πούμε ότι θέλω να ταυτότητες των χρηστών του φορτίου από την DB, μου δίνεται αυτή τη λειτουργία, θα κάνει μια κλήση με το ΣΠ και επιστρέφει το userId ως Long.

import scala.concurrent.Future def loadUserIdByName(userName: String): Future[Long] = ???

Επιστρέφει ένα Futureπου δείχνει ότι αυτή η κλήση είναι ασύγχρονη και η τιμή θα είναι διαθέσιμη κάποια στιγμή στο μέλλον. Περιτυλίγει την τιμή που επιστρέφεται από το DB.

Έχω αυτήν τη Pureροή.

scala> val names = Stream("bob", "alice", "joe") names: fs2.Stream[fs2.Pure,String] = Stream(..)

Πώς μπορώ να αποκτήσω ένα Streamαναγνωριστικό;

Η αφελής προσέγγιση θα ήταν να χρησιμοποιήσετε τη mapσυνάρτηση, θα πρέπει να εκτελεί τη συνάρτηση για κάθε τιμή στο Stream.

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

Επέστρεψα ακόμα ένα Pure! Έδωσα τη Streamλειτουργία που επηρεάζει τον κόσμο και έχω ακόμα ένα Pure, όχι δροσερό ... Θα ήταν τακτοποιημένο αν το FS2 θα ανιχνεύσει αυτόματα ότι η loadUserIdByNameλειτουργία έχει επίδραση στον κόσμο και μου επέστρεψε κάτι που ΔΕΝ είναι, Pureαλλά κάνει δεν δουλεύει έτσι. Πρέπει να χρησιμοποιήσετε έναν ειδικό συνδυασμό αντί για map: πρέπει να χρησιμοποιήσετε evalMap.

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Όχι πια Pure! πήραμε Futureαντ 'αυτού, ναι! Τι συνέβη μόλις τώρα?

Πήρε:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

Και άλλαξα τους τύπους της ροής σε

  • Stream[Future, Long]

Το χώρισε Futureκαι το απομόνωσε! Η αριστερή πλευρά που ήταν η Effectπαράμετρος τύπου είναι τώρα ο Futureτύπος σκυροδέματος .

Τακτοποιημένο κόλπο, αλλά πώς με βοηθάει;

Μόλις είδατε τον αληθινό διαχωρισμό των ανησυχιών. Μπορείτε να συνεχίσετε να λειτουργεί στη ροή με όλους τους συμπαθητικούς Listσυνδυασμούς και δεν χρειάζεται να ανησυχείτε εάν το DB είναι εκτός λειτουργίας, αργό ή όλα τα πράγματα που σχετίζονται με τις ανησυχίες του δικτύου (εφέ).

Όλα λειτουργούν έως ότου θέλω να χρησιμοποιήσω toListγια να επαναφέρω τις τιμές

scala> userIdsFromDB.toList :18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long] userIdsFromDB.toList ^

Τι???!!! Θα μπορούσα να ορκιστώ ότι χρησιμοποιούσα toListπριν και λειτούργησε, πώς μπορεί να πει ότι toListδεν είναι πλέον μέλος fs2.Stream[Future,String]; Είναι σαν να καταργήθηκε αυτή η λειτουργία τη στιγμή που άρχισα να χρησιμοποιώ μια ροή εφέ, αυτό είναι εντυπωσιακό! Αλλά πώς μπορώ να επαναφέρω τις αξίες μου;

scala> userIdsFromDB.compile res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = [email protected]

Πρώτα χρησιμοποιούμε compileγια να πούμε στο Streamνα συνδυάσουμε όλα τα εφέ σε ένα, ουσιαστικά διπλώνει όλες τις κλήσεις loadUserIdByNameσε ένα μεγάλο Future. Αυτό απαιτείται από το πλαίσιο και θα καταστεί προφανές γιατί αυτό το βήμα απαιτείται σύντομα.

Τώρα toListπρέπει να λειτουργήσει

scala> userIdsFromDB.compile.toList :18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future] userIdsFromDB.compile.toList ^

Τι?! ο μεταγλωττιστής εξακολουθεί να παραπονιέται. Αυτό γιατί Futureδεν είναι καλός Effectτύπος - σπάει τη φιλοσοφία του διαχωρισμού των ανησυχιών όπως εξηγείται στην επόμενη πολύ σημαντική ενότητα.

ΣΗΜΑΝΤΙΚΟ: Το ένα πράγμα που πρέπει να αφαιρέσετε από αυτήν την ανάρτηση

Ένα βασικό σημείο εδώ, είναι ότι το DB δεν έχει κληθεί σε αυτό το σημείο. Τίποτα δεν συνέβη πραγματικά, το πλήρες πρόγραμμα δεν παράγει τίποτα.

def loadUserIdByName(userName: String): Future[Long] = ??? Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

Διαχωρισμός της περιγραφής του προγράμματος από την αξιολόγηση

Ναι μπορεί να είναι εκπληκτικό, αλλά το κύριο θέμα στο FP είναι ο διαχωρισμός του

  • Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”

And the

  • Execution of your program: running the actual code and asking it to go to the DB

One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.

Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.

Why would you want write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.

Now let’s fix our program and take care of this Future problem.

As we said Future is a bad Effect type, it goes against the separation of concerns principle. Indeed, Future is eager in Scala: the moment you create one it starts to executes on some thread, you don't have control of the execution and thus it breaks. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO that wraps our bad Future.

That brings us to the last part, what is this IO type? and how do I finally get my list of usedIds back?

scala> import cats.effect.IO import cats.effect.IO scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList res8: cats.effect.IO[List[Long]] = IO$2104439279

It now gives us back a List but still, we didn't get our IDs back, so one last thing must be missing.

What does IO really mean?

IO comes from cats-effect library. First let's finish our program and finally get out the ids back from the DB.

scala> userIds.compile.toList.unsafeRunSync :18: error: not found: value userIds userIds.compile.toList.unsafeRunSync ^

The proof that it’s doing something is the fact that it’s failing.

loadUserIdByName(userName: String): Future[Long] = ???

When ??? is called you will get this exception, it means the function was executed (as opposed to before when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the ids, and it will have an effect on the world (network/files system).

IO[Long] is a description of how to get a value of type Long and it most certainly involves doing some I/O i.e going to the network, loading a file,...

It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync (or other functions prefixed unsafe). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.

Recap

Let’s take a last look at Stream[Effect,Output].

Output is the type that the stream emits (could be a stream of String, Long or whatever type you defined).

Effect is the way (the recipe) to produce the Output (i.e go to the DB and give me an id of type Long).

It’s important to understand that if these types are separated to make things easier, breaking down a problem in subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.

The link between these 2 types is the following :

In order for the Stream to emit an element of type

  • Output

It needs to evaluate a type

  • Effect

A special type that encodes an effective action as a value of type IO, this IO value allows the separation of 2 concerns:

  • Description:IO is a simple immutable value, it’s a recipe to get a type A by doing some kind of IO(network/filesystem/…)
  • Execution: in order forIO to do something, you need to execute/run it using io.unsafeRunSync

Putting it all together

Stream[IO,Long] says:

This is a Stream that emits values of type Long and in order to do so, it needs to run an effective function that producesIO[Long] for each value.

That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.

Takeaways

  • Stream is a super charged version of List
  • Stream(1,2,3) is of type Stream[Pure, Int] , the second type Int is the type of all values that this stream will emit
  • Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
  • Use evalMap instead of map when you want to apply a function that has an effect like loadUserIdByName to a Stream.
  • Stream[IO, Long] διαχωρίζει τις ανησυχίες του Τι και πώς επιτρέποντάς σας να εργάζεστε μόνο με τις τιμές και να μην ανησυχείτε για τον τρόπο λήψης τους (φόρτωση από το db).
  • Ο διαχωρισμός της περιγραφής του προγράμματος από την αξιολόγηση αποτελεί βασική πτυχή του ΠΠ.
  • Όλα τα προγράμματα με τα οποία γράφετε Streamδεν θα κάνουν τίποτα μέχρι να το χρησιμοποιήσετε unsafeRunSync. Πριν από αυτό, ο κωδικός σας είναι ουσιαστικά καθαρός.
  • IO[Long]είναι ένας τύπος εφέ που σας λέει: θα λάβετε Longτιμές από το IO (θα μπορούσε να είναι ένα αρχείο, το δίκτυο, η κονσόλα ...). Είναι μια περιγραφή και όχι ένα περιτύλιγμα!
  • Futureδεν συμμορφώνεται με αυτήν τη φιλοσοφία και συνεπώς δεν είναι συμβατή με το FS2, πρέπει να χρησιμοποιήσετε τον IOτύπο αντ 'αυτού.

Βίντεο FS2

  • Hands on screencast από τον Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
  • Συζήτηση από τον Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA