// index.js (5.98 KB)
'use strict';

var STREAM = require('stream'),
    UTIL = require('util'),
    StringDecoder = require('string_decoder').StringDecoder;

/**
 * Readable stream whose content comes from an in-memory chunk queue.
 *
 * Works both as `new MemoryReadableStream(...)` and as a plain function call.
 *
 * @param {*} data - Initial content; handed to `init` unchanged.
 * @param {Object} [options] - Passed to the `stream.Readable` constructor and to `init`.
 */
function MemoryReadableStream(data, options) {
    if (this instanceof MemoryReadableStream) {
        // Regular construction: run the parent constructor, then seed the queue.
        STREAM.Readable.call(this, options);
        this.init(data, options);
    } else {
        // Called without `new`: build a proper instance and hand it back.
        return new MemoryReadableStream(data, options);
    }
}
UTIL.inherits(MemoryReadableStream, STREAM.Readable);


/**
 * Writable stream that collects written chunks in an in-memory queue.
 *
 * Works both as `new MemoryWritableStream(...)` and as a plain function call.
 *
 * @param {*} data - Initial content; handed to `init` unchanged.
 * @param {Object} [options] - Passed to the `stream.Writable` constructor and to `init`.
 */
function MemoryWritableStream(data, options) {
    if (this instanceof MemoryWritableStream) {
        // Regular construction: run the parent constructor, then seed the queue.
        STREAM.Writable.call(this, options);
        this.init(data, options);
    } else {
        // Called without `new`: build a proper instance and hand it back.
        return new MemoryWritableStream(data, options);
    }
}
UTIL.inherits(MemoryWritableStream, STREAM.Writable);


/**
 * Duplex stream backed by an in-memory chunk queue.
 *
 * Works both as `new MemoryDuplexStream(...)` and as a plain function call.
 *
 * @param {*} data - Initial content; handed to `init` unchanged.
 * @param {Object} [options] - Passed to the `stream.Duplex` constructor and to `init`.
 */
function MemoryDuplexStream(data, options) {
    if (this instanceof MemoryDuplexStream) {
        // Regular construction: run the parent constructor, then seed the queue.
        STREAM.Duplex.call(this, options);
        this.init(data, options);
    } else {
        // Called without `new`: build a proper instance and hand it back.
        return new MemoryDuplexStream(data, options);
    }
}
UTIL.inherits(MemoryDuplexStream, STREAM.Duplex);


/**
 * Shared initializer for all three stream flavours: seeds the chunk queue and
 * records the memory-stream specific options on the instance.
 *
 * @param {*} [data] - Initial content. A single value or an array of values;
 *     every value that is not already a Buffer is converted to one. Falsy
 *     `data` leaves the queue empty.
 * @param {Object} [options]
 * @param {number} [options.maxbufsize] - Size limit checked by `_write`.
 * @param {boolean} [options.bufoverflow] - When truthy, `_write` reports an
 *     error once `maxbufsize` would be exceeded instead of dropping the chunk.
 * @param {number} [options.frequence] - Delay in ms used by `_read` between
 *     chunk deliveries.
 */
MemoryReadableStream.prototype.init =
MemoryWritableStream.prototype.init =
MemoryDuplexStream.prototype.init = function init (data, options) {
    var queue = this.queue = [],
        // Borrowed so that option objects without Object.prototype also work.
        hasOwn = Object.prototype.hasOwnProperty;

    if (data) {
        (Array.isArray(data) ? data : [ data ]).forEach(function (chunk) {
            if (!(chunk instanceof Buffer)) {
                // `new Buffer(x)` is deprecated. Buffer.from() covers strings,
                // arrays and array buffers; a numeric seed keeps its historical
                // meaning of "zero-filled buffer of that size".
                chunk = typeof chunk === 'number' ? Buffer.alloc(chunk)
                        : Buffer.from(chunk);
            }
            queue.push(chunk);
        });
    }

    options = options || {};

    // Only own properties count; anything absent is stored as null.
    this.maxbufsize = hasOwn.call(options, 'maxbufsize') ? options.maxbufsize
            : null;
    this.bufoverflow = hasOwn.call(options, 'bufoverflow') ? options.bufoverflow
            : null;
    this.frequence = hasOwn.call(options, 'frequence') ? options.frequence
            : null;
};

/**
 * Factory that returns the memory stream flavour selected by
 * `options.readable` / `options.writable` (both treated as true when the
 * option object does not own them).
 *
 * @param {*} data - Initial content, forwarded to the chosen stream constructor.
 * @param {Object} [options] - Stream options, forwarded as well.
 * @returns {MemoryDuplexStream|MemoryReadableStream|MemoryWritableStream}
 * @throws {Error} When neither side is enabled.
 */
