Μαθήματα που αντλήθηκαν κατά την επεξεργασία της Wikipedia με το Apache Spark

Το Apache Spark είναι ένα πλαίσιο συμπλέγματος υπολογιστών ανοικτού κώδικα που ανέχεται σφάλματα που υποστηρίζει επίσης SQL analytics, μηχανική εκμάθηση και επεξεργασία γραφημάτων.

Λειτουργεί χωρίζοντας τα δεδομένα σας σε διαμερίσματα και, στη συνέχεια, επεξεργάζεστε αυτά τα διαμερίσματα παράλληλα σε όλους τους κόμβους του συμπλέγματος. Εάν κάποιος κόμβος πέσει κάτω, εκχωρεί εκ νέου την εργασία αυτού του κόμβου σε διαφορετικό κόμβο και ως εκ τούτου παρέχει ανοχή σφαλμάτων.

Όντας 100 φορές γρηγορότερος από το Hadoop, το έκανε πολύ δημοφιλές για την επεξεργασία Big Data. Το Spark είναι γραμμένο στη Scala και τρέχει στο JVM, αλλά τα καλά νέα είναι ότι παρέχει επίσης API για Python και R καθώς και C #. Είναι καλά τεκμηριωμένο με παραδείγματα που πρέπει να δείτε.

Όταν είστε έτοιμοι να το δοκιμάσετε, αυτό το άρθρο θα σας καθοδηγήσει από τη λήψη και τη ρύθμιση έως τον συντονισμό απόδοσης. Το μικροσκοπικό σύμπλεγμα Spark μου πραγματοποίησε 100 εκατομμύρια αγώνες συμβολοσειράς σε όλα τα άρθρα της Wikipedia - σε λιγότερο από δύο ώρες.

Είναι όταν ξεπεράσετε τα μαθήματα και κάνετε κάποια σοβαρή δουλειά που συνειδητοποιείτε όλες τις ταλαιπωρίες της τεχνολογικής στοίβας που χρησιμοποιείτε. Η εκμάθηση λαθών είναι ο καλύτερος τρόπος για να μάθετε. Αλλά μερικές φορές έχετε λίγο χρόνο και εύχεστε να γνωρίζετε κάθε πιθανό πράγμα που θα μπορούσε να πάει στραβά.

Εδώ, περιγράφω μερικά από τα προβλήματα που αντιμετώπισα όταν ξεκίνησα με το Spark και πώς μπορείτε να τα αποφύγετε.

Πως να ξεκινήσεις

Κατεβάστε το δυαδικό Spark που συνοδεύεται από συσκευασμένες εξαρτήσεις Hadoop

Εάν ξεκινήσετε να κατεβάζετε το Spark, θα παρατηρήσετε ότι υπάρχουν διάφορα δυαδικά αρχεία για την ίδια έκδοση. Το Spark διαφημίζει ότι δεν χρειάζεται Hadoop, επομένως μπορείτε να κάνετε λήψη της έκδοσης Hadoop που παρέχεται από τον χρήστη και είναι μικρότερη σε μέγεθος. Μην το κάνεις αυτό .

Αν και το Spark δεν χρησιμοποιεί το πλαίσιο MapReduce του Hadoop, έχει εξαρτήσεις από άλλες βιβλιοθήκες Hadoop όπως το HDFS και το YARN. Η έκδοση χωρίς-hadoop είναι για όταν έχετε ήδη διαθέσει βιβλιοθήκες Hadoop αλλού.

Χρησιμοποιήστε τη λειτουργία αυτόνομου συμπλέγματος, όχι Mesos ή YARN

Μόλις δοκιμάσετε τα ενσωματωμένα παραδείγματα στο localσύμπλεγμα και βεβαιωθείτε ότι όλα έχουν εγκατασταθεί και λειτουργεί σωστά, προχωρήστε στη ρύθμιση του συμπλέγματος σας.

Το Spark σας δίνει τρεις επιλογές: Mesos, YARN και αυτόνομο.

