/*jshint laxcomma: true, smarttabs: true, node: true, esnext: true*/
'use strict'
/**
 * Primary server instance for a skyring app.
 * @module skyring/lib/server
 * @requires http
 * @requires debug
 * @requires @esatterwhite/micromock
 * @requires skyring/lib/server/node
 * @requires skyring/lib/server/router
 * @requires skyring/lib/timer
 */

const http = require('http')
const mock = require('@esatterwhite/micromock')
const util = require('util')
const Debug = require('debug')
const routes = require('./api')
const Node = require('./node')
const Router = require('./router')
const Timer = require('../timer')
const conf = require('../../conf')
const debug = Debug('skyring:server')

/**
 * Tells whether a value can be invoked.
 * Local replacement for `util.isFunction`, which is deprecated (DEP0049)
 * and no longer exists in recent Node.js releases.
 * @private
 * @param {*} value The value to inspect
 * @return {Boolean} `true` when `value` is a function
 */
function isFunction(value) {
  return typeof value === 'function'
}

/**
 * @constructor
 * @extends http.Server
 * @alias module:skyring/lib/server
 * @author Eric Satterwhite
 * @since 1.0.0
 * @param {Object} [options]
 * @param {module:skyring/lib/server/node} [options.node] A custom node instance, or the settings used to create one
 * @param {String} [options.node.host] host name for the node to listen on - 127.0.0.1 must be used for localhost ( not 0.0.0.0 )
 * @param {Number} [options.node.port] Port number for the node to listen on in the ring
 * @param {String} [options.node.name] Passed through to the node constructor; the resulting node name is used as the server group
 * @param {String} [options.node.app=timers] name of the active ring to join
 * @param {Object} [options.nats]
 * @param {String[]} [options.nats.servers] An array of nats `host:port` addresses to connect to
 * @param {Object} [options.storage=null] Passed through, untouched, to the timer store
 * @param {String[]} [options.seeds=null] Passed through, untouched, to the node when joining the ring
 * @param {Boolean} [options.autobalance] When truthy, timers are rebalanced on every `ringchange` event of the node.
 * Defaults to the `autobalance` configuration value
 * @param {String[]|Function[]} [options.transports] an array of custom transport functions, or requireable paths that resolve to functions.
 * All transport functions must be named functions
 * @example // Use only configuration values
var server = new Server().listen(5000)
 * @example var server = new Server({
  node :{
    host: '172.17.0.9'
  , port: 8456
  , app: 'payback'
  }
, nats: {
    servers: ['nats1.domain.com:4222', 'nats2.domain.com:4222']
  }
})
server.listen(5000)
 * @example // Use a custom node instance
var node = new Node({
  host: '172.17.0.9'
, port: 8456
, app: 'payback'
})
var server = new Server({ node })
server.listen(5000)
 */
class Server extends http.Server {
  constructor(opts = {}){
    // The router only exists once `listen` has set it up; requests can not
    // arrive before that, so the late lookup is safe.
    super((req, res) => {
      this._router.handle(req, res)
    })

    this.closed = false
    this.options = Object.assign({}, {
      seeds: null
    , nats: null
    , storage: null
    , transports: []
    , autobalance: conf.get('autobalance')
    }, opts)

    /* istanbul ignore else */
    if (opts.node) {
      this._node = opts.node instanceof Node
        ? opts.node
        : new Node(
            opts.node.host
          , opts.node.port
          , opts.node.name
          , opts.node.app
          )
    } else {
      this._node = new Node()
    }

    this._group = this._node.name

    // A single bound handler is kept so the very same function reference
    // can be detached from `process` again when the server closes.
    this._onRebalance = this._rebalance.bind(this)

    this._node.on('bootstrap', (seeds) => {
      this.emit('bootstrap', seeds)
    })
  }

  /**
   * Registers a route on the API router.
   * The router is created by {@link module:skyring/lib/server#listen},
   * so this can only be used once `listen` has set it up.
   * @method module:skyring/lib/server#route
   * @param {Object} opts
   * @param {String} opts.path Path of the route
   * @param {String} opts.method HTTP method of the route
   * @param {Function} opts.handler Handler of the route
   * @param {Function|Function[]} [opts.middleware] When present, handed to the `before` hook of the new route
   **/
  route(opts) {
    const route = this._router.route(opts.path, opts.method, opts.handler)
    if (opts.middleware) route.before(opts.middleware)
    debug('loaded: %s %s', opts.method, opts.path)
  }

