connection-manager.js 8.46 KB
'use strict';

var Pooling = require('generic-pool')
  , Promise = require('../../promise')
  , _ = require('lodash')
  , semver = require('semver')
  , defaultPoolingConfig = {
    max: 5,
    min: 0,
    idle: 10000,
    handleDisconnects: true
  }
  , ConnectionManager;

ConnectionManager = function(dialect, sequelize) {
  var config = _.cloneDeep(sequelize.config);

  this.sequelize = sequelize;
  this.config = config;
  this.dialect = dialect;
  this.versionPromise = null;
  this.dialectName = this.sequelize.options.dialect;

  if (config.pool) {
    config.pool = _.clone(config.pool); // Make sure we don't modify the existing config object (user might re-use it)
    config.pool =_.defaults(config.pool, defaultPoolingConfig, {
      validate: this.$validate.bind(this)
    }) ;
  } else {
    // If the user has turned off pooling we provide a 0/1 pool for backwards compat
    config.pool = _.defaults({
      max: 1,
      min: 0
    }, defaultPoolingConfig, {
      validate: this.$validate.bind(this)
    });
  }

  // Map old names
  if (config.pool.maxIdleTime) config.pool.idle = config.pool.maxIdleTime;
  if (config.pool.maxConnections) config.pool.max = config.pool.maxConnections;
  if (config.pool.minConnections) config.pool.min = config.pool.minConnections;

  this.onProcessExit = this.onProcessExit.bind(this); // Save a reference to the bound version so we can remove it with removeListener
  process.on('exit', this.onProcessExit);
};

ConnectionManager.prototype.refreshTypeParser = function(dataTypes) {
  _.each(dataTypes, function (dataType, key) {
    if (dataType.hasOwnProperty('parse')) {
      var dialectName = this.dialectName;
      if (dialectName === 'mariadb') {
        dialectName = 'mysql';
      }

      if (dataType.types[dialectName]) {
        this.$refreshTypeParser(dataType);
      } else {
        throw new Error('Parse function not supported for type ' + dataType.key + ' in dialect ' + this.dialectName);
      }
    }
  }.bind(this));
};

ConnectionManager.prototype.onProcessExit = function() {
  var self = this;

  if (this.pool) {
    this.pool.drain(function() {
      self.pool.destroyAllNow();
    });
  }
};

ConnectionManager.prototype.close = function () {
  this.onProcessExit();
  process.removeListener('exit', this.onProcessExit); // Remove the listener, so all references to this instance can be garbage collected.

  this.getConnection = function () {
    return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!'));
  };
};

// This cannot happen in the constructor because the user can specify a min. number of connections to have in the pool
// If he does this, generic-pool will try to call connect before the dialect-specific connection manager has been correctly set up
ConnectionManager.prototype.initPools = function () {
  var self = this
    , config = this.config;

  if (!config.replication) {
    this.pool = Pooling.Pool({
      name: 'sequelize-connection',
      create: function(callback) {
        self.$connect(config).nodeify(function (err, connection) {
          callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
        });
      },
      destroy: function(connection) {
        self.$disconnect(connection);
        return null;
      },
      max: config.pool.max,
      min: config.pool.min,
      validate: config.pool.validate,
      idleTimeoutMillis: config.pool.idle
    });
    return;
  }

  var reads = 0;

  if (!Array.isArray(config.replication.read)) {
    config.replication.read = [config.replication.read];
  }

  // Map main connection config
  config.replication.write = _.defaults(config.replication.write, _.omit(config, 'replication'));

  // Apply defaults to each read config
  config.replication.read = _.map(config.replication.read, function(readConfig) {
    return _.defaults(readConfig, _.omit(self.config, 'replication'));
  });

  // I'll make my own pool, with blackjack and hookers! (original credit goes to @janzeh)
  this.pool = {
    release: function(client) {
      if (client.queryType === 'read') {
        return self.pool.read.release(client);
      } else {
        return self.pool.write.release(client);
      }
    },
    acquire: function(callback, priority, queryType, useMaster) {
      useMaster = _.isUndefined(useMaster) ? false : useMaster;
      if (queryType === 'SELECT' && !useMaster) {
        self.pool.read.acquire(callback, priority);
      } else {
        self.pool.write.acquire(callback, priority);
      }
    },
    destroy: function(connection) {
      return self.pool[connection.queryType].destroy(connection);
    },
    destroyAllNow: function() {
      self.pool.read.destroyAllNow();
      self.pool.write.destroyAllNow();
    },
    drain: function(cb) {
      self.pool.write.drain(function() {
        self.pool.read.drain(cb);
      });
    },
    read: Pooling.Pool({
      name: 'sequelize-connection-read',
      create: function(callback) {
        // Simple round robin config
        var nextRead = reads++ % config.replication.read.length;
        self.$connect(config.replication.read[nextRead]).tap(function (connection) {
          connection.queryType = 'read';
        }).nodeify(function (err, connection) {
          callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
        });
      },
      destroy: function(connection) {
        self.$disconnect(connection);
        return null;
      },
      validate: config.pool.validate,
      max: config.pool.max,
      min: config.pool.min,
      idleTimeoutMillis: config.pool.idle
    }),
    write: Pooling.Pool({
      name: 'sequelize-connection-write',
      create: function(callback) {
        self.$connect(config.replication.write).tap(function (connection) {
          connection.queryType = 'write';
        }).nodeify(function (err, connection) {
          callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
        });
      },
      destroy: function(connection) {
        self.$disconnect(connection);
        return null;
      },
      validate: config.pool.validate,
      max: config.pool.max,
      min: config.pool.min,
      idleTimeoutMillis: config.pool.idle
    })
  };
};

ConnectionManager.prototype.getConnection = function(options) {
  var self = this;
  options = options || {};

  var promise;
  if (this.sequelize.options.databaseVersion === 0) {
    if (this.versionPromise) {
      promise = this.versionPromise;
    } else {
      promise = this.versionPromise = self.$connect(self.config.replication.write || self.config).then(function (connection) {
        var _options = {};
        _options.transaction = { connection: connection }; // Cheat .query to use our private connection
        _options.logging = function () {};
        _options.logging.__testLoggingFn = true;

        return self.sequelize.databaseVersion(_options).then(function (version) {
          self.sequelize.options.databaseVersion = semver.valid(version) ? version : self.defaultVersion;

          self.versionPromise = null;

          self.$disconnect(connection);
          return null;
        });
      }).catch(function (err) {
        self.versionPromise = null;
        throw err;
      });
    }
  } else {
    promise = Promise.resolve();
  }

  return promise.then(function () {
    return new Promise(function (resolve, reject) {
      self.pool.acquire(function(err, connection) {
        if (err) return reject(err);
        resolve(connection);
      }, options.priority, options.type, options.useMaster);
    });
  });
};

ConnectionManager.prototype.releaseConnection = function(connection) {
  var self = this;

  return new Promise(function (resolve, reject) {
    self.pool.release(connection);
    resolve();
  });
};

ConnectionManager.prototype.$connect = function(config) {
  return this.sequelize.runHooks('beforeConnect', config).bind(this).then(function () {
    return this.dialect.connectionManager.connect(config);
  }).then(function(connection) {
    return this.sequelize.runHooks('afterConnect', connection, config).return(connection);
  });
};
ConnectionManager.prototype.$disconnect = function(connection) {
  return this.dialect.connectionManager.disconnect(connection);
};

ConnectionManager.prototype.$validate = function(connection) {
  if (!this.dialect.connectionManager.validate) return true;
  return this.dialect.connectionManager.validate(connection);
};

module.exports = ConnectionManager;