Ροές Node.js: Όλα όσα πρέπει να γνωρίζετε

Ενημέρωση: Αυτό το άρθρο είναι πλέον μέρος του βιβλίου μου "Node.js Beyond The Basics".

Διαβάστε την ενημερωμένη έκδοση αυτού του περιεχομένου και περισσότερα σχετικά με τον Node στη διεύθυνση jscomplete.com/node-beyond-basics .

Οι ροές Node.js έχουν τη φήμη ότι είναι δύσκολο να εργαστούν και ακόμη πιο δύσκολο να κατανοηθούν. Λοιπόν έχω καλά νέα για εσάς - αυτό δεν ισχύει πλέον.

Με τα χρόνια, οι προγραμματιστές δημιούργησαν πολλά πακέτα εκεί έξω με μοναδικό σκοπό να διευκολύνουν την εργασία με ροές. Αλλά σε αυτό το άρθρο, θα επικεντρωθώ στο εγγενές API ροής Node.js.

"Οι ροές είναι η καλύτερη και πιο παρεξηγημένη ιδέα του Node."

- Dominic Tarr

Τι ακριβώς είναι οι ροές;

Οι ροές είναι συλλογές δεδομένων - όπως πίνακες ή συμβολοσειρές. Η διαφορά είναι ότι οι ροές ενδέχεται να μην είναι διαθέσιμες ταυτόχρονα και δεν χρειάζεται να ταιριάζουν στη μνήμη. Αυτό καθιστά τις ροές πραγματικά ισχυρές όταν εργάζεστε με μεγάλες ποσότητες δεδομένων ή δεδομένα που προέρχονται από μια εξωτερική πηγή ένα κομμάτι κάθε φορά.

Ωστόσο, οι ροές δεν αφορούν μόνο την εργασία με μεγάλα δεδομένα. Μας δίνουν επίσης τη δύναμη της συνθετικότητας στον κώδικα μας. Ακριβώς όπως μπορούμε να συνθέσουμε ισχυρές εντολές linux διοχετεύοντας άλλες μικρότερες εντολές Linux, μπορούμε να κάνουμε ακριβώς το ίδιο στο Node με ροές.

const grep = ... // A stream for the grep output const wc = ... // A stream for the wc input grep.pipe(wc)

Πολλές από τις ενσωματωμένες ενότητες στο Node εφαρμόζουν τη διεπαφή ροής:

Η παραπάνω λίστα έχει μερικά παραδείγματα για εγγενή αντικείμενα Node.js που είναι επίσης αναγνώσιμα και εγγράψιμα ροές. Ορισμένα από αυτά τα αντικείμενα είναι τόσο ευανάγνωστα όσο και εγγράψιμα ροές, όπως υποδοχές TCP, ροές zlib και crypto.

Παρατηρήστε ότι τα αντικείμενα είναι επίσης στενά συνδεδεμένα. Ενώ μια απόκριση HTTP είναι μια αναγνώσιμη ροή στον πελάτη, είναι μια εγγράψιμη ροή στον διακομιστή. Αυτό συμβαίνει επειδή στην περίπτωση HTTP, διαβάζουμε βασικά από το ένα αντικείμενο ( http.IncomingMessage) και γράφουμε στο άλλο ( http.ServerResponse).

Επίσης, σημειώστε πως τα stdioρεύματα ( stdin, stdout, stderr) έχουν τους τύπους ρεύμα αντίστροφο, όταν πρόκειται για διαδικασίες παιδί. Αυτό επιτρέπει έναν πολύ εύκολο τρόπο διοχέτευσης από και προς αυτές τις ροές από τις κύριες stdioροές διεργασίας .

Ένα πρακτικό παράδειγμα

Η θεωρία είναι μεγάλη, αλλά συχνά δεν είναι 100% πειστική. Ας δούμε ένα παράδειγμα που δείχνει τη διαφορά που μπορούν να κάνουν οι ροές σε κώδικα όταν πρόκειται για κατανάλωση μνήμης.

Ας δημιουργήσουμε πρώτα ένα μεγάλο αρχείο:

const fs = require('fs'); const file = fs.createWriteStream('./big.file'); for(let i=0; i<= 1e6; i++) { file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n'); } file.end();

Κοίτα τι χρησιμοποίησα για να δημιουργήσω αυτό το μεγάλο αρχείο. Μια εγγράψιμη ροή!

Η fsενότητα μπορεί να χρησιμοποιηθεί για ανάγνωση και εγγραφή σε αρχεία χρησιμοποιώντας μια διεπαφή ροής. Στο παραπάνω παράδειγμα, γράφουμε σε αυτό big.fileμέσω μιας εγγράψιμης ροής 1 εκατομμυρίου γραμμών με βρόχο.

Η εκτέλεση του παραπάνω σεναρίου δημιουργεί ένα αρχείο περίπου ~ 400 MB.

Ακολουθεί ένας απλός διακομιστής ιστού Node που έχει σχεδιαστεί για να εξυπηρετεί αποκλειστικά big.file:

const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);