Οι δύο πρώτοι είναι διανομείς πόρων που ελέγχουν τους κόμβους ρεπλίκα σας. Ο Spark πρέπει να τους ζητήσει να διαθέσει τις δικές του περιπτώσεις. Ως αρχάριος, μην αυξήσετε την πολυπλοκότητά σας πηγαίνοντας έτσι.

Το αυτόνομο σύμπλεγμα είναι το πιο εύκολο στη ρύθμιση. Έρχεται με λογικές προεπιλογές, όπως η χρήση όλων των πυρήνων σας για εκτελεστές. Είναι μέρος της ίδιας της διανομής Spark και έχει ένα sbin/start-all.shσενάριο που μπορεί να εμφανίσει το πρωτεύον καθώς και όλα τα αντίγραφα σας που αναφέρονται στη conf/slavesχρήση του ssh.

Τα Mesos / YARN είναι ξεχωριστά προγράμματα που χρησιμοποιούνται όταν το σύμπλεγμα δεν είναι απλώς ένα σπινθήρα. Επίσης, δεν συνοδεύονται από λογικές προεπιλογές: οι εκτελεστές δεν χρησιμοποιούν όλους τους πυρήνες στα αντίγραφα, εκτός αν ορίζεται ρητά.

Έχετε επίσης την επιλογή λειτουργίας υψηλής διαθεσιμότητας χρησιμοποιώντας το Zookeeper, το οποίο διατηρεί μια λίστα εφεδρικών πρωτογενών σε περίπτωση που τυχόν πρωτεύουσες αποτυχίες. Εάν είστε αρχάριος, πιθανότατα δεν χειρίζεστε ένα σύμπλεγμα χίλιων κόμβων όπου ο κίνδυνος αποτυχίας κόμβων είναι σημαντικός. Είναι πιο πιθανό να δημιουργήσετε ένα σύμπλεγμα σε μια διαχειριζόμενη πλατφόρμα cloud όπως το Amazon ή το Google, το οποίο ήδη φροντίζει για αποτυχίες κόμβων.

Δεν χρειάζεστε υψηλή διαθεσιμότητα με υποδομή cloud ή ένα μικρό σύμπλεγμα

Είχα δημιουργήσει το σύμπλεγμα μου σε ένα εχθρικό περιβάλλον όπου οι ανθρώπινοι παράγοντες ήταν υπεύθυνοι για τις διακοπές ρεύματος και τους κόμβους εκτός δικτύου. (Βασικά το εργαστήριο υπολογιστών μου στο κολέγιο όπου οι επιμελείς μαθητές απενεργοποιούν τη μηχανή και απρόσεκτοι μαθητές βγάζουν καλώδια LAN). Θα μπορούσα να συνεχίσω χωρίς υψηλή διαθεσιμότητα με προσεκτική επιλογή του πρωτεύοντος κόμβου. Δεν θα πρέπει να ανησυχείτε για αυτό.

Ελέγξτε την έκδοση Java που χρησιμοποιείτε για την εκτέλεση του Spark

Μια πολύ σημαντική πτυχή είναι η έκδοση Java που χρησιμοποιείτε για την εκτέλεση του Spark. Κανονικά, μια νεότερη έκδοση του Java λειτουργεί με κάτι που έχει συνταχθεί για παλαιότερες εκδόσεις.

Αλλά με το Project Jigsaw, η αρθρωτότητα εισήγαγε αυστηρότερη απομόνωση και όρια στην Java 9 που σπάει ορισμένα πράγματα που χρησιμοποιούν την αντανάκλαση. Στο Spark 2.3.0 που εκτελείται στο Java 9, έχω παράνομη πρόσβαση προβληματισμού. Η Java 8 δεν είχε προβλήματα.

Αυτό σίγουρα θα αλλάξει στο εγγύς μέλλον, αλλά να το έχετε υπόψη σας μέχρι τότε.

Καθορίστε το κύριο URL ακριβώς όπως είναι. Μην επιλύετε τα ονόματα τομέα σε διευθύνσεις IP ή το αντίστροφο

