'use strict'
/**
* Manage Timers on a node
* @module skyring/lib/timer
* @author Eric Satterwhite
* @since 3.0.0
* @requires os
* @requires crypto
* @requires path
* @requires levelup
* @requires encoding-down
* @requires debug
* @requires skyring/lib/transports
* @requires skyring/lib/nats
* @requires skyring/lib/json
*/
const os = require('os')
const crypto = require('crypto')
const path = require('path')
const levelup = require('levelup')
const encode = require('encoding-down')
const Transports = require('./transports')
const nats = require('./nats')
const json = require('./json')
const conf = require('../conf')
const debug = require('debug')('skyring:timer')
const rebalance = require('debug')('skyring:rebalance')
const store = require('debug')('skyring:store')
// Shared do-nothing callback used wherever a callback argument is optional.
const noop = () => {}

// Subject used for handing timers between nodes (see Timer#shutdown / Timer#watch).
const REBALANCE_SUB = 'skyring.rebalance'

// Property keys for state that is kept off the public surface of a Timer instance.
const kNode = Symbol('nodeid')
const kRemove = Symbol('remove')
const storage = Symbol('storage')

// Taken from the global symbol registry: every Symbol.for('kShutdown') call
// yields this same symbol. It is the key of the shutdown hook invoked on the
// transports collection.
const shutdown = Symbol.for('kShutdown')

// Values of the `type` field on the messages a Timer publishes.
const EVENT_STATUS = {
  // per-timer events
  CREATED: 'create'
, UPDATED: 'replace'
, EXEC: 'execute'
, CANCELLED: 'cancel'
, FAIL: 'fail'
, SUCCESS: 'success'
  // node level events
, SHUTDOWN: 'shutdown'
, READY: 'ready'
, RECOVERY: 'recover'
, REBALANCE: 'rebalance'
, PURGE: 'purge'
  // per-timer event: the timer was dropped locally during a rebalance
, EVICT: 'evict'
}
/**
 * Builds a hex identifier.
 * @param {String} [id] Seed value. When truthy the result is the sha1 hex digest of it,
 * so the same seed always yields the same identifier
 * @returns {String} The sha1 digest of `id`, or 10 random bytes hex encoded when `id` is falsy
 **/
function generateId(id) {
  return id
    ? crypto.createHash('sha1').update(id).digest('hex')
    : crypto.randomBytes(10).toString('hex')
}
/**
* Node style callback
* @typedef {Function} Nodeback
* @property {?Error} [err] An error instance. If not null, the results should not be trusted
* @property {Object} result The results of the function execution
**/
/**
 * Manages the timers held by a single node. The instance is a `Map` keyed by timer id
 * whose values are the in-memory records `{created, id, payload, timer}`. Every pending
 * timer is also written to a levelup store so it can be re-created by
 * {@link module:skyring/lib/timer#recover} when a new instance is constructed.
 * @constructor
 * @alias module:skyring/lib/timer
 * @extends Map
 * @param {Object} [options]
 * @param {Object} [options.nats] Nats connection information
 * @param {String[]} [options.nats.servers] A list of nats `host:port` to connect to
 * @param {Object} [options.storage] Storage config options for level db. Merged over the `storage` configuration values
 * @param {String} [options.storage.backend] a requireable module name, or absolute path to a leveldb compatible backend.
 * @param {String} [options.storage.path] A directory path to a leveldb instance.
 * If the backend is `memdown`, this is optional and randomly generated per timer instance.
 * Any other backend requires it
 * @param {String[]|Function[]} [options.transports] an array of custom transport functions, or requireable paths that resolve to functions.
 * @param {Function} [cb=()=>{}] Called once the `ready` event has been published after the initial recovery,
 * or with an error if the storage backend could not be opened
 * @throws {Error} With code `ENOSTORAGE` when no storage path is configured for a non `memdown` backend
 **/
