server.js 10.7 KB
"use strict";
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
Object.defineProperty(exports, "__esModule", { value: true });
const http2 = require("http2");
const url_1 = require("url");
const constants_1 = require("./constants");
const metadata_1 = require("./metadata");
const server_call_1 = require("./server-call");
function noop() { }
const unimplementedStatusResponse = {
    code: constants_1.Status.UNIMPLEMENTED,
    details: 'The server does not implement this method',
    metadata: new metadata_1.Metadata(),
};
const defaultHandler = {
    unary(call, callback) {
        callback(unimplementedStatusResponse, null);
    },
    clientStream(call, callback) {
        callback(unimplementedStatusResponse, null);
    },
    serverStream(call) {
        call.emit('error', unimplementedStatusResponse);
    },
    bidi(call) {
        call.emit('error', unimplementedStatusResponse);
    },
};
// tslint:enable:no-any
class Server {
    constructor(options) {
        this.http2Server = null;
        this.handlers = new Map();
        this.sessions = new Set();
        this.started = false;
    }
    addProtoService() {
        throw new Error('Not implemented. Use addService() instead');
    }
    addService(service, implementation) {
        if (this.started === true) {
            throw new Error("Can't add a service to a started server.");
        }
        if (service === null ||
            typeof service !== 'object' ||
            implementation === null ||
            typeof implementation !== 'object') {
            throw new Error('addService() requires two objects as arguments');
        }
        const serviceKeys = Object.keys(service);
        if (serviceKeys.length === 0) {
            throw new Error('Cannot add an empty service to a server');
        }
        serviceKeys.forEach(name => {
            const attrs = service[name];
            let methodType;
            if (attrs.requestStream) {
                if (attrs.responseStream) {
                    methodType = 'bidi';
                }
                else {
                    methodType = 'clientStream';
                }
            }
            else {
                if (attrs.responseStream) {
                    methodType = 'serverStream';
                }
                else {
                    methodType = 'unary';
                }
            }
            let implFn = implementation[name];
            let impl;
            if (implFn === undefined && typeof attrs.originalName === 'string') {
                implFn = implementation[attrs.originalName];
            }
            if (implFn !== undefined) {
                impl = implFn.bind(implementation);
            }
            else {
                impl = defaultHandler[methodType];
            }
            const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
            if (success === false) {
                throw new Error(`Method handler for ${attrs.path} already provided.`);
            }
        });
    }
    bind(port, creds) {
        throw new Error('Not implemented. Use bindAsync() instead');
    }
    bindAsync(port, creds, callback) {
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (typeof port !== 'string') {
            throw new TypeError('port must be a string');
        }
        if (creds === null || typeof creds !== 'object') {
            throw new TypeError('creds must be an object');
        }
        if (typeof callback !== 'function') {
            throw new TypeError('callback must be a function');
        }
        const url = new url_1.URL(`http://${port}`);
        const options = { host: url.hostname, port: +url.port };
        if (creds._isSecure()) {
            this.http2Server = http2.createSecureServer(creds._getSettings());
        }
        else {
            this.http2Server = http2.createServer();
        }
        this.http2Server.setTimeout(0, noop);
        this._setupHandlers();
        function onError(err) {
            callback(err, -1);
        }
        this.http2Server.once('error', onError);
        this.http2Server.listen(options, () => {
            const server = this.http2Server;
            const port = server.address().port;
            server.removeListener('error', onError);
            callback(null, port);
        });
    }
    forceShutdown() {
        // Close the server if it is still running.
        if (this.http2Server && this.http2Server.listening) {
            this.http2Server.close();
        }
        this.started = false;
        // Always destroy any available sessions. It's possible that one or more
        // tryShutdown() calls are in progress. Don't wait on them to finish.
        this.sessions.forEach(session => {
            // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
            // recognize destroy(code) as a valid signature.
            // tslint:disable-next-line:no-any
            session.destroy(http2.constants.NGHTTP2_CANCEL);
        });
        this.sessions.clear();
    }
    register(name, handler, serialize, deserialize, type) {
        if (this.handlers.has(name)) {
            return false;
        }
        this.handlers.set(name, {
            func: handler,
            serialize,
            deserialize,
            type,
        });
        return true;
    }
    start() {
        if (this.http2Server === null || this.http2Server.listening !== true) {
            throw new Error('server must be bound in order to start');
        }
        if (this.started === true) {
            throw new Error('server is already started');
        }
        this.started = true;
    }
    tryShutdown(callback) {
        let pendingChecks = 0;
        function maybeCallback() {
            pendingChecks--;
            if (pendingChecks === 0) {
                callback();
            }
        }
        // Close the server if necessary.
        this.started = false;
        if (this.http2Server && this.http2Server.listening) {
            pendingChecks++;
            this.http2Server.close(maybeCallback);
        }
        // If any sessions are active, close them gracefully.
        pendingChecks += this.sessions.size;
        this.sessions.forEach(session => {
            session.close(maybeCallback);
        });
        // If the server is closed and there are no active sessions, just call back.
        if (pendingChecks === 0) {
            callback();
        }
    }
    addHttp2Port() {
        throw new Error('Not yet implemented');
    }
    _setupHandlers() {
        if (this.http2Server === null) {
            return;
        }
        this.http2Server.on('stream', (stream, headers) => {
            const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
            if (typeof contentType !== 'string' ||
                !contentType.startsWith('application/grpc')) {
                stream.respond({
                    [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
                }, { endStream: true });
                return;
            }
            try {
                const path = headers[http2.constants.HTTP2_HEADER_PATH];
                const handler = this.handlers.get(path);
                if (handler === undefined) {
                    throw unimplementedStatusResponse;
                }
                const call = new server_call_1.Http2ServerCallStream(stream, handler);
                const metadata = call.receiveMetadata(headers);
                switch (handler.type) {
                    case 'unary':
                        handleUnary(call, handler, metadata);
                        break;
                    case 'clientStream':
                        handleClientStreaming(call, handler, metadata);
                        break;
                    case 'serverStream':
                        handleServerStreaming(call, handler, metadata);
                        break;
                    case 'bidi':
                        handleBidiStreaming(call, handler, metadata);
                        break;
                    default:
                        throw new Error(`Unknown handler type: ${handler.type}`);
                }
            }
            catch (err) {
                const call = new server_call_1.Http2ServerCallStream(stream, null);
                if (err.code === undefined) {
                    err.code = constants_1.Status.INTERNAL;
                }
                call.sendError(err);
            }
        });
        this.http2Server.on('session', session => {
            if (!this.started) {
                session.destroy();
                return;
            }
            this.sessions.add(session);
        });
    }
}
exports.Server = Server;
async function handleUnary(call, handler, metadata) {
    const emitter = new server_call_1.ServerUnaryCallImpl(call, metadata);
    const request = await call.receiveUnaryMessage();
    if (request === undefined || call.cancelled) {
        return;
    }
    emitter.request = request;
    handler.func(emitter, (err, value, trailer, flags) => {
        call.sendUnaryMessage(err, value, trailer, flags);
    });
}
function handleClientStreaming(call, handler, metadata) {
    const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize);
    function respond(err, value, trailer, flags) {
        stream.destroy();
        call.sendUnaryMessage(err, value, trailer, flags);
    }
    if (call.cancelled) {
        return;
    }
    stream.on('error', respond);
    handler.func(stream, respond);
}
async function handleServerStreaming(call, handler, metadata) {
    const request = await call.receiveUnaryMessage();
    if (request === undefined || call.cancelled) {
        return;
    }
    const stream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize);
    stream.request = request;
    handler.func(stream);
}
function handleBidiStreaming(call, handler, metadata) {
    const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, handler.serialize, handler.deserialize);
    if (call.cancelled) {
        return;
    }
    handler.func(stream);
}
//# sourceMappingURL=server.js.map