Το αυτόνομο σύμπλεγμα είναι πολύ ευαίσθητο σχετικά με τις διευθύνσεις URL που χρησιμοποιούνται για την επίλυση των κύριων και των αντιγράφων κόμβων. Ας υποθέσουμε ότι ξεκινάτε τον κύριο κόμβο όπως παρακάτω:

> sbin/start-master.sh 

και ο κύριος σας έχει τελειώσει στις localhost:8080

Από προεπιλογή, το όνομα κεντρικού υπολογιστή του υπολογιστή σας επιλέγεται ως η κύρια διεύθυνση URL. x360επιλύει, localhostαλλά η έναρξη ενός αντιγράφου όπως παρακάτω δεν θα λειτουργήσει.

# does not work > sbin/start-slave.sh spark://localhost:7077 
# works > sbin/start-slave.sh spark://x360:7077

Αυτό λειτουργεί και το αντίγραφο μας έχει προστεθεί στο σύμπλεγμα:

Το αντίγραφο μας έχει διεύθυνση IP στον υποτομέα 172.17.xx, ο οποίος είναι στην πραγματικότητα ο υποτομέας που έχει ρυθμιστεί από τον Docker στον υπολογιστή μου.

Ο πρωτεύων μπορεί να επικοινωνήσει με αυτό το αντίγραφο επειδή και οι δύο βρίσκονται στο ίδιο μηχάνημα. Ωστόσο, το αντίγραφο δεν μπορεί να επικοινωνήσει με άλλα αντίγραφα στο δίκτυο ή με ένα πρωτεύον σε διαφορετικό μηχάνημα, επειδή η διεύθυνση IP του δεν είναι δρομολογημένη.

Όπως στην κύρια περίπτωση παραπάνω, ένα αντίγραφο σε ένα μηχάνημα χωρίς πρωτεύον θα καταλάβει το όνομα κεντρικού υπολογιστή του μηχανήματος. Όταν έχετε πανομοιότυπα μηχανήματα, όλα καταλήγουν να χρησιμοποιούν το ίδιο όνομα κεντρικού υπολογιστή με τη διεύθυνσή τους. Αυτό δημιουργεί ένα πλήρες χάος και κανείς δεν μπορεί να επικοινωνήσει με τον άλλο.

Έτσι οι παραπάνω εντολές θα αλλάξουν σε:

# start master> sbin/start-master.sh -h $myIP # start slave > sbin/start-slave.sh -h $myIP spark://:7077 # submit a job > SPARK_LOCAL_IP=$myIP bin/spark-submit ...

πού myIPείναι η διεύθυνση IP του μηχανήματος που μπορεί να δρομολογηθεί μεταξύ των κόμβων συμπλέγματος. Είναι πιο πιθανό ότι όλοι οι κόμβοι βρίσκονται στο ίδιο δίκτυο, οπότε μπορείτε να γράψετε ένα σενάριο που θα ρυθμιστεί myIPσε κάθε υπολογιστή.

# assume all nodes in the 10.1.26.x subdomain [email protected]:~$ myIP=`hostname -I | tr " " "\n" | grep 10.1.26. | head`

Ροή του κώδικα

So far we have set up our cluster and seen that it is functional. Now its time to code. Spark is quite well-documented and comes with lots of examples, so its very easy to get started with coding. What is less obvious is how the whole thing works which results in some very hard to debug errors during runtime. Suppose you coded something like this:

class SomeClass { static SparkSession spark; static LongAccumulator numSentences; 
 public static void main(String[] args) { spark = SparkSession.builder() .appName("Sparkl") .getOrCreate(); (1) numSentences = spark.sparkContext() .longAccumulator("sentences"); (2) spark.read() .textFile(args[0]) .foreach(SomeClass::countSentences); (3) } static void countSentences(String s) { numSentences.add(1); } (4) }

1 create a spark session

2 create a long counter to keep track of job progress

3 traverse a file line by line calling countSentences for each line

4 add 1 to the accumulator for each sentence