function MemoryStream (data, options) {
    if (!(this instanceof MemoryStream)) {
        return new MemoryStream(data, options);
    }

    options = options || {};

    var wantsRead = true,
        wantsWrite = true;

    if (options.hasOwnProperty('readable')) {
        wantsRead = options.readable;
    }
    if (options.hasOwnProperty('writable')) {
        wantsWrite = options.writable;
    }

    if (!wantsRead && !wantsWrite) {
        throw new Error("Unknown stream type  Readable, Writable or Duplex ");
    }
    if (!wantsRead) {
        return new MemoryWritableStream(data, options);
    }
    if (!wantsWrite) {
        return new MemoryReadableStream(data, options);
    }
    return new MemoryDuplexStream(data, options);
}


/**
 * Creates a read-only memory stream.
 *
 * The caller's `options` object is left untouched: its enumerable properties
 * are copied before `readable` / `writable` are forced, so the same object can
 * safely be reused for other streams.
 *
 * @param {*} data - Initial content of the stream.
 * @param {Object} [options] - Stream options; `readable` and `writable` are overridden.
 * @returns {Object} The stream produced by `MemoryStream`.
 */
MemoryStream.createReadStream = function (data, options) {
    var settings = {},
        key;

    // for-in over null/undefined is a no-op, so a missing options is fine.
    for (key in options) {
        settings[key] = options[key];
    }
    settings.readable = true;
    settings.writable = false;

    return new MemoryStream(data, settings);
};


/**
 * Creates a write-only memory stream.
 *
 * The caller's `options` object is left untouched: its enumerable properties
 * are copied before `readable` / `writable` are forced, so the same object can
 * safely be reused for other streams.
 *
 * @param {*} data - Initial content of the stream.
 * @param {Object} [options] - Stream options; `readable` and `writable` are overridden.
 * @returns {Object} The stream produced by `MemoryStream`.
 */
MemoryStream.createWriteStream = function (data, options) {
    var settings = {},
        key;

    // for-in over null/undefined is a no-op, so a missing options is fine.
    for (key in options) {
        settings[key] = options[key];
    }
    settings.readable = false;
    settings.writable = true;

    return new MemoryStream(data, settings);
};


/**
 * Readable-side implementation: delivers one queued chunk per call, after a
 * delay of `this.frequence` milliseconds (0 when unset).
 *
 * With an empty queue the stream is ended, unless this is a Duplex whose
 * writable side has not finished yet; in that case nothing is pushed here.
 *
 * @param {number} n - Requested size; not used.
 */
MemoryReadableStream.prototype._read =
MemoryDuplexStream.prototype._read = function _read (n) {
    var self = this,
        frequence = self.frequence || 0,
        wait_data = this instanceof STREAM.Duplex && ! this._writableState.finished;

    if ( ! this.queue.length) {
        if ( ! wait_data) {
            this.push(null);// finish stream
        }
        return;
    }

    setTimeout(function () {
        // The queue may have been drained while the timer was pending.
        if ( ! self.queue.length) {
            return;
        }
        var chunk = self.queue.shift();
        if (chunk && ! self._readableState.ended) {
            // push() buffers the chunk even when it returns false; the return
            // value only signals back-pressure. Re-queueing the chunk on
            // `false` would deliver the same data again, so it is not put back.
            self.push(chunk);
        }
    }, frequence);
};


/**
 * Writable-side implementation: stores the chunk in the queue (Writable) or
 * forwards it, together with anything still queued, to the readable side
 * (Duplex).
 *
 * When `this.maxbufsize` is set and the chunk would push the queue past it,
 * the chunk is discarded; an Error is reported through `cb` if
 * `this.bufoverflow` is truthy, otherwise the write is acknowledged silently.
 *
 * @param {Buffer|string} chunk - Data being written.
 * @param {string} encoding - Encoding reported by the stream machinery.
 * @param {Function} cb - Write callback; receives an Error on failure.
 */