Όταν ο server λαμβάνει ένα αίτημα, αυτό θα εξυπηρετήσει το μεγάλο αρχείο χρησιμοποιώντας την ασύγχρονη μέθοδο, fs.readFile. Αλλά, δεν είναι σαν να αποκλείουμε το βρόχο εκδήλωσης ή οτιδήποτε άλλο. Όλα είναι υπέροχα, σωστά; Σωστά?

Λοιπόν, ας δούμε τι συμβαίνει όταν τρέχουμε τον διακομιστή, συνδέουμε σε αυτόν και παρακολουθούμε τη μνήμη ενώ το κάνουμε.

Όταν έτρεξα τον διακομιστή, ξεκίνησε με κανονική ποσότητα μνήμης, 8,7 MB:

Μετά συνδέθηκα με τον διακομιστή. Σημειώστε τι συνέβη στη μνήμη που καταναλώθηκε:

Ουάου - η κατανάλωση μνήμης αυξήθηκε στα 434,8 MB.

Βασικά βάλαμε ολόκληρο το big.fileπεριεχόμενο στη μνήμη πριν το γράψουμε στο αντικείμενο απόκρισης. Αυτό είναι πολύ αναποτελεσματικό.

Το αντικείμενο απόκρισης HTTP ( resστον παραπάνω κώδικα) είναι επίσης μια εγγράψιμη ροή. Αυτό σημαίνει ότι εάν έχουμε μια αναγνώσιμη ροή που αντιπροσωπεύει το περιεχόμενο big.file, μπορούμε απλώς να συνδέσουμε αυτά τα δύο το ένα στο άλλο και να επιτύχουμε ως επί το πλείστον το ίδιο αποτέλεσμα χωρίς να καταναλώνουμε ~ 400 MB μνήμης.

Η fsενότητα του κόμβου μπορεί να μας δώσει μια ευανάγνωστη ροή για οποιοδήποτε αρχείο χρησιμοποιώντας τη createReadStreamμέθοδο. Μπορούμε να το διοχετεύσουμε στο αντικείμενο απόκρισης:

const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);

Τώρα όταν συνδέεστε σε αυτόν τον διακομιστή, συμβαίνει ένα μαγικό πράγμα (δείτε την κατανάλωση μνήμης):

Τι συμβαίνει?

Όταν ένας πελάτης ζητάει αυτό το μεγάλο αρχείο, το μεταδίδουμε ένα κομμάτι κάθε φορά, πράγμα που σημαίνει ότι δεν το αποθηκεύουμε στη μνήμη. Η χρήση της μνήμης αυξήθηκε κατά περίπου 25 MB και αυτό είναι.

Μπορείτε να προωθήσετε αυτό το παράδειγμα στα όριά του. Αναδημιουργήστε το big.fileμε πέντε εκατομμύρια γραμμές αντί για μόλις ένα εκατομμύριο, κάτι που θα οδηγούσε το αρχείο σε πάνω από 2 GB, και αυτό είναι πραγματικά μεγαλύτερο από το προεπιλεγμένο όριο buffer στον κόμβο.

Εάν προσπαθείτε να προβάλλετε αυτό το αρχείο χρησιμοποιώντας fs.readFile, απλά δεν μπορείτε, από προεπιλογή (μπορείτε να αλλάξετε τα όρια) Αλλά με fs.createReadStream, δεν υπάρχει κανένα πρόβλημα ροής δεδομένων 2 GB στον αιτούντα, και το καλύτερο από όλα, η χρήση της διαδικασίας μνήμης θα είναι περίπου η ίδια.

Είστε έτοιμοι να μάθετε ροές τώρα;

Αυτό το άρθρο αποτελεί σύνταξη μέρους του μαθήματος Pluralsight σχετικά με το Node.js. Καλύπτω παρόμοιο περιεχόμενο σε μορφή βίντεο εκεί.

Ροές 101

