Πώς να κάνετε μετεγκατάσταση από το Elasticsearch 1,7 έως 6,8 με μηδενικό χρόνο διακοπής λειτουργίας

Η τελευταία μου εργασία στο BigPanda ήταν να αναβαθμίσω μια υπάρχουσα υπηρεσία που χρησιμοποιούσε το Elasticsearch έκδοση 1.7 σε μια νεότερη έκδοση Elasticsearch, 6.8.1.

Σε αυτήν την ανάρτηση, θα μοιραστώ τον τρόπο μετανάστευσης από το Elasticsearch 1.6 στο 6.8 με σκληρούς περιορισμούς όπως μηδενικό χρόνο διακοπής λειτουργίας, χωρίς απώλεια δεδομένων και μηδέν σφάλματα Θα σας δώσω επίσης ένα σενάριο που κάνει τη μετεγκατάσταση για εσάς.

Αυτή η ανάρτηση περιέχει 6 κεφάλαια (και ένα είναι προαιρετικό):

  • Τι θα κερδίσω? -> Ποιες ήταν οι νέες δυνατότητες που μας οδήγησαν στην αναβάθμιση της έκδοσής μας;
  • Οι περιορισμοί -> Ποιες ήταν οι επιχειρηματικές μας απαιτήσεις;
  • Επίλυση προβλημάτων -> Πώς αντιμετωπίσαμε τους περιορισμούς;
  • Προχωρώντας -> Το σχέδιο.
  • [Προαιρετικό κεφάλαιο] -> Πώς χειριστήκαμε το περίφημο πρόβλημα έκρηξης χαρτογράφησης;
  • Τέλος -> Πώς να κάνετε μετεγκατάσταση δεδομένων μεταξύ συμπλεγμάτων.

Κεφάλαιο 1 - Τι είναι αυτό για μένα;

Ποια οφέλη περιμέναμε να λύσουμε αναβαθμίζοντας το κατάστημα δεδομένων μας;

Υπήρχαν μερικοί λόγοι:

  1. Ζητήματα απόδοσης και σταθερότητας - Αντιμετωπίσαμε έναν τεράστιο αριθμό διακοπών με μεγάλο MTTR που μας προκάλεσε πολλούς πονοκεφάλους. Αυτό αντικατοπτρίστηκε σε συχνές υψηλές καθυστερήσεις, υψηλή χρήση CPU και περισσότερα ζητήματα.
  2. Μη υπάρχουσα υποστήριξη σε παλιές εκδόσεις Elasticsearch - Χάσαμε κάποιες λειτουργικές γνώσεις στο Elasticsearch και όταν ψάξαμε για εξωτερικές συμβουλές, ενθαρρύνθηκε να μεταναστεύσουμε προς τα εμπρός για να λάβουμε υποστήριξη.
  3. Δυναμικές αντιστοιχίσεις στο σχήμα μας - Το τρέχον σχήμα μας στο Elasticsearch 1.7 χρησιμοποίησε μια λειτουργία που ονομάζεται δυναμική αντιστοίχιση που έκανε το σύμπλεγμα μας να εκραγεί πολλές φορές. Θέλαμε λοιπόν να αντιμετωπίσουμε αυτό το ζήτημα.
  4. Κακή ορατότητα στο υπάρχον σύμπλεγμα - Θέλαμε μια καλύτερη προβολή κάτω από την κουκούλα και είδαμε ότι οι νεότερες εκδόσεις είχαν εξαιρετικά εργαλεία εξαγωγής μετρήσεων.

Κεφάλαιο 2 - Οι περιορισμοί

  • Μηδενική μετεγκατάσταση χρόνου διακοπής - Έχουμε ενεργούς χρήστες στο σύστημά μας και δεν μπορούσαμε να αντέξουμε το σύστημα κατά τη μετεγκατάσταση.
  • Σχέδιο ανάκτησης - Δεν μπορούσαμε να "χάσουμε" ή "κατεστραμμένα" δεδομένα, ανεξάρτητα από το κόστος. Επομένως, χρειαζόμασταν να προετοιμάσουμε ένα σχέδιο ανάκαμψης σε περίπτωση αποτυχίας της μετανάστευσης.
  • Μηδενικά σφάλματα - Δεν ήταν δυνατή η αλλαγή της υπάρχουσας λειτουργικότητας αναζήτησης για τελικούς χρήστες.