MemoryWritableStream.prototype._write =
MemoryDuplexStream.prototype._write = function _write (chunk, encoding, cb) {
    var decoder = null,
        decoded_chunk,
        queue_size;

    // A decoder is only created when the instance itself carries a truthy
    // `decodeStrings` property; an unsupported encoding is reported via cb.
    try {
        decoder = this.decodeStrings && encoding ? new StringDecoder(encoding) : null;
    } catch (err){
        return cb(err);
    }

    decoded_chunk = decoder ? decoder.write(chunk) : chunk;

    // Measuring the queue walks every entry, so only do it when a limit is
    // actually configured; otherwise every write would rescan the whole queue.
    if (this.maxbufsize) {
        queue_size = this._getQueueSize();
        if ((queue_size + decoded_chunk.length) > this.maxbufsize) {
            if (this.bufoverflow) {
                // Stream callbacks expect an Error object, not a bare string.
                return cb(new Error("Buffer overflowed (" + this.bufoverflow + "/" + queue_size + ")"));
            }
            return cb();
        }
    }

    if (this instanceof STREAM.Duplex) {
        // Flush anything still queued first so ordering is preserved.
        while (this.queue.length) {
            this.push(this.queue.shift());
        }
        this.push(decoded_chunk);
    } else {
        this.queue.push(decoded_chunk);
    }
    cb();
};


/**
 * Ends the writable side and, once that has completed, the readable side too.
 *
 * Accepts the same call shapes as `Writable#end`: `end()`, `end(cb)`,
 * `end(chunk, cb)` and `end(chunk, encoding, cb)`.
 *
 * @param {*} [chunk] - Final chunk to write before ending.
 * @param {string} [encoding] - Encoding of `chunk` when it is a string.
 * @param {Function} [cb] - Called after the readable side has been ended.
 * @returns {*} Whatever the parent `end` implementation returns.
 */
MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) {
    var self = this;

    // Normalize the optional-argument forms up front. Without this, a callback
    // passed in the first or second position makes the parent `end` ignore the
    // third argument, so the readable side would never be finished.
    if (typeof chunk === 'function') {
        cb = chunk;
        chunk = undefined;
        encoding = undefined;
    } else if (typeof encoding === 'function') {
        cb = encoding;
        encoding = undefined;
    }

    return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () {
        self.push(null);//finish readable stream too
        if (typeof cb === 'function') cb();
    });
};


/**
 * Sums the `length` of every queued entry. For an entry that is itself an
 * array, the length of its first element is counted instead.
 *
 * @returns {number} Combined length of everything currently in the queue.
 */
MemoryReadableStream.prototype._getQueueSize =
MemoryWritableStream.prototype._getQueueSize =
MemoryDuplexStream.prototype._getQueueSize = function () {
    var entries = this.queue,
        total = 0,
        idx = 0,
        entry;

    while (idx < entries.length) {
        entry = entries[idx++];
        total += (Array.isArray(entry) ? entry[0] : entry).length;
    }
    return total;
};


/**
 * Returns everything currently queued as one string (also available as
 * `getAll`). The queue itself is not modified.
 *
 * Buffers are decoded as UTF-8 through a single StringDecoder so that a
 * multi-byte character split across two consecutive chunks is still decoded
 * correctly. Non-Buffer entries are appended using ordinary string conversion.
 *
 * @returns {string} Concatenated content of the queue.
 */
MemoryWritableStream.prototype.toString =
MemoryDuplexStream.prototype.toString =
MemoryReadableStream.prototype.toString =
MemoryWritableStream.prototype.getAll =
MemoryDuplexStream.prototype.getAll =
MemoryReadableStream.prototype.getAll = function () {
    var decoder = new StringDecoder('utf8'),
        ret = '';

    this.queue.forEach(function (data) {
        if (data instanceof Buffer) {
            ret += decoder.write(data);
        } else {
            // Flush any bytes still pending from preceding buffers before
            // appending a value that is not raw bytes.
            ret += decoder.end() + data;
        }
    });

    // Emit whatever incomplete byte sequence is left at the very end.
    return ret + decoder.end();
};


/**
 * Returns everything currently queued as one Buffer. The queue itself is not
 * modified.
 *
 * Every entry is converted to a Buffer first, so the result is sized by byte
 * length; string entries containing multi-byte characters are neither
 * truncated nor overlapped.
 *
 * @returns {Buffer} Concatenated content of the queue.
 */
MemoryWritableStream.prototype.toBuffer =
MemoryDuplexStream.prototype.toBuffer =
MemoryReadableStream.prototype.toBuffer = function () {
    var parts = this.queue.map(function (data) {
        return data instanceof Buffer ? data : Buffer.from(data);
    });

    return Buffer.concat(parts);
};


// Public entry point of the module: the MemoryStream factory, which also
// carries the createReadStream / createWriteStream helpers as properties.
module.exports = MemoryStream;