/*jshint laxcomma: true, smarttabs: true, node: true, esnext: true*/
'use strict'
/**
* Primary server instance for a skyring app.
* @module skyring/lib/server
* @requires http
* @requires debug
* @requires @esaterwhite/micromock
* @requires skyring/lib/server/node
* @requires skyring/lib/server/router
* @requires skyring/lib/timer
*/
const {isFunction} = require('util')
const http = require('http')
const mock = require('@esatterwhite/micromock')
const util = require('util')
const Debug = require('debug')
const routes = require('./api')
const Node = require('./node')
const Router = require('./router')
const Timer = require('../timer')
const conf = require('../../conf')
const debug = Debug('skyring:server')
/**
* @constructor
* @extends http.Server
* @alias module:skyring/lib/server
* @author Eric Satterwhite
* @since 1.0.0
* @param {Object} [options]
* @param {module:skyring/lib/server/node} [options.node] A customer node instance
* @param {String} [optiopns.node.host] host name for the node to listen on - 127.0.0.1 must be used for localhost ( not 0.0.0.0 )
* @param {Number} [options.node.port] Port number for the node to listen on in the ring
* @param {String} [options.node.app=timers] name of the active ring to join
* @param {Object} [options.nats]
* @param {String[]} [options.nats.servers] An array of nats `host:port` addresses to connect to
* @param {String[]|Function[]} [options.transports] an array of custom transport functions, or requireable paths that resolve to functions. All transport function must be named functions
* @example
// Use only configuration values
var server = new Server().listen(5000)
* @example var server = new Server({
node :{
host: 172.17.0.9
, port: 8456
, app: 'payback'
}
, nats: {
servers: ['nats1.domain.com:4222', 'nats2.domain.com:4222']
}
})
server.listen(5000)
* @example // Use a custom node instance
var node = new Node({
host: 172.17.0.9
, port: 8456
, app: 'payback'
})
var server = new Server({ node })
server.listen(5000)
*/
class Server extends http.Server {
constructor(opts = {}){
super((req, res) => {
this._router.handle(req, res)
})
this.closed = false
this.options = Object.assign({}, {
seeds: null
, nats: null
, storage: null
, transports: []
, autobalance: conf.get('autobalance')
}, opts)
/* istanbul ignore else */
if(opts.node) {
this._node = opts.node instanceof Node
? opts.node
: new Node(
opts.node.host,
opts.node.port,
opts.node.name,
opts.node.app
)
} else {
this._node = new Node()
}
this._group = this._node.name
this._node.on('bootstrap', (seeds) => {
this.emit('bootstrap', seeds)
})
}
route(opts) {
const route = this._router.route(opts.path, opts.method, opts.handler)
opts.middleware && route.before( opts.middleware )
debug('loaded: %s %s', opts.method, opts.path)
}
/**
* Joins the node to the configured ring and starts the http server
* @method module:skyring/lib/server#listen
* @param {Number} port Port number to listen on
* @param {String} [host=localhost] host or ip address to listen on
* @param {Number} [backlog]
* @param {Function} [callback] Callback function to call when the server is running
* @return {module:skyring/lib/server}
**/
listen(port, ...args) {
const callback = args[args.length - 1]
if (this.listening) return isFunction(callback) ? callback() : null
debug('seed nodes', this.options.seeds)
this._timers = new Timer({
nats: this.options.nats
, storage: this.options.storage
, transports: this.options.transports
}, (err) => {
if (err) return isFunction(callback) ? callback(err) : null
this._router = new Router(this._node, this._timers)
for (const key of Object.keys(routes)) {
const item = routes[key]
const route = this._router.route(
item.path
, item.method
, item.handler
)
debug('loaded: %s %s', item.method, item.path)
item.middleware && route.before( item.middleware )
}
// When nodes are added / removed exec a rebalanace of local timers
// If this node is not the owner, sent it back in the ring
if (this.options.autobalance) {
this._node.on('ringchange', this._rebalance.bind(this))
}
process.on('SIGUSR2', this._rebalance.bind(this))
// Join the ring
this._node.join(this.options.seeds, (err) => {
/* istanbul ignore if */
if (err) {
return isFunction(callback) ? callback(err) : null
}
// delegate mock requests from the ring to the
// API router
this._node.handle(( req, res ) => {
this._router.handle( req, res )
})
// listen for timers being purged over nats when a remote
// node is evicted or shutdown
this._timers.watch(`skyring:${this._group}`, (err, data) => {
this.proxy(data)
})
debug('binding to port', port)
super.listen(port, ...args)
})
})
return this
}
_rebalance(evt = {}) {
this._timers.rebalance(evt, this._node, (data) => {
this.proxy(data)
})
}
proxy(data) {
debug('fabricating request', data.id)
const opts = {
url: '/timer'
, method: 'POST'
, headers: {
"x-timer-id": data.id
}
, payload: JSON.stringify(data)
}
const res = new mock.Response()
const req = new mock.Request(opts)
debug('routing fabricated request', data.id)
this._router.handle(req, res)
this.emit('proxy', data)
}
/**
* Removes a server from the ring, closes the http server and redistributes
* any pending timers
* @method module:skyring/lib/server#close
* @param {Function} callback A callback to be called when the server is completely shut down
**/
close( cb ){
if(this.closed) return isFunction(cb) ? setImmediate(cb) : null
super.close(() => {
this._node.close(() => {
const active = this._node._ring.membership.members.filter((m) => {
return m.status === 'alive'
})
if (active.length) {
return this._timers.shutdown(() => {
debug('closing server')
this.closed = true
cb && cb()
})
}
debug('Last node in cluster - skipping rebalanace')
this._timers.disconnect(() => {
debug('closing server')
this.closed = true
cb && cb()
})
})
})
}
}
module.exports = Server
module.exports.Router = Router