  /**
   * Joins the node to the configured ring and starts the http server
   * @method module:skyring/lib/server#listen
   * @param {Number} port Port number to listen on
   * @param {String} [host=localhost] host or ip address to listen on
   * @param {Number} [backlog]
   * @param {Function} [callback] Callback function to call when the server is running.
   * It receives an error when the timer store or the ring could not be set up
   * @return {module:skyring/lib/server}
   **/
  listen(port, ...args) {
    const callback = args[args.length - 1]
    const fail = (err) => {
      debug('unable to start server', err)
      if (isFunction(callback)) callback(err)
    }

    if (this.listening) {
      if (isFunction(callback)) callback()
      return this
    }

    debug('seed nodes', this.options.seeds)
    this._timers = new Timer({
      nats: this.options.nats
    , storage: this.options.storage
    , transports: this.options.transports
    }, (err) => {
      if (err) return fail(err)

      this._router = new Router(this._node, this._timers)
      for (const key of Object.keys(routes)) {
        this.route(routes[key])
      }

      // When nodes are added / removed exec a rebalance of local timers
      // If this node is not the owner, send it back in the ring
      if (this.options.autobalance) {
        this._node.on('ringchange', this._onRebalance)
      }

      // A rebalance can also be requested from the outside with SIGUSR2
      process.on('SIGUSR2', this._onRebalance)

      // Join the ring
      this._node.join(this.options.seeds, (err) => {
        /* istanbul ignore if */
        if (err) return fail(err)

        // delegate mock requests from the ring to the
        // API router
        this._node.handle((req, res) => {
          this._router.handle(req, res)
        })

        // listen for timers being purged over nats when a remote
        // node is evicted or shutdown
        this._timers.watch(`skyring:${this._group}`, (err, data) => {
          /* istanbul ignore if */
          if (err) {
            // Nothing can be routed without a payload
            debug('unable to read purged timer', err)
            return
          }
          this.proxy(data)
        })

        debug('binding to port', port)
        super.listen(port, ...args)
      })
    })
    return this
  }

  /**
   * Asks the timer store to rebalance its timers against the node and
   * routes every timer handed back through {@link module:skyring/lib/server#proxy}
   * @private
   * @method module:skyring/lib/server#_rebalance
   * @param {Object} [evt={}] The event that triggered the rebalance; forwarded to the timer store
   **/
  _rebalance(evt = {}) {
    this._timers.rebalance(evt, this._node, (data) => {
      this.proxy(data)
    })
  }

  /**
   * Fabricates a `POST /timer` request for a timer and hands it to the API router.
   * A `proxy` event carrying the timer is emitted afterwards
   * @method module:skyring/lib/server#proxy
   * @param {Object} data The timer to route; it is serialized as the request payload
   * @param {String} data.id Id of the timer, sent in the `x-timer-id` header
   **/
  proxy(data) {
    debug('fabricating request', data.id)
    const req = new mock.Request({
      url: '/timer'
    , method: 'POST'
    , headers: {
        "x-timer-id": data.id
      }
    , payload: JSON.stringify(data)
    })
    const res = new mock.Response()

    debug('routing fabricated request', data.id)
    this._router.handle(req, res)
    this.emit('proxy', data)
  }

  /**
   * Removes a server from the ring, closes the http server and redistributes
   * any pending timers
   * @method module:skyring/lib/server#close
   * @param {Function} callback A callback to be called when the server is completely shut down
   **/
  close( cb ){
    if (this.closed) return isFunction(cb) ? setImmediate(cb) : null

    const done = () => {
      debug('closing server')
      // Detach from `process` so a closed server is neither retained by it
      // nor asked to rebalance again.
      process.removeListener('SIGUSR2', this._onRebalance)
      this.closed = true
      if (isFunction(cb)) cb()
    }

    super.close(() => {
      this._node.close(() => {
        const active = this._node._ring.membership.members.filter((member) => {
          return member.status === 'alive'
        })

        // Other nodes are still alive: let the timer store shut down,
        // otherwise there is nobody to hand the timers to.
        if (active.length) return this._timers.shutdown(done)

        debug('Last node in cluster - skipping rebalanace')
        this._timers.disconnect(done)
      })
    })
  }
}

module.exports = Server
module.exports.Router = Router