Κεφάλαιο 3 - Επίλυση προβλημάτων και σκέψη ενός σχεδίου

Ας αντιμετωπίσουμε τους περιορισμούς από το απλούστερο στο πιο δύσκολο:

Μηδενικά σφάλματα

Προκειμένου να καλύψω αυτήν την απαίτηση, μελέτησα όλα τα πιθανά αιτήματα που λαμβάνει η υπηρεσία και ποια ήταν τα αποτελέσματά της. Στη συνέχεια, πρόσθεσα μονάδες-τεστ όπου χρειάζεται.

Επιπλέον, πρόσθεσα πολλές μετρήσεις (στο Elasticsearch Indexerκαι το new Elasticsearch Indexer) για παρακολούθηση του λανθάνοντος χρόνου, της απόδοσης και της απόδοσης, οι οποίες μου επέτρεψαν να επιβεβαιώσω ότι τις βελτιώσαμε μόνο.

Πλάνο ανάκαμψης

Αυτό σημαίνει ότι έπρεπε να αντιμετωπίσω την ακόλουθη κατάσταση: ανέπτυξα τον νέο κώδικα στην παραγωγή και τα πράγματα δεν λειτουργούσαν όπως αναμενόταν. Τι μπορώ να κάνω γι 'αυτό τότε;

Δεδομένου ότι δούλευα σε μια υπηρεσία που χρησιμοποιούσε προμήθεια συμβάντων, θα μπορούσα να προσθέσω έναν άλλο ακροατή (διάγραμμα επισυνάπτεται παρακάτω) και να αρχίσω να γράφω σε ένα νέο σύμπλεγμα Elasticsearch χωρίς να επηρεάσω την κατάσταση παραγωγής

Μηδενική μετεγκατάσταση εκτός λειτουργίας

Η τρέχουσα υπηρεσία βρίσκεται σε ζωντανή λειτουργία και δεν μπορεί να "απενεργοποιηθεί" για περιόδους μεγαλύτερες από 5-10 λεπτά. Το τέχνασμα για να γίνει αυτό είναι το εξής:

  • Αποθηκεύστε ένα αρχείο καταγραφής όλων των ενεργειών που χειρίζεται η υπηρεσία σας (χρησιμοποιούμε την Kafka στην παραγωγή)
  • Ξεκινήστε τη διαδικασία μετεγκατάστασης εκτός σύνδεσης (και παρακολουθήστε την μετατόπιση πριν ξεκινήσετε τη μετεγκατάσταση)
  • Όταν ολοκληρωθεί η μετεγκατάσταση, ξεκινήστε τη νέα υπηρεσία με το αρχείο καταγραφής με την καταγεγραμμένη μετατόπιση και καλύψτε την καθυστέρηση
  • Όταν τελειώσει η καθυστέρηση, αλλάξτε τη διεπαφή σας σε ερώτηση έναντι της νέας υπηρεσίας και τελειώσατε

Κεφάλαιο 4 - Το σχέδιο

Η τρέχουσα υπηρεσία μας χρησιμοποιεί την ακόλουθη αρχιτεκτονική (με βάση το μήνυμα που περνά στο Kafka):

  1. Event topicπεριέχει εκδηλώσεις που παράγονται από άλλες εφαρμογές (για παράδειγμα, UserId 3 created)
  2. Command topicπεριέχει τη μετάφραση αυτών των γεγονότων σε εντολές συγκεκριμένες χρησιμοποιούνται από την παρούσα αίτηση (για παράδειγμα: Add userId 3)
  3. Elasticsearch 1.7 - Η βάση δεδομένων της command Topicανάγνωσης από το Elasticsearch Indexer.

