message_stream.js 6.53 KB
'use strict';

const Duplex = require('stream').Duplex;
const BufferList = require('bl');
const MongoParseError = require('../core/error').MongoParseError;
const decompress = require('../core/wireprotocol/compression').decompress;
const Response = require('../core/connection/commands').Response;
const BinMsg = require('../core/connection/msg').BinMsg;
const MongoError = require('../core/error').MongoError;
const OP_COMPRESSED = require('../core/wireprotocol/shared').opcodes.OP_COMPRESSED;
const OP_MSG = require('../core/wireprotocol/shared').opcodes.OP_MSG;
const MESSAGE_HEADER_SIZE = require('../core/wireprotocol/shared').MESSAGE_HEADER_SIZE;
const COMPRESSION_DETAILS_SIZE = require('../core/wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
const opcodes = require('../core/wireprotocol/shared').opcodes;
const compress = require('../core/wireprotocol/compression').compress;
const compressorIDs = require('../core/wireprotocol/compression').compressorIDs;
const uncompressibleCommands = require('../core/wireprotocol/compression').uncompressibleCommands;
const Msg = require('../core/connection/msg').Msg;

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
const kBuffer = Symbol('buffer');

/**
 * A duplex stream that is capable of reading and writing raw wire protocol messages, with
 * support for optional compression
 */
class MessageStream extends Duplex {
  constructor(options) {
    options = options || {};
    super(options);

    this.bson = options.bson;
    this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;

    this[kBuffer] = new BufferList();
  }

  _write(chunk, _, callback) {
    const buffer = this[kBuffer];
    buffer.append(chunk);

    processIncomingData(this, callback);
  }

  _read(/* size */) {
    // NOTE: This implementation is empty because we explicitly push data to be read
    //       when `writeMessage` is called.
    return;
  }

  writeCommand(command, operationDescription) {
    // TODO: agreed compressor should live in `StreamDescription`
    const shouldCompress = operationDescription && !!operationDescription.agreedCompressor;
    if (!shouldCompress || !canCompress(command)) {
      const data = command.toBin();
      this.push(Array.isArray(data) ? Buffer.concat(data) : data);
      return;
    }

    // otherwise, compress the message
    const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
    const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

    // Extract information needed for OP_COMPRESSED from the uncompressed message
    const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

    // Compress the message body
    compress({ options: operationDescription }, messageToBeCompressed, (err, compressedMessage) => {
      if (err) {
        operationDescription.cb(err, null);
        return;
      }

      // Create the msgHeader of OP_COMPRESSED
      const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
      msgHeader.writeInt32LE(
        MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
        0
      ); // messageLength
      msgHeader.writeInt32LE(command.requestId, 4); // requestID
      msgHeader.writeInt32LE(0, 8); // responseTo (zero)
      msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode

      // Create the compression details of OP_COMPRESSED
      const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
      compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
      compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
      compressionDetails.writeUInt8(compressorIDs[operationDescription.agreedCompressor], 8); // compressorID

      this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
    });
  }
}

// Return whether a command contains an uncompressible command term
// Will return true if command contains no uncompressible command terms
function canCompress(command) {
  const commandDoc = command instanceof Msg ? command.command : command.query;
  const commandName = Object.keys(commandDoc)[0];
  return !uncompressibleCommands.has(commandName);
}

function processIncomingData(stream, callback) {
  const buffer = stream[kBuffer];
  if (buffer.length < 4) {
    callback();
    return;
  }

  const sizeOfMessage = buffer.readInt32LE(0);
  if (sizeOfMessage < 0) {
    callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
    return;
  }

  if (sizeOfMessage > stream.maxBsonMessageSize) {
    callback(
      new MongoParseError(
        `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
      )
    );
    return;
  }

  if (sizeOfMessage > buffer.length) {
    callback();
    return;
  }

  const message = buffer.slice(0, sizeOfMessage);
  buffer.consume(sizeOfMessage);

  const messageHeader = {
    length: message.readInt32LE(0),
    requestId: message.readInt32LE(4),
    responseTo: message.readInt32LE(8),
    opCode: message.readInt32LE(12)
  };

  let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
  const responseOptions = stream.responseOptions;
  if (messageHeader.opCode !== OP_COMPRESSED) {
    const messageBody = message.slice(MESSAGE_HEADER_SIZE);
    stream.emit(
      'message',
      new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
    );

    if (buffer.length >= 4) {
      processIncomingData(stream, callback);
    } else {
      callback();
    }

    return;
  }

  messageHeader.fromCompressed = true;
  messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
  messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
  const compressorID = message[MESSAGE_HEADER_SIZE + 8];
  const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);

  // recalculate based on wrapped opcode
  ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;

  decompress(compressorID, compressedBuffer, (err, messageBody) => {
    if (err) {
      callback(err);
      return;
    }

    if (messageBody.length !== messageHeader.length) {
      callback(
        new MongoError(
          'Decompressing a compressed message from the server failed. The message is corrupt.'
        )
      );

      return;
    }

    stream.emit(
      'message',
      new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
    );

    if (buffer.length >= 4) {
      processIncomingData(stream, callback);
    } else {
      callback();
    }
  });
}

module.exports = MessageStream;