stream.Readable

Class: stream.Readable

stream.Readable is an abstract class designed to be extended with an underlying implementation of the stream._read(size) method.

Please see API for Stream Consumers for how to consume streams in your programs. What follows is an explanation of how to implement Readable streams in your programs.

new stream.Readable([options])

  • options <Object>
    • highWaterMark <Number> The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default = 16384 (16 KB), or 16 objects for objectMode streams
    • encoding <String> If specified, then buffers will be decoded to strings using the specified encoding. Default = null
    • objectMode <Boolean> Whether this stream should behave as a stream of objects, meaning that stream.read(n) returns a single value instead of a Buffer of size n. Default = false
    • read <Function> Implementation for the stream._read() method.
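
As a rough illustration of the objectMode and read options, the sketch below builds a stream directly from the constructor. The `records` array and the `objectStream` name are assumptions made up for this example.

```javascript
const Readable = require('stream').Readable;

// Illustrative data; not part of the stream API.
const records = [{ id: 1 }, { id: 2 }];

const objectStream = new Readable({
  objectMode: true,
  // The `read` option supplies the _read() implementation.
  read() {
    // In object mode any non-null value can be pushed; null still means EOF.
    this.push(records.length > 0 ? records.shift() : null);
  }
});

// In object mode, read() hands back one pushed value at a time.
const firstRecord = objectStream.read();
console.log(firstRecord);
```

Because `_read()` pushes synchronously here, the first `read()` call can return a value immediately. A source that delivers data later would return null until data arrives.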

In classes that extend the Readable class, make sure to call the Readable constructor so that the buffering settings can be properly initialized.
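
A minimal sketch of that constructor call, assuming a hypothetical `Greeter` stream that emits a single greeting. The class name, the greeting text, and the option values are illustrative only.

```javascript
const Readable = require('stream').Readable;
const util = require('util');

util.inherits(Greeter, Readable);

// Greeter is a hypothetical stream used only for illustration.
function Greeter(options) {
  // Calling the Readable constructor initializes the buffering settings
  // (highWaterMark, encoding, objectMode) from `options`.
  Readable.call(this, options);
  this._sent = false;
}

Greeter.prototype._read = function() {
  if (this._sent) {
    this.push(null); // nothing left, signal EOF
  } else {
    this._sent = true;
    this.push(Buffer.from('hello'));
  }
};

// With `encoding` set, buffers are decoded to strings before read() returns.
const greeter = new Greeter({ encoding: 'utf8', highWaterMark: 1024 });
const firstChunk = greeter.read();
console.log(typeof firstChunk, firstChunk);
```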

readable._read(size)

  • size <Number> Number of bytes to read asynchronously

Note: Implement this method, but do NOT call it directly.

This method is prefixed with an underscore because it is internal to the class that defines it and should only be called by the internal Readable class methods. All Readable stream implementations must provide a _read method to fetch data from the underlying resource.

When _read() is called, if data is available from the resource, the _read() implementation should start pushing that data into the read queue by calling this.push(dataChunk). _read() should continue reading from the resource and pushing data until push returns false, at which point it should stop reading from the resource. Only when _read() is called again after it has stopped should it start reading more data from the resource and pushing that data onto the queue.
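
One way to picture this contract is a stream backed by an in-memory array. This is only a sketch: `ChunkList` and its `pushCalls` counter are hypothetical names, and a real resource would usually deliver data asynchronously.

```javascript
const Readable = require('stream').Readable;
const util = require('util');

util.inherits(ChunkList, Readable);

// ChunkList is a hypothetical stream backed by an in-memory array of strings.
function ChunkList(chunks, options) {
  Readable.call(this, options);
  this._chunks = chunks.slice();
  this.pushCalls = 0; // for demonstration only
}

ChunkList.prototype._read = function() {
  // Keep pushing until push() returns false or the resource is exhausted.
  while (this._chunks.length > 0) {
    this.pushCalls++;
    if (!this.push(this._chunks.shift()))
      return; // stop here and wait for the next _read() call
  }
  this.push(null);
};

// A small highWaterMark makes push() return false after two 2-byte chunks.
const list = new ChunkList(['aa', 'bb', 'cc', 'dd'], { highWaterMark: 4 });
const first = list.read(2);
console.log(first.toString(), list.pushCalls);
```

The remaining chunks are only pushed once the stream calls `_read()` again, which happens after the consumer has drained some of the queue.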

Note: once the _read() method is called, it will not be called again until the stream.push() method is called.

The size argument is advisory. Implementations where a "read" is a single call that returns data can use this to know how much data to fetch. Implementations where that is not relevant, such as TCP or TLS, may ignore this argument and simply provide data whenever it becomes available. There is no need, for example, to "wait" until size bytes are available before calling stream.push(chunk).
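
For a source where the amount to fetch is a free choice, the size hint might be used as in the following sketch. `BufferSource` and `requestedSizes` are hypothetical names introduced for this example.

```javascript
const Readable = require('stream').Readable;
const util = require('util');

util.inherits(BufferSource, Readable);

// BufferSource is a hypothetical stream that serves an in-memory Buffer.
function BufferSource(data, options) {
  Readable.call(this, options);
  this._data = data;
  this._offset = 0;
  this.requestedSizes = []; // records the advisory sizes, for demonstration
}

BufferSource.prototype._read = function(size) {
  this.requestedSizes.push(size);
  if (this._offset >= this._data.length) {
    this.push(null);
    return;
  }
  // Use the advisory size to decide how much to fetch in one go.
  const end = Math.min(this._offset + size, this._data.length);
  const chunk = this._data.slice(this._offset, end);
  this._offset = end;
  this.push(chunk);
};

const source = new BufferSource(Buffer.from('abcdefghij'), { highWaterMark: 4 });
const chunk = source.read();
console.log(chunk.toString(), source.requestedSizes[0]);
```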

readable.push(chunk[, encoding])

  • chunk <Buffer> | <Null> | <String> Chunk of data to push into the read queue
  • encoding <String> Encoding of String chunks. Must be a valid Buffer encoding, such as 'utf8' or 'ascii'
  • return <Boolean> Whether or not more pushes should be performed

Note: This method should be called by Readable implementors, NOT by consumers of Readable streams.

If a value other than null is passed, the push() method adds a chunk of data into the queue for subsequent stream processors to consume. If null is passed, it signals the end of the stream (EOF), after which no more data can be written.

The data added with push() can be pulled out by calling the stream.read() method when the 'readable' event fires.
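
The round trip from push() to read() can be sketched as follows. The `words` array and the variable names are assumptions for illustration, not part of the API.

```javascript
const Readable = require('stream').Readable;

const words = ['alpha', 'beta', 'gamma']; // illustrative data
const received = [];

const wordStream = new Readable({
  read() {
    const word = words.shift();
    // A String chunk is pushed with its encoding; null signals EOF.
    if (word === undefined)
      this.push(null);
    else
      this.push(word, 'utf8');
  }
});

wordStream.on('readable', () => {
  let chunk;
  // Drain everything currently in the queue. A single read() may return
  // several pushed chunks joined together.
  while ((chunk = wordStream.read()) !== null)
    received.push(chunk.toString());
});

wordStream.on('end', () => {
  console.log(received.join(''));
});
```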

This API is designed to be as flexible as possible. For example, you may be wrapping a lower-level source which has some sort of pause/resume mechanism, and a data callback. In those cases, you could wrap the low-level source object by doing something like this:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

const Readable = require('stream').Readable;
const util = require('util');

util.inherits(SourceWrapper, Readable);

function SourceWrapper(options) {
  Readable.call(this, options);

  this._source = getLowlevelSourceObject();

  // Every time there's data, we push it into the internal buffer.
  this._source.ondata = (chunk) => {
    // if push() returns false, then we need to stop reading from source
    if (!this.push(chunk))
      this._source.readStop();
  };

  // When the source ends, we push the EOF-signaling `null` chunk
  this._source.onend = () => {
    this.push(null);
  };
}

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
  this._source.readStart();
};

Example: A Counting Stream

This is a basic example of a Readable stream. It emits the numerals from 1 to 1,000,000 in ascending order, and then ends.

const Readable = require('stream').Readable;
const util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

Counter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else {
    var str = '' + i;
    var buf = Buffer.from(str, 'ascii');
    this.push(buf);
  }
};
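
To try the stream out, a consumer could attach 'data' and 'end' listeners. The sketch below repeats the class under the hypothetical name `SmallCounter`, with a `max` argument added purely so the demonstration finishes after a few numerals.

```javascript
const Readable = require('stream').Readable;
const util = require('util');

util.inherits(SmallCounter, Readable);

// SmallCounter mirrors Counter above; the `max` argument is a
// hypothetical addition so the demonstration finishes quickly.
function SmallCounter(max, opt) {
  Readable.call(this, opt);
  this._max = max;
  this._index = 1;
}

SmallCounter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else
    this.push(Buffer.from('' + i, 'ascii'));
};

const numerals = [];
const counter = new SmallCounter(5);

// Attaching a 'data' listener switches the stream into flowing mode.
counter.on('data', (chunk) => {
  numerals.push(chunk.toString());
});

// 'end' fires once the null chunk has been consumed.
counter.on('end', () => {
  console.log(numerals.join(' '));
});
```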

Example: SimpleProtocol v1 (Sub-optimal)

This is similar to the parseHeader function described under readable.unshift(), but implemented as a custom stream. Note also that this implementation does not convert the incoming data to a string.

However, this would be better implemented as a Transform stream. See SimpleProtocol v2 for a better implementation.

// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal. See the
// alternative example below under the Transform section.

const Readable = require('stream').Readable;
const util = require('util');

util.inherits(SimpleProtocol, Readable);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(source, options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  source.on('end', () => {
    this.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', () => {
    this.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);
      // calling unshift by itself does not reset the reading state
      // of the stream; since we're inside _read, doing an additional
      // push('') will reset the state appropriately.
      this.push('');

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
doc_Nodejs
2016-04-30 04:42:29