Υπάρχουν τέσσερις θεμελιώδεις τύποι ροών στο Node.js: Ροές με δυνατότητα ανάγνωσης, γραφής, διπλής όψης και μετασχηματισμού.

  • Μια ευανάγνωστη ροή είναι μια αφαίρεση για μια πηγή από την οποία μπορούν να καταναλωθούν τα δεδομένα. Ένα παράδειγμα αυτού είναι η fs.createReadStreamμέθοδος.
  • Μια εγγράψιμη ροή είναι μια αφαίρεση για έναν προορισμό στον οποίο μπορούν να γραφτούν δεδομένα. Ένα παράδειγμα αυτού είναι η fs.createWriteStreamμέθοδος.
  • A duplex streams is both Readable and Writable. An example of that is a TCP socket.
  • A transform stream is basically a duplex stream that can be used to modify or transform the data as it is written and read. An example of that is the zlib.createGzip stream to compress the data using gzip. You can think of a transform stream as a function where the input is the writable stream part and the output is readable stream part. You might also hear transform streams referred to as “through streams.”

All streams are instances of EventEmitter. They emit events that can be used to read and write data. However, we can consume streams data in a simpler way using the pipe method.

The pipe method

Here’s the magic line that you need to remember:

readableSrc.pipe(writableDest)

In this simple line, we’re piping the output of a readable stream — the source of data, as the input of a writable stream — the destination. The source has to be a readable stream and the destination has to be a writable one. Of course, they can both be duplex/transform streams as well. In fact, if we’re piping into a duplex stream, we can chain pipe calls just like we do in Linux:

readableSrc .pipe(transformStream1) .pipe(transformStream2) .pipe(finalWrtitableDest)

The pipe method returns the destination stream, which enabled us to do the chaining above. For streams a (readable), b and c (duplex), and d (writable), we can:

a.pipe(b).pipe(c).pipe(d) # Which is equivalent to: a.pipe(b) b.pipe(c) c.pipe(d) # Which, in Linux, is equivalent to: $ a | b | c | d

The pipe method is the easiest way to consume streams. It’s generally recommended to either use the pipe method or consume streams with events, but avoid mixing these two. Usually when you’re using the pipe method you don’t need to use events, but if you need to consume the streams in more custom ways, events would be the way to go.

Stream events

Beside reading from a readable stream source and writing to a writable destination, the pipe method automatically manages a few things along the way. For example, it handles errors, end-of-files, and the cases when one stream is slower or faster than the other.

However, streams can also be consumed with events directly. Here’s the simplified event-equivalent code of what the pipe method mainly does to read and write data:

# readable.pipe(writable) readable.on('data', (chunk) => { writable.write(chunk); }); readable.on('end', () => { writable.end(); });

Here’s a list of the important events and functions that can be used with readable and writable streams:

The events and functions are somehow related because they are usually used together.

The most important events on a readable stream are:

  • The data event, which is emitted whenever the stream passes a chunk of data to the consumer
  • The end event, which is emitted when there is no more data to be consumed from the stream.

The most important events on a writable stream are:

  • The drain event, which is a signal that the writable stream can receive more data.
  • The finish event, which is emitted when all data has been flushed to the underlying system.

Events and functions can be combined to make for a custom and optimized use of streams. To consume a readable stream, we can use the pipe/unpipe methods, or the read/unshift/resume methods. To consume a writable stream, we can make it the destination of pipe/unpipe, or just write to it with the write method and call the end method when we’re done.

Paused and Flowing Modes of Readable Streams

Readable streams have two main modes that affect the way we can consume them:

  • They can be either in the paused mode
  • Or in the flowing mode

Those modes are sometimes referred to as pull and push modes.

All readable streams start in the paused mode by default but they can be easily switched to flowing and back to paused when needed. Sometimes, the switching happens automatically.

When a readable stream is in the paused mode, we can use the read() method to read from the stream on demand, however, for a readable stream in the flowing mode, the data is continuously flowing and we have to listen to events to consume it.

In the flowing mode, data can actually be lost if no consumers are available to handle it. This is why, when we have a readable stream in flowing mode, we need a data event handler. In fact, just adding a data event handler switches a paused stream into flowing mode and removing the data event handler switches the stream back to paused mode. Some of this is done for backward compatibility with the older Node streams interface.

To manually switch between these two stream modes, you can use the resume() and pause() methods.

When consuming readable streams using the pipe method, we don’t have to worry about these modes as pipe manages them automatically.

Implementing Streams

When we talk about streams in Node.js, there are two main different tasks:

  • The task of implementing the streams.
  • The task of consuming them.

So far we’ve been talking about only consuming streams. Let’s implement some!

Stream implementers are usually the ones who require the stream module.

Implementing a Writable Stream

To implement a writable stream, we need to to use the Writable constructor from the stream module.

const { Writable } = require('stream');

We can implement a writable stream in many ways. We can, for example, extend the Writable constructor if we want

class myWritableStream extends Writable { }

