'use strict'
/**
* Represents a participant in the Hashring
* @module skyring/lib/server/node
* @author Eric Satterwhite
* @since 1.0.0
* @requires path
* @requires events
* @requires dns
* @requires @skyringringpop
* @requires tchannel
* @requires debug
* @requires keef
*/
const path = require('path')
const EventEmitter = require('events').EventEmitter
const dns = require('dns')
const Ringpop = require('@skyring/ringpop')
const TChannel = require('tchannel')
const debug = require('debug')('skyring:ring')
const conf = require(path.join(__dirname, '..', '..', 'conf'))
const host = conf.get('channel:host')
const port = ~~conf.get('channel:port')
let ring_seeds = conf.get('seeds')
ring_seeds = !Array.isArray(ring_seeds) ? ring_seeds.split(',') : ring_seeds
function resolve(tasks, cb) {
const results = []
;(function next() {
if (!tasks.length) return cb(null, results)
const task = tasks.shift()
const [h, p] = task.split(':')
dns.lookup(h, (err, addr) => {
if (err) return cb(err)
results.push(`${addr}${p ? ':' + p : ''}`)
next()
})
})()
}
/**
* @constructor
* @alias module:skyring/lib/server/node
* @param {String} [host] host name for the node to listen on - 127.0.0.1 must be used for localhost ( not 0.0.0.0)
* @param {Number} [port] Port number for the node to listen on in the ring
* @param {String} [name='ringpop'] name of the active ring to join
* @param {String} [app=timers] app name of the active ring
*/
class Node extends EventEmitter {
constructor(h = host, p = port, name = 'ringpop', app = 'timers') {
super()
this._port = p
this._host = host
this._name = name
this._app = app
this._tchannel = new TChannel()
this._ring = null
}
/**
* Does the work of configuring tchannel and joining itself into a ringpop ring
* @method module:skyring/lib/server/node#join
* @param {String[]} [seeds] An array of node addresses to use as boot strapping nodes
* @param {Function} callback Function to call when the node has completed the bootstrap process
* @example node.join(['node-1:5555', '172.10.0.4:4563'], (err) => {
if (err) throw err
})
**/
join(seed_arr, cb) {
const nodes = seed_arr || ring_seeds
if (!Array.isArray(nodes)) {
const err = new TypeError('seeds must be an array')
return cb(err)
}
const addrs = [this._host].concat(nodes)
resolve(addrs, (err, seeds) => {
debug('seed nodes', seeds)
if (err) return cb(err)
const host = seeds.shift()
this._ring = new Ringpop({
app: this._app
, hostPort: `${host}:${this._port}`
, channel: this._tchannel.makeSubChannel({
serviceName: this._name
, trace: false
})
})
this._ring.setupChannel()
this._ring.on('ringChanged', (evt) => {
const added = evt.added
if (!added.length) return debug('node removed', evt.removed)
if (added.length === 1 && added.indexOf(`${host}:${this._port}`) !== -1) return
debug('node added', added)
this.emit('ringchange', evt)
})
this._tchannel.listen(this._port, host, (er) => {
if (er) return cb(er)
debug('tchannel listening on ', host, this._port)
this._ring.bootstrap(seeds, (er) => {
if (er) return cb(er)
debug('ring bootstraped', seeds)
this.emit('bootstrap', seeds)
cb(null)
})
})
})
}
/**
* Adds a request handler to the active ringpop instance
* @method module:skyring/lib/server/node#handle
* @param {Function} handler A request handler for incoming requests from the ring
**/
handle(cb) {
return this._ring.on('request', cb)
}
/**
* Determines if this instance is responsible for a specific key.
* proxies the request if it is not
* @method module:skyring/lib/server/node#handleOrProxy
* @param {String} key The key to use to do a node lookup in the ring
* @param {http.IncomingMessage|module:skyring/lib/server/mock.Request} req an http request object
* @param {http.ServerResponse|module:skyring/lib/server/mock.Response} res an http response object
* @example const handle = node.handleOrProxy('foobar', req, res)
if (!handle) return;
// deal with request
* @return {Boolean}
**/
handleOrProxy(key, req, res) {
return this._ring.handleOrProxy(key, req, res)
}
/**
* Determines if this node is responsible for a specific key
* @method module:skyring/lib/server/node#owns
* @param {String} key The key to use
* @return {Boolean}
**/
owns(key) {
return this.lookup(key) == this._ring.whoami()
}
/**
* Lookup the address of the server responsible for a given key
* @method module:skyring/lib/server/node#lookup
* @param {String} key The key to look up
* @return {String} A server address
**/
lookup(key) {
return this._ring.lookup(key)
}
/**
* Deprecated: use close method
* @deprecated 10.0.0
* @method module:skyring/lib/server/node#leave
* @param {Function} callback Callback function to call when the eviction process is complete
**/
leave(cb) {
this.close(cb)
}
/**
* Removes itself from the ring and closes and connections
* @method module:skyring/lib/server/node#close
* @param {Function} callback A callback function to call when the ring is closed
**/
close(cb) {
debug('node close')
this._ring.selfEvict(() => {
debug('draining tchannel')
this._tchannel.drain('leaving', () => {
debug('destroying ring')
this._ring.once('destroyed', () => {
setTimeout(cb, 100)
})
this._ring.destroy()
})
})
}
}
Object.defineProperty(Node.prototype, 'name', {
get: function() {
/**
* @readonly
* @name name
* @memberof module:skyring/lib/server/node
* @property {String} name The name of the node
**/
return this._app
}
})
module.exports = Node