Σχεδιάσαμε να προσθέσουμε έναν άλλο καταναλωτή ( new Elasticsearch Indexer) command topic, ο οποίος θα διαβάσει τα ίδια ακριβή μηνύματα και θα τα γράψει παράλληλα με το Elasticsearch 6.8.

Πού πρέπει να ξεκινήσω;

Για να είμαι ειλικρινής, θεώρησα τον εαυτό μου ως αρχάριο χρήστη Elasticsearch. Για να αισθάνομαι σίγουροι για την εκτέλεση αυτής της εργασίας, έπρεπε να σκεφτώ τον καλύτερο τρόπο προσέγγισης αυτού του θέματος και να το μάθω. Μερικά πράγματα που βοήθησαν ήταν:

  1. Τεκμηρίωση - Είναι ένας εξαιρετικά χρήσιμος πόρος για όλα τα Elasticsearch. Αφιερώστε χρόνο για να το διαβάσετε και να σημειώσετε (μην χάσετε: Χαρτογράφηση και QueryDsl).
  2. HTTP API - τα πάντα στο CAT API. Αυτό ήταν εξαιρετικά χρήσιμο για τον εντοπισμό σφαλμάτων σε τοπικό επίπεδο και για να δείτε πώς ανταποκρίνεται η Elasticsearch (μην χάσετε: υγεία συμπλέγματος, δείκτες γάτας, αναζήτηση, διαγραφή ευρετηρίου)
  3. Metrics (❤️) — From the first day, we configured a shiny new dashboard with lots of cool metrics (taken from elasticsearch-exporter-for-Prometheus) that helped and pushed us to understand more about Elasticsearch.

The code

Our codebase was using a library called elastic4s and was using the oldest release available in the library — a really good reason to migrate! So the first thing to do was just to migrate versions and see what broke.

There are a few tactics on how to do this code migration. The tactic we chose was to try and restore existing functionality first in the new Elasticsearch version without re-writing the all code from the start. In other words, to reach existing functionality but on a newer version of Elasticsearch.

Luckily for us, the code already contained almost full testing coverage so our task was much much simpler, and that took around 2 weeks of development time.

It's important to note that, if that wasn't the case, we would have had to invest some time in filling that coverage up. Only then would we be able to migrate since one of our constraints was to not break existing functionality.

Chapter 5 — The mapping explosion problem

Let’s describe our use-case in more detail. This is our model:

class InsertMessageCommand(tags: Map[String,String])

And for example, an instance of this message would be:

new InsertMessageCommand(Map("name"->"dor","lastName"->"sever"))

And given this model, we needed to support the following query requirements:

  1. Query by value
  2. Query by tag name and value

The way this was modeled in our Elasticsearch 1.7 schema was using a dynamic template schema (since the tag keys are dynamic, and cannot be modeled in advanced).

The dynamic template caused us multiple outages due to the mapping explosion problem, and the schema looked like this:

curl -X PUT "localhost:9200/_template/my_template?pretty" -H 'Content-Type: application/json' -d ' { "index_patterns": [ "your-index-names*" ], "mappings": { "_doc": { "dynamic_templates": [ { "tags": { "mapping": { "type": "text" }, "path_match": "actions.tags.*" } } ] } }, "aliases": {} }' curl -X PUT "localhost:9200/your-index-names-1/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "John", "lname" : "Smith" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/2?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "Dor", "lname" : "Sever" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/3?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "AnotherName", "lname" : "AnotherLastName" } } } ' 
 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.name" : { "query" : "John" } } } } ' # returns 1 match(doc 1) curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.lname" : { "query" : "John" } } } } ' # returns zero matches # search by value curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "query_string" : { "fields": ["actions.tags.*" ], "query" : "Dor" } } } ' 

Nested documents solution

Our first instinct in solving the mapping explosion problem was to use nested documents.

We read the nested data type tutorial in the Elastic docs and defined the following schema and queries:

curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "nested" } } } } } ' curl -X PUT "localhost:9200/my_index/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "key" : "John", "value" : "Smith" }, { "key" : "Alice", "value" : "White" } ] } ' # Query by tag key and value curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.key": "Alice" }}, { "match": { "tags.value": "White" }} ] } } } } } ' # Returns 1 document curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.value": "Smith" }} ] } } } } } ' # Query by tag value # Returns 1 result 

And this solution worked. However, when we tried to insert real customer data we saw that the number of documents in our index increased by around 500 times.

We thought about the following problems and went on to find a better solution:

  1. The amount of documents we had in our cluster was around 500 million documents. This meant that, with the new schema, we were going to reach two hundred fifty billion documents (that’s 250,000,000,000 documents ?).
  2. We read this really good blog post — //blog.gojekengineering.com/elasticsearch-the-trouble-with-nested-documents-e97b33b46194 which highlights that nested documents can cause high latency in queries and heap usage problems.
  3. Testing — Since we were converting 1 document in the old cluster to an unknown number of documents in the new cluster, it would be much harder to track if the migration process worked without any data loss. If our conversion was 1:1, we could assert that the count in the old cluster equalled the count in the new cluster.

Avoiding nested documents

The real trick in this was to focus on what supported queries we were running: search by tag value, and search by tag key and value.

The first query does not require nested documents since it works on a single field. For the latter, we did the following trick. We created a field that contains the combination of the key and the value. Whenever a user queries on a key, value match, we translate their request to the corresponding text and query against that field.

Example:

curl -X PUT "localhost:9200/my_index_2?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "object", "properties": { "keyToValue": { "type": "keyword" }, "value": { "type": "keyword" } } } } } } } ' curl -X PUT "localhost:9200/my_index_2/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "keyToValue" : "John:Smith", "value" : "Smith" }, { "keyToValue" : "Alice:White", "value" : "White" } ] } ' # Query by key,value # User queries for key: Alice, and value : White , we then query elastic with this query: curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.keyToValue": "Alice:White" }}] }}} ' # Query by value only curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.value": "White" }}] }}} ' 

Chapter 6 — The migration process

We planned to migrate about 500 million documents with zero downtime. To do that we needed:

  1. A strategy on how to transfer data from the old Elastic to the new Elasticsearch
  2. A strategy on how to close the lag between the start of the migration and the end of it

And our two options in closing the lag:

  1. Our messaging system is Kafka based. We could have just taken the current offset before the migration started, and after the migration ended, start consuming from that specific offset. This solution requires some manual tweaking of offsets and some other stuff, but will work.
  2. Another approach to solving this issue was to start consuming messages from the beginning of the topic in Kafka and make our actions on Elasticsearch idempotent — meaning, if the change was “applied” already, nothing would change in Elastic store.

The requests made by our service against Elastic were already idempotent, so we choose option 2 because it required zero manual work (no need to take specific offsets, and then set them afterward in a new consumer group).

How can we migrate the data?

These were the options we thought of:

  1. If our Kafka contained all messages from the beginning of time, we could just play from the start and the end state would be equal. But since we apply retention to out topics, this was not an option.
  2. Dump messages to disk and then ingest them to Elastic directly – This solution looked kind of weird. Why store them in disk instead of just writing them directly to Elastic?
  3. Transfer messages between old Elastic to new Elastic — This meant, writing some sort of “script” (did anyone say Python? ?) that will connect to the old Elasticsearch cluster, query for items, transform them to the new schema, and index them in the cluster.

We choose the last option. These were the design choices we had in mind:

  1. Let’s not try to think about error handling unless we need to. Let’s try to write something super simple, and if errors occur, let’s try to address them. In the end, we did not need to address this issue since no errors occurred during the migration.
  2. It’s a one-off operation, so whatever works first / KISS.
  3. Metrics — Since the migration processes can take hours to days, we wanted the ability from day 1 to be able to monitor the error count and to track the current progress and copy rate of the script.