The above code works on a local cluster but will fail with a null pointer exception when run on a multinode cluster. Both spark as well as numSentences will be null on the replica machine.

To solve this problem, encapsulate all initialized states in non-static fields of an object. Use main to create the object and defer further processing to it.

What you need to understand is that the code you write is run by the driver node exactly as is, but what the replica nodes execute is a serialized job that spark gives them. Your classes will be loaded by the JVM on the replica.

Static initializers will run as expected, but functions like main won’t, so static values initialized in the driver won’t be seen in the replica. I am not sure how the whole thing works, and am only inferring from experience, so take my explanation with a grain of salt. So your code now looks like:

class SomeClass { SparkSession spark; (1) LongAccumulator numSentences; String[] args; SomeClass(String[] args) { this.args = args; } public static void main(String[] args){ new SomeClass(args).process(); (2) } void process() { spark = SparkSession.builder().appName("Sparkl").getOrCreate(); numSentences = spark.sparkContext().longAccumulator("sentences"); spark.read().textFile(args[0]).foreach(this::countSentences); (3) } void countSentences(String s) { numSentences.add(1); }}

1 Make fields non static

2 create instance of the class and then execute spark jobs

3 reference to this in the foreach lambda brings the object in the closure of accessible objects and thus gets serialized and sent to all replicas.

Those of you who are programming in Scala might use Scala objects which are singleton classes and hence may never come across this problem. Nevertheless, it is something you should know.

Submit app and dependencies

There is more to coding above, but before that you need to submit your application to the cluster. Unless your app is extremely trivial, chances are you are using external libraries.

When you submit your app jar, you also need to tell Spark the dependent libraries that you are using, so it will make them available on all nodes. It is pretty straightforward. The syntax is:

bin/spark-submit --packages groupId:artifactId:version,...

I have had no issues with this scheme. It works flawlessly. I generally develop on my laptop and then submit jobs from a node on the cluster. So I need to transfer the app and its dependencies to whatever node I ssh into.

Spark looks for dependencies in the local maven repo, then the central repo and any repos you specify using --repositories option. It is a little cumbersome to sync all that on the driver and then type out all those dependencies on the command line. So I prefer all dependencies packaged in a single jar, called an uber jar.

Use Maven shade plugin to generate an uber jar with all dependencies so job submitting becomes easier

Just include the following lines in your pom.xml

   org.apache.maven.plugins maven-shade-plugin  shade      

When you build and package your project, the default distribution jar will have all dependencies included.

As you submit jobs, the application jars get accumulated in the work directory and fill up over time.

Set spark.worker.cleanup.enabled to true in conf/spark-defaults.conf

This option is false by default and is applicable to the stand-alone mode.

Input and Output files

This was the most confusing part that was difficult to diagnose.

Spark supports reading/writing of various sources such as hdfs, ftp, jdbc or local files on the system when the protocol is file:// or missing. My first attempt was to read from a file on my driver. I assumed that the driver would read the file, turn it into partitions, and then distribute those across the cluster. Turns out it doesn’t work that way.

When you read a file from the local filesystem, ensure that the file is present on all the worker nodes at exactly the same location. Spark does not implicitly distribute files from the driver to the workers.

So I had to copy the file to every worker at the same location. The location of the file was passed as an argument to my app. Since the file was located in the parent folder, I specified its path as ../wikiArticles.txt. This did not work on the worker nodes.

Always pass absolute file paths for reading

It could be a mistake from my side, but I know that the filepath made it as is into the textFile function and it caused “file not found” errors.

Spark supports common compression schemes, so most gzipped or bzipped text files will be uncompressed before use. It might seem that compressed files will be more efficient, but do not fall for that trap.

Don’t read from compressed text files, especially gzip. Uncompressed files are faster to process.

Gzip cannot be uncompressed in parallel like bzip2, so nodes spend the bulk of their time uncompressing large files.

It is a hassle to make the input files available on all workers. You can instead use Spark’s file broadcast mechanism. When submitting a job, specify a comma separated list of input files with the --files option. Accessing these files requires SparkFiles.get(filename). I could not find enough documentation on this feature.