However, I prefer the simpler constructor approach. We just create an object from the Writable constructor and pass it a number of options. The only required option is a write function which exposes the chunk of data to be written.

const { Writable } = require('stream'); const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);

This write method takes three arguments.

  • The chunk is usually a buffer unless we configure the stream differently.
  • The encoding argument is needed in that case, but usually we can ignore it.
  • The callback is a function that we need to call after we’re done processing the data chunk. It’s what signals whether the write was successful or not. To signal a failure, call the callback with an error object.

In outStream, we simply console.log the chunk as a string and call the callback after that without an error to indicate success. This is a very simple and probably not so useful echo stream. It will echo back anything it receives.

To consume this stream, we can simply use it with process.stdin, which is a readable stream, so we can just pipe process.stdin into our outStream.

When we run the code above, anything we type into process.stdin will be echoed back using the outStreamconsole.log line.

This is not a very useful stream to implement because it’s actually already implemented and built-in. This is very much equivalent to process.stdout. We can just pipe stdin into stdout and we’ll get the exact same echo feature with this single line:

process.stdin.pipe(process.stdout);

Implement a Readable Stream

To implement a readable stream, we require the Readable interface, and construct an object from it, and implement a read() method in the stream’s configuration parameter:

const { Readable } = require('stream'); const inStream = new Readable({ read() {} });

There is a simple way to implement readable streams. We can just directly push the data that we want the consumers to consume.

const { Readable } = require('stream'); const inStream = new Readable({ read() {} }); inStream.push('ABCDEFGHIJKLM'); inStream.push('NOPQRSTUVWXYZ'); inStream.push(null); // No more data inStream.pipe(process.stdout);

When we push a null object, that means we want to signal that the stream does not have any more data.

To consume this simple readable stream, we can simply pipe it into the writable stream process.stdout.

When we run the code above, we’ll be reading all the data from inStream and echoing it to the standard out. Very simple, but also not very efficient.

We’re basically pushing all the data in the stream before piping it to process.stdout. The much better way is to push data on demand, when a consumer asks for it. We can do that by implementing the read() method in the configuration object:

const inStream = new Readable({ read(size) { // there is a demand on the data... Someone wants to read it. } });

When the read method is called on a readable stream, the implementation can push partial data to the queue. For example, we can push one letter at a time, starting with character code 65 (which represents A), and incrementing that on every push:

const inStream = new Readable({ read(size) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 90) { this.push(null); } } }); inStream.currentCharCode = 65; inStream.pipe(process.stdout);

While the consumer is reading a readable stream, the read method will continue to fire, and we’ll push more letters. We need to stop this cycle somewhere, and that’s why an if statement to push null when the currentCharCode is greater than 90 (which represents Z).

This code is equivalent to the simpler one we started with but now we’re pushing data on demand when the consumer asks for it. You should always do that.

Implementing Duplex/Transform Streams

With Duplex streams, we can implement both readable and writable streams with the same object. It’s as if we inherit from both interfaces.

Here’s an example duplex stream that combines the two writable and readable examples implemented above:

const { Duplex } = require('stream'); const inoutStream = new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); }, read(size) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 90) { this.push(null); } } }); inoutStream.currentCharCode = 65; process.stdin.pipe(inoutStream).pipe(process.stdout);

By combining the methods, we can use this duplex stream to read the letters from A to Z and we can also use it for its echo feature. We pipe the readable stdin stream into this duplex stream to use the echo feature and we pipe the duplex stream itself into the writable stdout stream to see the letters A through Z.

It’s important to understand that the readable and writable sides of a duplex stream operate completely independently from one another. This is merely a grouping of two features into an object.

A transform stream is the more interesting duplex stream because its output is computed from its input.

For a transform stream, we don’t have to implement the read or write methods, we only need to implement a transform method, which combines both of them. It has the signature of the write method and we can use it to push data as well.

Here’s a simple transform stream which echoes back anything you type into it after transforming it to upper case format:

const { Transform } = require('stream'); const upperCaseTr = new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } }); process.stdin.pipe(upperCaseTr).pipe(process.stdout);

In this transform stream, which we’re consuming exactly like the previous duplex stream example, we only implemented a transform() method. In that method, we convert the chunk into its upper case version and then push that version as the readable part.

Streams Object Mode

By default, streams expect Buffer/String values. There is an objectMode flag that we can set to have the stream accept any JavaScript object.

Here’s a simple example to demonstrate that. The following combination of transform streams makes for a feature to map a string of comma-separated values into a JavaScript object. So “a,b,c,d” becomes {a: b, c: d}.

