connection-manager.js
5.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
'use strict';
const _ = require('lodash');
const AbstractConnectionManager = require('../abstract/connection-manager');
const SequelizeErrors = require('../../errors');
const Utils = require('../../utils');
const DataTypes = require('../../data-types').mysql;
const momentTz = require('moment-timezone');
const debug = Utils.getLogger().debugContext('connection:mysql');
const parserMap = new Map();
/**
* MySQL Connection Managger
*
* Get connections, validate and disconnect them.
* AbstractConnectionManager pooling use it to handle MySQL specific connections
* Use https://github.com/sidorares/node-mysql2 to connect with MySQL server
*
* @extends AbstractConnectionManager
* @return Class<ConnectionManager>
* @private
*/
class ConnectionManager extends AbstractConnectionManager {
constructor(dialect, sequelize) {
super(dialect, sequelize);
this.sequelize = sequelize;
this.sequelize.config.port = this.sequelize.config.port || 3306;
try {
if (sequelize.config.dialectModulePath) {
this.lib = require(sequelize.config.dialectModulePath);
} else {
this.lib = require('mysql2');
}
} catch (err) {
if (err.code === 'MODULE_NOT_FOUND') {
throw new Error('Please install mysql2 package manually');
}
throw err;
}
this.refreshTypeParser(DataTypes);
}
// Update parsing when the user has added additional, custom types
_refreshTypeParser(dataType) {
for (const type of dataType.types.mysql) {
parserMap.set(type, dataType.parse);
}
}
_clearTypeParser() {
parserMap.clear();
}
static _typecast(field, next) {
if (parserMap.has(field.type)) {
return parserMap.get(field.type)(field, this.sequelize.options, next);
}
return next();
}
/**
* Connect with MySQL database based on config, Handle any errors in connection
* Set the pool handlers on connection.error
* Also set proper timezone once conection is connected
*
* @return Promise<Connection>
* @private
*/
connect(config) {
const connectionConfig = {
host: config.host,
port: config.port,
user: config.username,
flags: '-FOUND_ROWS',
password: config.password,
database: config.database,
timezone: this.sequelize.options.timezone,
typeCast: ConnectionManager._typecast.bind(this),
bigNumberStrings: false,
supportBigNumbers: true
};
if (config.dialectOptions) {
for (const key of Object.keys(config.dialectOptions)) {
connectionConfig[key] = config.dialectOptions[key];
}
}
return new Utils.Promise((resolve, reject) => {
const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => {
// clean up connect event if there is error
connection.removeListener('connect', connectHandler);
if (config.pool.handleDisconnects) {
debug(`connection error ${e.code}`);
if (e.code === 'PROTOCOL_CONNECTION_LOST') {
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
return;
}
}
connection.removeListener('error', errorHandler);
reject(e);
};
const connectHandler = () => {
if (!config.pool.handleDisconnects) {
// clean up error event if connected
connection.removeListener('error', errorHandler);
}
resolve(connection);
};
connection.on('error', errorHandler);
connection.once('connect', connectHandler);
})
.tap (() => { debug('connection acquired'); })
.then(connection => {
return new Utils.Promise((resolve, reject) => {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
let tzOffset = this.sequelize.options.timezone;
tzOffset = /\//.test(tzOffset) ? momentTz.tz(tzOffset).format('Z') : tzOffset;
connection.query(`SET time_zone = '${tzOffset}'`, err => {
if (err) { reject(err); } else { resolve(connection); }
});
});
})
.catch(err => {
switch (err.code) {
case 'ECONNREFUSED':
throw new SequelizeErrors.ConnectionRefusedError(err);
case 'ER_ACCESS_DENIED_ERROR':
throw new SequelizeErrors.AccessDeniedError(err);
case 'ENOTFOUND':
throw new SequelizeErrors.HostNotFoundError(err);
case 'EHOSTUNREACH':
throw new SequelizeErrors.HostNotReachableError(err);
case 'EINVAL':
throw new SequelizeErrors.InvalidConnectionError(err);
default:
throw new SequelizeErrors.ConnectionError(err);
}
});
}
disconnect(connection) {
// Dont disconnect connections with CLOSED state
if (connection._closing) {
debug('connection tried to disconnect but was already at CLOSED state');
return Utils.Promise.resolve();
}
return new Utils.Promise((resolve, reject) => {
connection.end(err => {
if (err) {
reject(new SequelizeErrors.ConnectionError(err));
} else {
debug('connection disconnected');
resolve();
}
});
});
}
validate(connection) {
return connection && connection._fatalError === null && connection._protocolError === null && !connection._closing &&
!connection.stream.destroyed;
}
}
_.extend(ConnectionManager.prototype, AbstractConnectionManager.prototype);
module.exports = ConnectionManager;
module.exports.ConnectionManager = ConnectionManager;
module.exports.default = ConnectionManager;