Class: stream.Readable
stream.Readable is an abstract class designed to be extended with an underlying implementation of the stream._read(size) method.
Please see API for Stream Consumers for how to consume streams in your programs. What follows is an explanation of how to implement Readable streams in your programs.
new stream.Readable([options])
- options <Object>
  - highWaterMark <Number> The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default = 16384 (16kb), or 16 for objectMode streams
  - encoding <String> If specified, then buffers will be decoded to strings using the specified encoding. Default = null
  - objectMode <Boolean> Whether this stream should behave as a stream of objects, meaning that stream.read(n) returns a single value instead of a Buffer of size n. Default = false
  - read <Function> Implementation for the stream._read() method.
In classes that extend the Readable class, make sure to call the Readable constructor so that the buffering settings can be properly initialized.
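As a minimal sketch of how these options fit together, the snippet below passes objectMode, highWaterMark and read directly to the constructor instead of subclassing. The fromArray helper and its input data are hypothetical names chosen for illustration only.

```javascript
const Readable = require('stream').Readable;

// Hypothetical helper: wraps an array in an objectMode Readable.
function fromArray(items) {
  let index = 0;
  return new Readable({
    objectMode: true,   // each read() yields a single value, not a Buffer
    highWaterMark: 4,   // in objectMode this counts objects, not bytes
    read() {
      // Push the next item, or null to signal the end of the stream.
      this.push(index < items.length ? items[index++] : null);
    }
  });
}

const stream = fromArray([{ id: 1 }, { id: 2 }, { id: 3 }]);
stream.on('data', (item) => console.log(item.id)); // logs 1, 2, 3
stream.on('end', () => console.log('done'));
```

Because null signals EOF, this sketch assumes the array itself never contains null.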
readable._read(size)
- size <Number> Number of bytes to read asynchronously
Note: Implement this method, but do NOT call it directly.
This method is prefixed with an underscore because it is internal to the class that defines it and should only be called by the internal Readable class methods. All Readable stream implementations must provide a _read method to fetch data from the underlying resource.
When _read() is called, if data is available from the resource, the _read() implementation should start pushing that data into the read queue by calling this.push(dataChunk). _read() should continue reading from the resource and pushing data until push returns false, at which point it should stop reading from the resource. Only when _read() is called again after it has stopped should it start reading more data from the resource and pushing that data onto the queue.
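The push-until-false loop described above might look like the following sketch. ChunkList is a hypothetical class backed by an in-memory array, standing in for a resource that can hand out data synchronously. It uses ES6 class syntax rather than util.inherits; either style should work.

```javascript
const Readable = require('stream').Readable;

// Hypothetical source: an in-memory list of chunks.
class ChunkList extends Readable {
  constructor(chunks, options) {
    super(options); // lets Readable initialize its buffering settings
    this._chunks = chunks.slice();
  }

  _read(size) {
    // The advisory size argument is ignored in this sketch.
    // Keep pushing until the source is drained or push() returns false.
    while (this._chunks.length > 0) {
      if (!this.push(this._chunks.shift())) {
        return; // stop here and resume on the next _read() call
      }
    }
    this.push(null); // source exhausted: signal EOF
  }
}
```

With a small highWaterMark, push() returns false early and the remaining chunks wait until the stream calls _read() again.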
Note: once the _read() method is called, it will not be called again until the stream.push() method is called.
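To illustrate the note, here is a sketch of a source that delivers data asynchronously. Ticker is a hypothetical class, and the 1 ms timer merely simulates a slow resource. Each _read() call schedules exactly one push(), and the stream waits for that push() before asking again.

```javascript
const Readable = require('stream').Readable;

// Hypothetical asynchronous source: delivers one value per timer tick.
class Ticker extends Readable {
  constructor(limit, options) {
    super(options);
    this._limit = limit;
    this._count = 0;
  }

  _read() {
    // Nothing is pushed synchronously. The stream will not call _read()
    // again until the push() inside the timer callback has happened.
    setTimeout(() => {
      this._count++;
      this.push(this._count > this._limit ? null : String(this._count));
    }, 1);
  }
}
```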
The size argument is advisory. Implementations where a "read" is a single call that returns data can use this to know how much data to fetch. Implementations where that is not relevant, such as TCP or TLS, may ignore this argument, and simply provide data whenever it becomes available. There is no need, for example, to "wait" until size bytes are available before calling stream.push(chunk).
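For a source where one "read" is a single call, the advisory size can be used as the amount to fetch. The sketch below assumes a hypothetical BufferSource class that serves slices of an in-memory Buffer; a real implementation would issue a read against its own resource instead.

```javascript
const Readable = require('stream').Readable;

// Hypothetical source: a Buffer held in memory, handed out in slices.
class BufferSource extends Readable {
  constructor(buffer, options) {
    super(options);
    this._buffer = buffer;
    this._offset = 0;
  }

  _read(size) {
    if (this._offset >= this._buffer.length) {
      this.push(null); // nothing left: signal EOF
      return;
    }
    // Treat the advisory size as the amount to fetch in this one "read".
    const end = Math.min(this._offset + size, this._buffer.length);
    const slice = this._buffer.subarray(this._offset, end);
    this._offset = end;
    this.push(slice);
  }
}
```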
readable.push(chunk[, encoding])
Note: This method should be called by Readable implementors, NOT by consumers of Readable streams.
If a value other than null is passed, the push() method adds a chunk of data into the queue for subsequent stream processors to consume. If null is passed, it signals the end of the stream (EOF), after which no more data can be written.
The data added with push() can be pulled out by calling the stream.read() method when the 'readable' event fires.
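A small sketch of both sides of this exchange follows: the implementor pushes chunks and then null, and the consumer drains the queue with read() whenever 'readable' fires. The greeting stream and its two chunks are made up for illustration.

```javascript
const Readable = require('stream').Readable;

// Implementor side: push two chunks, then null for EOF.
const chunks = ['hello ', 'world'];
const greeting = new Readable({
  read() {
    this.push(chunks.length > 0 ? chunks.shift() : null);
  }
});

// Consumer side: drain the internal buffer each time 'readable' fires.
let text = '';
greeting.on('readable', () => {
  let chunk;
  while ((chunk = greeting.read()) !== null) {
    text += chunk.toString();
  }
});
greeting.on('end', () => console.log(text)); // prints "hello world"
```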
This API is designed to be as flexible as possible. For example, you may be wrapping a lower-level source which has some sort of pause/resume mechanism, and a data callback. In those cases, you could wrap the low-level source object by doing something like this:
    // source is an object with readStop() and readStart() methods,
    // and an `ondata` member that gets called when it has data, and
    // an `onend` member that gets called when the data is over.
    const Readable = require('stream').Readable;
    const util = require('util');

    util.inherits(SourceWrapper, Readable);

    function SourceWrapper(options) {
      Readable.call(this, options);

      this._source = getLowlevelSourceObject();

      // Every time there's data, we push it into the internal buffer.
      this._source.ondata = (chunk) => {
        // if push() returns false, then we need to stop reading from source
        if (!this.push(chunk))
          this._source.readStop();
      };

      // When the source ends, we push the EOF-signaling `null` chunk
      this._source.onend = () => {
        this.push(null);
      };
    }

    // _read will be called when the stream wants to pull more data in
    // the advisory size argument is ignored in this case.
    SourceWrapper.prototype._read = function(size) {
      this._source.readStart();
    };
Example: A Counting Stream
This is a basic example of a Readable stream. It emits the numerals from 1 to 1,000,000 in ascending order, and then ends.
    const Readable = require('stream').Readable;
    const util = require('util');
    util.inherits(Counter, Readable);

    function Counter(opt) {
      Readable.call(this, opt);
      this._max = 1000000;
      this._index = 1;
    }

    Counter.prototype._read = function() {
      var i = this._index++;
      if (i > this._max)
        this.push(null); // past the last numeral: signal EOF
      else {
        var str = '' + i;
        // Buffer.from() replaces the deprecated `new Buffer()` constructor
        var buf = Buffer.from(str, 'ascii');
        this.push(buf);
      }
    };
Example: SimpleProtocol v1 (Sub-optimal)
This is similar to the parseHeader function described here, but implemented as a custom stream. Also, note that this implementation does not convert the incoming data to a string.
However, this would be better implemented as a Transform stream. See SimpleProtocol v2 for a better implementation.
    // A parser for a simple data protocol.
    // The "header" is a JSON object, followed by 2 \n characters, and
    // then a message body.
    //
    // NOTE: This can be done more simply as a Transform stream!
    // Using Readable directly for this is sub-optimal. See the
    // alternative example below under the Transform section.

    const Readable = require('stream').Readable;
    const util = require('util');

    util.inherits(SimpleProtocol, Readable);

    function SimpleProtocol(source, options) {
      if (!(this instanceof SimpleProtocol))
        return new SimpleProtocol(source, options);

      Readable.call(this, options);
      this._inBody = false;
      this._sawFirstCr = false;

      // source is a readable stream, such as a socket or file
      this._source = source;

      source.on('end', () => {
        this.push(null);
      });

      // give it a kick whenever the source is readable
      // read(0) will not consume any bytes
      source.on('readable', () => {
        this.read(0);
      });

      this._rawHeader = [];
      this.header = null;
    }

    SimpleProtocol.prototype._read = function(n) {
      if (!this._inBody) {
        var chunk = this._source.read();

        // if the source doesn't have data, we don't have data yet.
        if (chunk === null)
          return this.push('');

        // check if the chunk has a \n\n
        var split = -1;
        for (var i = 0; i < chunk.length; i++) {
          if (chunk[i] === 10) { // '\n'
            if (this._sawFirstCr) {
              split = i;
              break;
            } else {
              this._sawFirstCr = true;
            }
          } else {
            this._sawFirstCr = false;
          }
        }

        if (split === -1) {
          // still waiting for the \n\n
          // stash the chunk, and try again.
          this._rawHeader.push(chunk);
          this.push('');
        } else {
          this._inBody = true;
          var h = chunk.slice(0, split);
          this._rawHeader.push(h);
          var header = Buffer.concat(this._rawHeader).toString();
          try {
            this.header = JSON.parse(header);
          } catch (er) {
            this.emit('error', new Error('invalid simple protocol data'));
            return;
          }
          // now, because we got some extra data, unshift the rest
          // back into the read queue so that our consumer will see it.
          var b = chunk.slice(split);
          this.unshift(b);
          // calling unshift by itself does not reset the reading state
          // of the stream; since we're inside _read, doing an additional
          // push('') will reset the state appropriately.
          this.push('');

          // and let them know that we are done parsing the header.
          this.emit('header', this.header);
        }
      } else {
        // from there on, just provide the data to our consumer.
        // careful not to push(null), since that would indicate EOF.
        var chunk = this._source.read();
        if (chunk) this.push(chunk);
      }
    };

    // Usage:
    // var parser = new SimpleProtocol(source);
    // Now parser is a readable stream that will emit 'header'
    // with the parsed header data.