1. 前言

当我尝试去看process.stdin和process.stdout的概念时,发现它是基于stream的,似乎node中很多都是基于此的。

stream是什么呢?是处理系统缓存的一种方式,在node中,处理缓存有2种方式:

很明显,在处理较大文件时stream的方式更好。

可读流和可写流都会在一个内部的缓冲器中存储数据,可以分别使用writable.writableBufferreadable.readableBuffer来获取

可缓冲的数据的数量取决于传入流构造函数的highWaterMark的值,例如fs.createReadStream(path, { highWaterMark: 12 }),缓冲区最多为12个字节。

当调用stream.push(chunk)时,数据会被缓冲到可读流中。如果消费程序没有调用stream.read(),那这些数据会停留在内部中直到被消费。

一旦内部的可读缓冲大小达到了highWaterMark,就会暂停读取数据,直到缓冲区中的数据被消费。

当反复的调用writable.write(chunk)方法时,数据就会被写入到可写流中,同理。

stream.pipe()的作用是为了限制数据缓冲到可读写流中,保证内存不会被占用完全。

因为Duplex和Transform是可读写的,所以他们各自维护着两个相互独立的内部缓冲区用于读写,这样他们可以独立的读写数据。

很多时候,我们不直接使用stream,更多的使用场景是在使用其它支持流的类中。

stream的有点还在于处理异步io的友好性,例如:在异步读写文件时的回调地狱中,如果改用stream

// 都是回调
fs.readFile(tmp, (err, data) => {
    fs.writeFile(tmp, err => {
    })
})

// stream
const readStream = fs.createReadStream(path.resolve(__dirname, '../../README.md'))
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../../test.md'))

readStream.on('data', chunk => {
  writeStream.write(chunk)
})

readStream.on('end', () => {
  writeStream.end()
})

从上面的例子可以看出,所有的流都是EventEmitter的实例,stream流可以理解为生产者消费者,但数据被读取消费时才会继续生产,而不是一次性读取完。

node中部署了Stream接口的如下:

2. Stream分类

有4种stream类型

每个类分别有自己的属性和方法,上述说的4种node中部署了Stream接口,都是基于这些类的实例。

2.1 可读流

可以想象成它是数据的生产者,支持可读流的有

可读流的2种模式:

所有流默认都是paused模式,可以切换到flowing模式

flowing模式切换到paused模式:

可读流的3种状态切换:

// 使用paused()
readStream.on('data', chunk => {
  process.stdout.write(chunk)
})
readStream.paused()

// 使用pipe()
console.log(readStream._readableState.flowing) // null
readStream.pipe(process.stdout)
console.log(readStream._readableState.flowing)  // true
readStream.unpipe()
console.log(readStream._readableState.flowing)  // false

可读流的属性和方法,还有EventEmitter事件,我们可以打印readStream

// 可以看到是ReadStream类的实例
ReadStream {
  _readableState: // _readableState是ReadableState的实例
   ReadableState {
     objectMode: false,
     highWaterMark: 65536,
     buffer: BufferList { head: null, tail: null, length: 0 },
     length: 0,
     pipes: null,
     pipesCount: 0,
     flowing: null,
     ended: false,
     endEmitted: false,
     reading: false,
     sync: true,
     needReadable: false,
     emittedReadable: false,
     readableListening: false,
     resumeScheduled: false,
     destroyed: false,
     defaultEncoding: 'utf8',
     awaitDrain: 0,
     readingMore: false,
     decoder: null,
     encoding: null },
  readable: true,   // 表示可读
  domain: null,
  _events: { end: [Function] },     // EventEmitter事件函数
  _eventsCount: 1,      // 事件的数量
  _maxListeners: undefined,
  path: '/Users/yangming/Documents/github/nodejs-learn/README.md',
  fd: null,     // 文件描述符
  flags: 'r',   // 模式
  mode: 438,
  start: undefined,
  end: Infinity,
  autoClose: true,
  pos: undefined,
  bytesRead: 0 }

EventEmitter事件有

可读流的方法

2.2 可写流

同理,可写流可以想象成数据的消费者。

可写流的EventEmitter事件

可写流的方法

例如创建http服务器

const http = require('http')

const server = http.createServer((req, res) => {
  // request为客户端请求对象 http.IncomingMessagede 实例
  // response为服务端响应数据 http.ServerResponse 实例
  let body = ''

  // req为可读流,可以设置编码
  // res为可写流
  req.setEncoding('utf8')
  // 转换为flowing模式
  req.on('data', chunk => {
    body += chunk // 这里的chunk为字符串,不存在自动转码的问题
  })

  // 可读流的end事件
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // writable.write()
      res.write(typeof data)
      // writable.end()
      res.end()
    } catch (er) {
      res.statusCode = 400
      return res.end(`错误: ${er.message}`)
    }
  })
})

server.listen(8088)

// $ curl localhost:8088 -d "{}"
// object
// $ curl localhost:8088 -d "\"foo\""
// string
// $ curl localhost:8088 -d "not json"
// 错误: Unexpected token o in JSON at position 1

2.3 读写流

读写流为stream.Duplexstream.Transform

读写流内部都有自己的实现方法

但是Transform有额外的转换方法,为了将输入和输出的数据关联起来

对于可读写流在实际例子中的作用与实现还有待验证。

node中的2个核心Stream和EventEmitter,几乎所有的类都继承了它们。

参考: