stream.Readable

Class: stream.Readable

The Readable stream interface is the abstraction for a source of data that you are reading from. In other words, data comes out of a Readable stream.

A Readable stream will not start emitting data until you indicate that you are ready to receive it.

Readable streams have two "modes": a flowing mode and a paused mode. When in flowing mode, data is read from the underlying system and provided to your program as fast as possible. In paused mode, you must explicitly call stream.read() to get chunks of data out. Streams start out in paused mode.

Note: If the stream is switched into flowing mode while no 'data' event handlers are attached and there are no stream.pipe() destinations, then data will be lost.

You can switch to flowing mode by doing any of the following:

  • Adding a 'data' event handler to listen for data.
  • Calling the stream.resume() method to explicitly open the flow.
  • Calling the stream.pipe() method to send the data to a Writable.

You can switch back to paused mode by doing either of the following:

  • If there are no pipe destinations, by calling the stream.pause() method.
  • If there are pipe destinations, by removing any 'data' event handlers, and removing all pipe destinations by calling the stream.unpipe() method.

Note that, for backwards compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.
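As a rough illustration of the mode switches described above, the sketch below records the stream's flowing state after each call. It assumes a Node.js version that exposes the readable.readableFlowing property (null before any consumer is attached, then true or false). The manually pushed in-memory source is hypothetical and is only there to keep the example self-contained.

```javascript
const { Readable } = require('stream');

// Hypothetical in-memory source: read() is a no-op because data is pushed by hand.
const readable = new Readable({ read() {} });

const states = [];
states.push(readable.readableFlowing); // null: nothing is consuming the stream yet

readable.on('data', () => {});         // attaching a 'data' handler starts the flow
states.push(readable.readableFlowing); // true

readable.pause();                      // no pipe destinations, so pause() is enough
states.push(readable.readableFlowing); // false

readable.resume();                     // explicitly reopen the flow
states.push(readable.readableFlowing); // true

readable.push(null);                   // signal end-of-stream
console.log(states);
```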

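Examples of readable streams include:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin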

Event: 'close'

Emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

Not all streams will emit the 'close' event.

Event: 'data'

Attaching a 'data' event listener to a stream that has not been explicitly paused will switch the stream into flowing mode. Data will then be passed as soon as it is available.

If you just want to get all the data out of the stream as fast as possible, this is the best way to do so.

var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});

Event: 'end'

This event fires when there will be no more data to read.

Note that the 'end' event will not fire unless the data is completely consumed. This can be done by switching into flowing mode, or by calling stream.read() repeatedly until you get to the end.

var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', () => {
  console.log('there will be no more data.');
});
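A common use of the 'data' and 'end' pair is to gather a whole stream into one Buffer. The following is a minimal sketch of that pattern, not part of the stream API: collect() is a hypothetical helper name, and the in-memory source stands in for a real stream.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: buffers every chunk and hands the result to callback on 'end'.
function collect(readable, callback) {
  const chunks = [];
  readable.on('data', (chunk) => chunks.push(chunk));
  readable.on('error', callback);
  readable.on('end', () => callback(null, Buffer.concat(chunks)));
}

// In-memory source used only for illustration.
const source = new Readable({ read() {} });
source.push(Buffer.from('hello '));
source.push(Buffer.from('world'));
source.push(null); // no more data

collect(source, (err, body) => {
  if (err) throw err;
  console.log(body.toString('utf8')); // prints "hello world"
});
```

Because the 'data' handler puts the stream into flowing mode, the data is fully consumed and 'end' is guaranteed to fire.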

Event: 'error'

Emitted if there was an error receiving data.
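For instance, an fs read stream opened on a file that does not exist reports the failure through 'error'. The sketch below builds a path in the temporary directory that is assumed not to exist; the file name is made up for the example.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical path, assumed not to exist on this machine.
const missing = path.join(
  os.tmpdir(), 'no-such-file-' + process.pid + '-' + Date.now());

const readable = fs.createReadStream(missing);
readable.on('error', (err) => {
  // Without an 'error' listener, the error would be thrown instead.
  console.error('read failed:', err.code);
});
```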

Event: 'readable'

When a chunk of data can be read from the stream, it will emit a 'readable' event.

In some cases, listening for a 'readable' event will cause some data to be read into the internal buffer from the underlying system, if it hadn't already.

var readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // there is some data to read now
});

Once the internal buffer is drained, a 'readable' event will fire again when more data is available.

In flowing mode the 'readable' event is not emitted, with one exception: it fires a final time at end-of-stream.

The 'readable' event indicates that the stream has new information: either new data is available or the end of the stream has been reached. In the former case, stream.read() will return that data. In the latter case, stream.read() will return null. For instance, in the following example, foo.txt is an empty file:

const fs = require('fs');
var rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});

The output of running this script is:

$ node test.js
readable: null
end

readable.isPaused()

This method returns whether or not the readable has been explicitly paused by client code (using stream.pause() without a corresponding stream.resume()).

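const stream = require('stream');
// a no-op read() keeps resume() from failing on an unimplemented _read()
var readable = new stream.Readable({ read() {} });

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false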

readable.pause()

  • Return: this

This method will cause a stream in flowing mode to stop emitting 'data' events, switching out of flowing mode. Any data that becomes available will remain in the internal buffer.

var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(() => {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});

readable.pipe(destination[, options])

This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.

Multiple destinations can be piped to safely.

const fs = require('fs');
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
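To illustrate piping to more than one destination, here is a small sketch in which a single source feeds two sinks. makeSink() and its received array are hypothetical names invented for the example; each sink records what it was given.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: a Writable that remembers every chunk it receives.
function makeSink() {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      sink.received.push(chunk.toString());
      callback();
    }
  });
  sink.received = [];
  return sink;
}

const source = new Readable({ read() {} });
const sinkA = makeSink();
const sinkB = makeSink();

// Both destinations receive every chunk from the same source.
source.pipe(sinkA);
source.pipe(sinkB);

source.push('one');
source.push('two');
source.push(null);

sinkB.on('finish', () => {
  console.log(sinkA.received.join(''), sinkB.received.join(''));
});
```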

This function returns the destination stream, so you can set up pipe chains like so:

const fs = require('fs');
const zlib = require('zlib');
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

For example, emulating the Unix cat command:

process.stdin.pipe(process.stdout);

By default stream.end() is called on the destination when the source stream emits 'end', so that destination is no longer writable. Pass { end: false } as options to keep the destination stream open.

In the following example, writer is kept open so that "Goodbye" can be written at the end:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

Note that process.stderr and process.stdout are never closed until the process exits, regardless of the specified options.

readable.read([size])

The read() method pulls some data out of the internal buffer and returns it. If there is no data available, then it will return null.

If you pass in a size argument, then it will return that many bytes. If size bytes are not available, then it will return null, unless the stream has ended, in which case it will return the data remaining in the buffer.

If you do not specify a size argument, then it will return all the data in the internal buffer.

This method should only be called in paused mode. In flowing mode, this method is called automatically until the internal buffer is drained.

var readable = getReadableStreamSomehow();
readable.on('readable', () => {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});
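The effect of the size argument can be sketched with a stream that has already ended: read(3) hands back three bytes at a time, and the final call returns whatever is left. The eight-byte in-memory source is an assumption made to keep the example small.

```javascript
const { Readable } = require('stream');

// In-memory source holding 8 bytes, then end-of-stream.
const readable = new Readable({ read() {} });
readable.push(Buffer.from('abcdefgh'));
readable.push(null);

const pieces = [];
readable.on('readable', () => {
  let chunk;
  // Ask for 3 bytes at a time; the last read returns the 2 remaining bytes
  // because the stream has already ended.
  while (null !== (chunk = readable.read(3))) {
    pieces.push(chunk.toString());
  }
});

readable.on('end', () => {
  console.log(pieces); // prints [ 'abc', 'def', 'gh' ]
});
```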

If this method returns a data chunk, then it will also trigger the emission of a 'data' event.

Note that calling stream.read([size]) after the 'end' event has been triggered will return null. No runtime error will be raised.

readable.resume()

  • Return: this

This method will cause the readable stream to resume emitting 'data' events.

This method will switch the stream into flowing mode. If you do not want to consume the data from a stream, but you do want to get to its 'end' event, you can call stream.resume() to open the flow of data.

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', () => {
  console.log('got to the end, but did not read anything');
});

readable.setEncoding(encoding)

  • encoding <String> The encoding to use.
  • Return: this

Call this function to cause the stream to return strings of the specified encoding instead of Buffer objects. For example, if you do readable.setEncoding('utf8'), then the output data will be interpreted as UTF-8 data, and returned as strings. If you do readable.setEncoding('hex'), then the data will be encoded in hexadecimal string format.

This properly handles multi-byte characters that would otherwise be potentially mangled if you simply pulled the Buffers directly and called buf.toString(encoding) on them. If you want to read the data as strings, always use this method.

You can also disable encoding entirely with readable.setEncoding(null). This is useful when working with binary data or with large multi-byte strings spread out over multiple chunks.

const assert = require('assert');
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
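The multi-byte handling mentioned above can be seen by splitting one character across two chunks. In this sketch the three-byte UTF-8 encoding of "€" is pushed as two separate Buffers; readAll() is a hypothetical helper that either calls setEncoding('utf8') or converts each Buffer on its own.

```javascript
const { Readable } = require('stream');

const euro = Buffer.from('€', 'utf8'); // 3 bytes: e2 82 ac

// Hypothetical helper: reads the split character with or without setEncoding().
function readAll(useSetEncoding, callback) {
  const readable = new Readable({ read() {} });
  if (useSetEncoding) readable.setEncoding('utf8');

  let text = '';
  readable.on('data', (chunk) => {
    // chunk is already a string with setEncoding(), otherwise a Buffer
    text += chunk.toString();
  });
  readable.on('end', () => callback(text));

  readable.push(euro.subarray(0, 2)); // first two bytes of the character
  readable.push(euro.subarray(2));    // last byte
  readable.push(null);
}

readAll(true, (text) => {
  console.log('with setEncoding intact:', text === '€');    // true
});
readAll(false, (text) => {
  console.log('without setEncoding intact:', text === '€'); // false
});
```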

readable.unpipe([destination])

This method will remove the hooks set up for a previous stream.pipe() call.

If the destination is not specified, then all pipes are removed.

If the destination is specified, but no pipe is set up for it, then this is a no-op.

const fs = require('fs');
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);

readable.unshift(chunk)

Pushes chunk back onto the front of the internal read queue. This is useful in certain cases where a stream is being consumed by a parser that needs to "un-consume" some data it has optimistically pulled out of the source, so that the stream can be passed on to some other party.

Note that stream.unshift(chunk) cannot be called after the 'end' event has been triggered; a runtime error will be raised.

If you find that you must often call stream.unshift(chunk) in your programs, consider implementing a Transform stream instead. (See API for Stream Implementors.)

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // remove the 'readable' listener before unshifting
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
        // stop reading here so the body is left for the caller
        return;
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

Note that, unlike stream.push(chunk), stream.unshift(chunk) will not end the reading process by resetting the internal reading state of the stream. This can cause unexpected results if unshift() is called during a read (i.e. from within a stream._read() implementation on a custom stream). Following the call to unshift() with an immediate stream.push('') will reset the reading state appropriately, however it is best to simply avoid calling unshift() while in the process of performing a read.

readable.wrap(stream)

  • stream <Stream> An "old style" readable stream

Versions of Node.js prior to v0.10 had streams that did not implement the entire Streams API as it is today. (See Compatibility for more information.)

If you are using an older Node.js library that emits 'data' events and has a stream.pause() method that is advisory only, then you can use the wrap() method to create a Readable stream that uses the old stream as its data source.

You will very rarely ever need to call this function, but it exists as a convenience for interacting with old Node.js programs and libraries.

For example:

const OldReader = require('./old-api-module.js').OldReader;
const Readable = require('stream').Readable;
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
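Since './old-api-module.js' above is only a placeholder, here is a self-contained sketch of the same idea. The "old style" source is simulated with a plain EventEmitter that emits 'data' and 'end' and has advisory pause() and resume() methods; that object is an assumption made for illustration, not a real library.

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Hypothetical old-style stream: emits 'data'/'end', pause/resume are advisory.
const oldStream = new EventEmitter();
oldStream.pause = () => {};
oldStream.resume = () => {};

// wrap() turns it into a Readable that uses the old stream as its data source.
const wrapped = new Readable().wrap(oldStream);

let output = '';
wrapped.on('data', (chunk) => {
  output += chunk.toString();
});
wrapped.on('end', () => {
  console.log(output);
});

// Drive the old stream by hand.
oldStream.emit('data', Buffer.from('legacy '));
oldStream.emit('data', Buffer.from('data'));
oldStream.emit('end');
```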
2016-04-30 04:42:28