We thought long and hard and choose Python as our weapon of choice. The final version of the code is below:

dictor==0.1.2 - to copy and transform our Elasticsearch documentselasticsearch==1.9.0 - to connect to "old" Elasticsearchelasticsearch6==6.4.2 - to connect to the "new" Elasticsearchstatsd==3.3.0 - to report metrics 
from elasticsearch import Elasticsearch from elasticsearch6 import Elasticsearch as Elasticsearch6 import sys from elasticsearch.helpers import scan from elasticsearch6.helpers import parallel_bulk import statsd ES_SOURCE = Elasticsearch(sys.argv[1]) ES_TARGET = Elasticsearch6(sys.argv[2]) INDEX_SOURCE = sys.argv[3] INDEX_TARGET = sys.argv[4] QUERY_MATCH_ALL = {"query": {"match_all": {}}} SCAN_SIZE = 1000 SCAN_REQUEST_TIMEOUT = '3m' REQUEST_TIMEOUT = 180 MAX_CHUNK_BYTES = 15 * 1024 * 1024 RAISE_ON_ERROR = False def transform_item(item, index_target): # implement your logic transformation here transformed_source_doc = item.get("_source") return {"_index": index_target, "_type": "_doc", "_id": item['_id'], "_source": transformed_source_doc} def transformedStream(es_source, match_query, index_source, index_target, transform_logic_func): for item in scan(es_source, query=match_query, index=index_source, size=SCAN_SIZE, timeout=SCAN_REQUEST_TIMEOUT): yield transform_logic_func(item, index_target) def index_source_to_target(es_source, es_target, match_query, index_source, index_target, bulk_size, statsd_client, logger, transform_logic_func): ok_count = 0 fail_count = 0 count_response = es_source.count(index=index_source, body=match_query) count_result = count_response['count'] statsd_client.gauge(stat='elastic_migration_document_total_count,index={0},type=success'.format(index_target), value=count_result) with statsd_client.timer('elastic_migration_time_ms,index={0}'.format(index_target)): actions_stream = transformedStream(es_source, match_query, index_source, index_target, transform_logic_func) for (ok, item) in parallel_bulk(es_target, chunk_size=bulk_size, max_chunk_bytes=MAX_CHUNK_BYTES, actions=actions_stream, request_timeout=REQUEST_TIMEOUT, raise_on_error=RAISE_ON_ERROR): if not ok: logger.error("got error on index {} which is : {}".format(index_target, item)) fail_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=failure'.format(index_target), 1) else: ok_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=success'.format(index_target), 1) return ok_count, fail_count statsd_client = statsd.StatsClient(host='localhost', port=8125) if __name__ == "__main__": index_source_to_target(ES_SOURCE, ES_TARGET, QUERY_MATCH_ALL, INDEX_SOURCE, INDEX_TARGET, BULK_SIZE, statsd_client, transform_item) 

Conclusion

Migrating data in a live production system is a complicated task that requires a lot of attention and careful planning. I recommend taking the time to work through the steps listed above and figure out what works best for your needs.

Κατά κανόνα, προσπαθήστε πάντα να μειώσετε όσο το δυνατόν περισσότερο τις απαιτήσεις σας. Για παράδειγμα, απαιτείται μηδενική μετεγκατάσταση εκτός λειτουργίας; Μπορείτε να αντέξετε οικονομικά την απώλεια δεδομένων;

Η αναβάθμιση των καταστημάτων δεδομένων είναι συνήθως μαραθώνιος και όχι σπριντ, οπότε πάρτε μια βαθιά ανάσα και προσπαθήστε να απολαύσετε τη διαδρομή.

  • Η όλη διαδικασία που αναφέρεται παραπάνω μου πήρε περίπου 4 μήνες εργασίας
  • Όλα τα παραδείγματα Elasticsearch που εμφανίζονται σε αυτήν την ανάρτηση έχουν δοκιμαστεί έναντι της έκδοσης 6.8.1