const { Transform } = require('stream'); const commaSplitter = new Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(chunk.toString().trim().split(',')); callback(); } }); const arrayToObject = new Transform({ readableObjectMode: true, writableObjectMode: true, transform(chunk, encoding, callback) { const obj = {}; for(let i=0; i < chunk.length; i+=2) { obj[chunk[i]] = chunk[i+1]; } this.push(obj); callback(); } }); const objectToString = new Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.stringify(chunk) + '\n'); callback(); } }); process.stdin .pipe(commaSplitter) .pipe(arrayToObject) .pipe(objectToString) .pipe(process.stdout)

We pass the input string (for example, “a,b,c,d”) through commaSplitter which pushes an array as its readable data ([“a”, “b”, “c”, “d”]). Adding the readableObjectMode flag on that stream is necessary because we’re pushing an object there, not a string.

We then take the array and pipe it into the arrayToObject stream. We need a writableObjectMode flag to make that stream accept an object. It’ll also push an object (the input array mapped into an object) and that’s why we also needed the readableObjectMode flag there as well. The last objectToString stream accepts an object but pushes out a string, and that’s why we only needed a writableObjectMode flag there. The readable part is a normal string (the stringified object).

Node’s built-in transform streams

Node has a few very useful built-in transform streams. Namely, the zlib and crypto streams.

Here’s an example that uses the zlib.createGzip() stream combined with the fs readable/writable streams to create a file-compression script:

const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + '.gz'));

You can use this script to gzip any file you pass as the argument. We’re piping a readable stream for that file into the zlib built-in transform stream and then into a writable stream for the new gzipped file. Simple.

The cool thing about using pipes is that we can actually combine them with events if we need to. Say, for example, I want the user to see a progress indicator while the script is working and a “Done” message when the script is done. Since the pipe method returns the destination stream, we can chain the registration of events handlers as well:

const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .on('data', () => process.stdout.write('.')) .pipe(fs.createWriteStream(file + '.zz')) .on('finish', () => console.log('Done'));

So with the pipe method, we get to easily consume streams, but we can still further customize our interaction with those streams using events where needed.

What’s great about the pipe method though is that we can use it to compose our program piece by piece, in a much readable way. For example, instead of listening to the data event above, we can simply create a transform stream to report progress, and replace the .on() call with another .pipe() call:

const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; const { Transform } = require('stream'); const reportProgress = new Transform({ transform(chunk, encoding, callback) { process.stdout.write('.'); callback(null, chunk); } }); fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(reportProgress) .pipe(fs.createWriteStream(file + '.zz')) .on('finish', () => console.log('Done'));

This reportProgress stream is a simple pass-through stream, but it reports the progress to standard out as well. Note how I used the second argument in the callback() function to push the data inside the transform() method. This is equivalent to pushing the data first.

The applications of combining streams are endless. For example, if we need to encrypt the file before or after we gzip it, all we need to do is pipe another transform stream in that exact order that we needed. We can use Node’s crypto module for that:

const crypto = require('crypto'); // ... fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(crypto.createCipher('aes192', 'a_secret')) .pipe(reportProgress) .pipe(fs.createWriteStream(file + '.zz')) .on('finish', () => console.log('Done'));

The script above compresses and then encrypts the passed file and only those who have the secret can use the outputted file. We can’t unzip this file with the normal unzip utilities because it’s encrypted.

To actually be able to unzip anything zipped with the script above, we need to use the opposite streams for crypto and zlib in a reverse order, which is simple:

fs.createReadStream(file) .pipe(crypto.createDecipher('aes192', 'a_secret')) .pipe(zlib.createGunzip()) .pipe(reportProgress) .pipe(fs.createWriteStream(file.slice(0, -3))) .on('finish', () => console.log('Done'));

Υποθέτοντας ότι το αρχείο που έχει περάσει είναι η συμπιεσμένη έκδοση, ο παραπάνω κώδικας θα δημιουργήσει μια ροή ανάγνωσης από αυτήν, θα το διοχετεύσει στη createDecipher()ροή κρυπτογράφησης (χρησιμοποιώντας το ίδιο μυστικό), διοχετεύσει την έξοδο αυτού στη createGunzip()ροή zlib και, στη συνέχεια, γράψτε τα πράγματα πίσω σε ένα αρχείο χωρίς το τμήμα επέκτασης.

Αυτό έχω μόνο για αυτό το θέμα. Ευχαριστώ για την ανάγνωση! Μέχρι την επόμενη φορά!

Εκμάθηση αντίδρασης ή κόμβος; Δείτε τα βιβλία μου:

  • Μάθετε το React.js δημιουργώντας παιχνίδια
  • Node.js Πέρα από τα βασικά