class Timer extends Map {
  constructor(options = {}, cb = noop) {
    super()
    this.options = Object.assign({}, {
      nats: null
    , storage: null
    , transports: []
    }, options)
    this._sid = null
    this._bail = false

    // Merge into a fresh object. Assigning onto the object handed back by
    // conf.get() would write this instance's storage settings (including an
    // auto generated path) into the shared configuration, where every later
    // instance would pick them up.
    const opts = Object.assign({}, conf.get('storage'), this.options.storage)
    store(opts)

    if (!opts.path) {
      if (opts.backend !== 'memdown') {
        const err = new Error('storage.path must be set with non memdown backends')
        err.code = 'ENOSTORAGE'
        throw err
      }
      opts.path = path.join(
        os.tmpdir()
      , `skyring-${generateId()}`
      )
    }

    const backend = opts.backend === 'memdown'
      ? new (require(opts.backend))()
      : encode(require(opts.backend)(opts.path), {valueEncoding: 'json'})

    debug('storage path', opts)
    // The node id is derived from the storage path
    this[kNode] = generateId(opts.path)
    this.nats = nats.createClient(this.options.nats)
    this.transports = new Transports(this.options.transports)
    this[storage] = levelup(backend, opts, (err) => {
      if (err) {
        // Without a usable store there is nothing to recover from; report
        // the failure rather than announcing the node as ready.
        console.error('unable to open storage backend', err)
        return cb(err)
      }
      store('storage backend ready', opts)
      debug('node id', this[kNode])
      this.recover(() => {
        this.nats.publish('skyring:node', {
          node: this[kNode]
        , type: EVENT_STATUS.READY
        }, cb)
      })
    })
  }

  /**
   * The identifier of this node, as used in the `node` field of published events
   * @type {String}
   **/
  get id() {
    return this[kNode]
  }