To read a file broadcasted with the --files option, use SparkFiles.get( h>) as the pathname in read functions.

So a file submitted as --files /opt/data/wikiAbstracts.txt would be accesed as SparkFiles.get("WikiAbstracts.txt"). This returns a string which you can use in any read function that expects a path. Again, remember to specify absolute paths.

Since my input file was 5GB gzipped, and my network was quite slow at 12MB/s, I tried to use Spark’s file broadcast feature. But the decompression itself was taking so long that I manually copied the file to every worker. If your network is fast enough, you can use uncompressed files. Or alternatively, use HDFS or FTP server.

Writing files also follows the semantics of reading. I was saving my DataFrame to a csv file on the local system. Again I had the assumption that the results would be sent back to the driver node. Didn’t work for me.

When a DataFrame is saved to local file path, each worker saves its computed partitions to its own disk. No data is sent back to the driver

I was only getting a fraction of the results I was expecting. Initially I had misdiagnosed this problem as an error in my code. Later I found out that each worker was storing its computed results on its own disk.

Partitions

The number of partitions you make affects the performance. By default, Spark will make as many partitions as there are cores in the cluster. This is not always optimal.

Keep an eye on how many workers are actively processing tasks. If too few, increase the number of partitions.

If you read from a gzipped file, Spark creates just one partition which will be processed by only one worker. That is also one reason why gzipped files are slow to process. I have observed slower performance with small number of large partitions as compared to a large number of small partitions.

It’s better to explicitly set the number of partitions while reading data.

You may not have to do this when reading from HDFS, as Hadoop files are already partitioned.

Wikipedia and DBpedia

There are no gotchas here, but I thought it would be good to make you aware of alternatives. The entire Wikipedia xml dump is 14GB compressed and 65 GB uncompressed. Most of the time you only want the plain text of the article, but the dump is in MediaWiki markup so it needs some preprocessing. There are many tools available for this in various languages. Although I haven’t used them personally, I am pretty sure it must be a time consuming task. But there are alternatives.

If all you want is the Wikipedia article plaintext, mostly for NLP, then download the dataset made available by DBpedia.

I used the full article dump (NIF Context) available at DBpedia (direct download from here). This dataset gets rid of unwanted stuff like tables, infoboxes, and references. The compressed download is 4.3GB in the turtle format. You can covert it to tsv like so

Similar datasets are available for other properties like page links, anchor texts, and so on. Do check out DBpedia.

A word about databases

I never quite understood why there is a plethora of databases, all so similar, and on top of that people buy database licenses. Until this project I hadn’t seriously used any. I ever only used MySQL and Apache Derby.

For my project I used a SPARQL triple store database, Apache Jena TDB, accessed over a REST API served by Jena Fuseki. This database would give me RDF urls, labels, and predicates for all the resources mentioned in the supplied article. Every node would make a database call and only then would proceed with further processing.

My workload had become IO bound, as I could see near 0% CPU utilization on worker nodes. Each partition of the data would result in two SPARQL queries. In the worst case scenario, one of the two queries was taking 500–1000 seconds to process. Thankfully, the TDB database relies on Linux’s memory mapping. I could map the whole DB into RAM and significantly improve performance.

If you are IO bound and your database can fit into RAM, run it in memory.

I found a tool called vmtouch which would show what percentage of the database directory had been mapped into memory. This tool also allows you to explicitly map any files/directories into the RAM and optionally lock it so it wont get paged out.

My 16GB database could easily fit into my 32 GB RAM server. This boosted query performance by orders of magnitude to 1–2 seconds per query. Using a rudimentary form of database load balancing based on partition number, I could cut down my execution time to half by using 2 SPARQL servers instead of one.

Conclusion

I truly enjoyed distributed computing on Spark. Without it I could not have completed my project. It was quite easy to take my existing app and have it run on Spark. I definitely would recommend anyone to give it a try.

Originally published at siddheshrane.github.io.