  /**
   * Sets a new timer instance. If the timer has lapsed, it will be executed immediately
   * and is not written to storage
   * @method module:skyring/lib/timer#create
   * @param {String} id A unique Id of the timer
   * @param {Object} body Configuration options for the timer instance
   * @param {Number} body.timeout the time in milliseconds from now the timer should execute. This must be in the range: 0 < timeout < 2^31 - 1.
   * @param {String} body.data The data to be associated with the timer, when it is executed
   * @param {Number} [body.created=Date.now()] timestamp when the timer is created. if not set, will default to now
   * @param {Object} body.callback Options for the outbound transport for the timer when it executes
   * @param {String} body.callback.transport The transport type ( http, etc )
   * @param {String} body.callback.method The method the transport should use when executing the timer
   * @param {String} body.callback.uri The target uri for the transport when the timer executes
   * @param {Nodeback} [cb] Receives the timer id on success. Errors carry the code
   * `ENOTRANSPORT` (unknown transport) or `EKEYEXISTS` (id already held by this node)
   * @example
  const crypto = require('crypto')
  const id = crypto.createHash('sha1')
    .update(crypto.randomBytes(10))
    .digest('hex')
  const options = {
    timeout: 4000
  , data: "this is a payload"
  , callback: {
      transport: 'http'
    , method: 'put'
    , uri: 'http://api.domain.com/callback'
    }
  }
  timers.create(id, options, (err) => {
    if (err) throw err
  })
   **/
  create(id, body, cb = noop) {
    const payload = body
    const transport = this.transports.get(payload.callback.transport)
    if (!transport) {
      const err = new Error(`Unknown transport ${payload.callback.transport}`)
      err.code = 'ENOTRANSPORT'
      setImmediate(cb, err)
      return null
    }

    if (this.has(id)) {
      const err = new Error(`Timer with id ${id} already exists`)
      err.code = 'EKEYEXISTS'
      setImmediate(cb, err)
      return null
    }

    const now = Date.now()
    const created = payload.created || now
    // Time already spent waiting, e.g. on another node or before a restart
    const elapsed = now - created

    if (now > created + payload.timeout) {
      debug('executing stale timer')
      setImmediate(
        transport.exec.bind(transport)
      , payload.callback.method
      , payload.callback.uri
      , payload.data
      , id
      , this
      )
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.EXEC
      , timer: id
      , node: this[kNode]
      , executed: Date.now()
      , created: created
      , payload: payload
      }, noop)
      cb(null, id)
      return null
    }

    const data = {
      created: created
    , id: id
    , payload: payload
    , timer: null
    }

    this[storage].put(id, data, (err) => {
      /* istanbul ignore if */
      if (err) {
        console.error(err)
        cb(err, null)
        return null
      }

      debug('setting timer', id)
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.CREATED
      , timer: id
      , node: this[kNode]
      , created: data.created
      , payload: payload
      }, noop)

      // Only the remaining part of the timeout is scheduled. The handle is
      // unref'd so a pending timer does not keep the process alive.
      data.timer = setTimeout(
        transport.exec.bind(transport)
      , payload.timeout - elapsed
      , payload.callback.method
      , payload.callback.uri
      , payload.data
      , id
      , this
      ).unref()
      this.set(id, data)
      cb(null, id)
      return null
    })
  }

  /**
   * Clears the respective timer from storage and publishes a success event via nats
   * @method module:skyring/lib/timer#success
   * @param {String} id the id of the timer to acknowledge as delivered successfully
   * @param {Nodeback} [callback] Callback to execute when the acknowledge is complete
   * @example timers.success('2e2f6dad-9678-4caf-bc41-8e62ca07d551')
   **/
  success(id, cb = noop) {
    // The removal error is not propagated: a timer that create() executed
    // immediately was never added to the map, so a "Not Found" is expected here.
    this[kRemove](id, () => {
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.SUCCESS
      , timer: id
      , node: this[kNode]
      }, cb)
    })
  }

  /**
   * Clears the respective timer from storage and publishes a failure event via nats
   * @method module:skyring/lib/timer#failure
   * @param {String} id the id of the timer to acknowledge as failed
   * @param {Error} error The error object to send with event objects
   * @param {Nodeback} [callback] Callback to execute when the acknowledge is complete
   * @example
  const error = Error('Remote server unavailable')
  error.code = 'ENOREMOTE'
  timers.failure('2e2f6dad-9678-4caf-bc41-8e62ca07d551', error)
   **/
  failure(id, error, cb = noop) {
    // Tolerate a missing error object so the failure event is still published
    const reason = error || {}
    // As in success(), the removal error is deliberately not propagated
    this[kRemove](id, () => {
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.FAIL
      , timer: id
      , node: this[kNode]
      , message: reason.message
      , stack: reason.stack
      , error: reason.code || reason.name
      }, cb)
    })
  }

  /**
   * Clears the respective timer from storage and publishes a cancelled event via nats
   * @method module:skyring/lib/timer#cancel
   * @param {String} id the id of the timer to cancel
   * @param {Nodeback} [callback] Callback to execute when the cancellation is complete.
   * Receives an error with code `ENOENT` when this node does not hold the timer
   * @example timers.cancel('2e2f6dad-9678-4caf-bc41-8e62ca07d551')
   **/
  cancel(id, cb = noop) {
    this[kRemove](id, (err) => {
      if (err) return cb(err)
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.CANCELLED
      , timer: id
      , node: this[kNode]
      }, cb)
    })
  }

  /**
   * Removes a timer from storage and from memory, clearing its pending timeout.
   * The storage delete is always issued and is not waited on; the callback only
   * reflects the in-memory state.
   * @private
   * @param {String} id The id of the timer to remove
   * @param {Function} [cb] Called asynchronously with an `ENOENT` error when the
   * timer is not held in memory, and with no arguments otherwise
   * @returns {null}
   **/
  [kRemove](id, cb = noop) {
    this[storage].del(id, (err) => {
      if (err) return console.error('unable to purge %s', id, err)
      store('%s purged from storage', id, this.options.storage)
    })

    const rec = this.get(id)
    if (!rec) {
      const err = new Error('Not Found')
      err.code = 'ENOENT'
      setImmediate(cb, err)
      return null
    }

    clearTimeout(rec.timer)
    this.delete(id)
    setImmediate(cb)
    debug('timer cleared', id)
    return null
  }

  /**
   * Evicts every locally held timer for which `node.owns(id)` is falsy: the pending
   * timeout is cleared, the record is dropped from memory, its storage delete is
   * batched and an evict event is published. Does nothing when no timers are held.
   * @method module:skyring/lib/timer#rebalance
   * @param {Object} opts Currently unused
   * @param {Object} node An object exposing `owns(id)`
   * @param {Function} [cb] Called synchronously once per evicted timer with a single
   * argument: the timer payload extended with `id` and `created`. Not a Nodeback
   **/
  rebalance(opts, node, cb = noop) {
    const size = this.size
    if (!size) return

    // Created after the empty check so no batch is opened and then abandoned
    const batch = this[storage].batch()
    rebalance('node %s begin rebalance; timers: %d', this[kNode], size)
    this.nats.publish('skyring:node', {
      node: this[kNode]
    , type: EVENT_STATUS.REBALANCE
    }, noop)

    for (const obj of this.values()) {
      if (node.owns(obj.id)) continue

      clearTimeout(obj.timer)
      this.delete(obj.id)
      batch.del(obj.id)
      const data = Object.assign({}, obj.payload, {
        id: obj.id
      , created: obj.created
      })
      rebalance('node %s no longer the owner of %s', this[kNode], obj.id)
      this.nats.publish('skyring:events', {
        node: this[kNode]
      , type: EVENT_STATUS.EVICT
      , timer: obj.id
      }, noop)
      cb(data)
    }

    batch.write((err) => {
      if (err) return console.error('rebalance batch delete failed', err)
      store('node %s rebalance batch delete complete', this[kNode])
    })
  }

  /**
   * Publishes a recover event, then reads every record out of storage and passes it
   * back through {@link module:skyring/lib/timer#create}
   * @method module:skyring/lib/timer#recover
   * @param {Function} [cb] Called with no arguments once the storage read stream has closed
   **/
  recover(cb = noop) {
    this.nats.publish('skyring:node', {
      node: this[kNode]
    , type: EVENT_STATUS.RECOVERY
    }, noop)

    const fn = (data) => {
      store('recover', data.key)
      const out = Object.assign({}, data.value.payload, {
        id: data.value.id
      , created: data.value.created
      })
      // `debug` doubles as the callback so the outcome of each create is logged
      this.create(data.key, out, debug)
    }

    const stream = this[storage].createReadStream()
    stream
      .on('data', fn)
      .once('close', function () {
        debug('recover stream close')
        stream.removeListener('data', fn)
        cb && cb()
      })
  }

  /**
   * Updates a timer in place. The existing timer is removed and a new one is created
   * under the same id. When the timer exists but the new definition names an unknown
   * transport, the existing timer is left untouched.
   * @method module:skyring/lib/timer#update
   * @param {String} id A unique Id of the timer
   * @param {Object} body Configuration options for the timer instance
   * @param {Number} body.timeout Duration in milliseconds to delay execution of the timer
   * @param {String} body.data The data to be associated with the timer, when it is executed
   * @param {Object} body.callback Options for the outbound transport for the timer when it executes
   * @param {String} body.callback.transport The transport type ( http, etc )
   * @param {String} body.callback.method The method the transport should use when executing the timer
   * @param {String} body.callback.uri The target uri for the transport when the timer executes
   * @param {Nodeback} [cb] Receives an `ENOENT` error when this node does not hold the timer,
   * or `ENOTRANSPORT` when the transport is unknown
   * @example timers.update('0dc5a555-d0f6-49a0-b336-5befb0437288', {
    timeout: 4000
  , data: "this is a payload"
  , callback: {
      transport: 'http'
    , method: 'put'
    , uri: 'http://api.domain.com/callback'
    }
  })
   **/
  update(id, body, cb = noop) {
    // Validate before removing anything: create() would reject an unknown
    // transport only after the existing timer had already been destroyed.
    // Unknown ids still fall through so they keep reporting ENOENT.
    if (this.has(id) && !this.transports.get(body.callback.transport)) {
      const err = new Error(`Unknown transport ${body.callback.transport}`)
      err.code = 'ENOTRANSPORT'
      setImmediate(cb, err)
      return
    }

    this[kRemove](id, (err) => {
      if (err) return cb(err)
      debug('updating timer', id)
      this.create(id, body, cb)
    })
  }

  /**
   * Closes the underlying storage instance
   * @method module:skyring/lib/timer#close
   * @param {Function} cb Passed straight to the storage `close` call
   **/
  close(cb) {
    this[storage].close(cb)
  }

  /**
   * Closes storage, runs the shutdown hook of the transports, publishes a shutdown
   * event, drains the rebalance subscription and finally quits the nats connection.
   * Timers held in memory are not touched.
   * @method module:skyring/lib/timer#disconnect
   * @param {Nodeback} [cb] Called with the drain error, or by `nats.quit` once it completes
   **/
  disconnect(cb = noop) {
    this[storage].close(noop)
    this.transports[shutdown](() => {
      this.nats.publish('skyring:node', {
        node: this[kNode]
      , type: EVENT_STATUS.SHUTDOWN
      }, noop)
      this.nats.drainSubscription(this._sid, (err) => {
        if (err) return cb(err)
        this.nats.quit(cb)
      })
    })
  }

  /**
   * Triggers timers to be purged from this node canceling all locally pending timers,
   * and distributing them in the ring. It is assumed this node is no longer a ring member
   * @method module:skyring/lib/timer#shutdown
   * @param {Nodeback} [callback] Node style callback to execute when the function is complete
   **/
  shutdown(cb = noop) {
    const size = this.size
    if (!size) {
      // Nothing to hand over; this is exactly the disconnect sequence
      this.disconnect(cb)
      return
    }

    let sent = 0
    let acks = 0
    const batch = this[storage].batch()

    // Stop receiving rebalanced timers before handing our own away
    this.nats.unsubscribe(this._sid)
    this._sid = null

    const run = (obj) => {
      clearTimeout(obj.timer)
      batch.del(obj.id)
      const data = Object.assign({}, obj.payload, {
        id: obj.id
      , created: obj.created
      , count: ++sent
      })

      this.nats.request(REBALANCE_SUB, data, () => {
        // The storage deletes are only flushed once every timer has been acknowledged
        if (++acks === size) {
          return batch.write((err) => {
            if (err) console.error('shutdown batch delete failed', err)
            else store('batch delete finished')
            this.disconnect(cb)
          })
        }
        rebalance('%s of %s processed', acks, size, data.id)
      })
    }

    this.nats.publish('skyring:node', {
      node: this[kNode]
    , type: EVENT_STATUS.PURGE
    }, noop)

    for (const record of this.values()) {
      run(record)
    }

    this.clear()
  }

  /**
   * Starts an internal nats queue
   * @method module:skyring/lib/timer#watch
   * @param {String} key The name of the nats queue to create
   * @param {Nodeback} callback Called with `(null, data)` for every timer received on the queue
   * @returns {*} The subscription id returned by nats, or `undefined` when the instance is bailing
   **/
  watch(key, cb) {
    if (this._bail) return
    const opts = { queue: key }
    this._sid = this.nats.subscribe(REBALANCE_SUB, opts, (data, reply) => {
      // Acknowledge receipt first so a sender waiting in shutdown() can count it
      if (reply) this.nats.publish(reply, {node: this[kNode], timer: data.id})
      if (this._bail) return
      cb(null, data)
    })
    return this._sid
  }
}
module.exports = Timer