Source: lib/timer.js

'use strict'
/**
 * Manage Timers on a node
 * @module skyring/lib/timer
 * @author Eric Satterwhite
 * @since 3.0.0
 * @requires os
 * @requires crypto
 * @requires path
 * @requires levelup
 * @requires encoding-down
 * @requires debug
 * @requires skyring/lib/transports
 * @requires skyring/lib/nats
 * @requires skyring/lib/json
 */
const os             = require('os')
const crypto         = require('crypto')
const path           = require('path')
const levelup        = require('levelup')
const encode         = require('encoding-down')
const Transports     = require('./transports')
const nats           = require('./nats')
const json           = require('./json')
const conf           = require('../conf')
const debug          = require('debug')('skyring:timer')
const rebalance      = require('debug')('skyring:rebalance')
const store          = require('debug')('skyring:store')
const storage        = Symbol('storage')
const shutdown       = Symbol.for('kShutdown')
const kNode          = Symbol('nodeid')
const kRemove        = Symbol('remove')
const noop           = () => {}
const REBALANCE_SUB  = 'skyring.rebalance'
const EVENT_STATUS = {
  CREATED:   'create'
, UPDATED:   'replace'
, EXEC:      'execute'
, CANCELLED: 'cancel'
, FAIL:      'fail'
, SUCCESS:   'success'
, SHUTDOWN:  'shutdown'
, READY:     'ready'
, RECOVERY:  'recover'
, REBALANCE: 'rebalance'
, PURGE:     'purge'
, EVICT:     'evict'
}

function generateId(id) {
  if (!id) return crypto.randomBytes(10).toString('hex')
  return crypto.createHash('sha1').update(id).digest('hex')
}
/**
 * Node style callback
 * @typedef {Function} Nodeback
 * @property {?Error} [err] An error instance. If not null, the results should not be trusted
 * @property {Object} result The results of the function execution
 **/

/**
 * @constructor
 * @alias module:skyring/lib/timer
 * @param {Object} [options]
 * @param {Object} [options.nats] Nats connection information
 * @param {String[]} [options.nats.servers] A list of nats `host:port` to connect to
 * @param {Object} [options.storage] Storage config options for level db
 * @param {String[]} [options.storage.backend=memdown] a requireable module name, or absolute path to a leveldb compatible backend. `memdown` and `leveldown` are built in
 * `leveldown` and `memdown` are installed by default
 * @param {String} options.storage.path A directory path to a leveldb instance. One will be created if it doesn't already exist.
 * If the backend is memdown, this is optional and randomly generated per timer instance
 * @param {Function} [onReady=()=>{}] A callback function to call after initial recovery has completed
 * @param {String[]|Function[]} [options.transports] an array of custom transport functions, or requireable paths that resolve to functions. All transport function must be named functions
 * If not specified, configuration values will be used
 **/
class Timer extends Map {
  constructor(options = {}, cb = noop) {
    super()
    this.options = Object.assign({}, {
      nats: null
    , storage: null
    , transports: []
    }, options)
    this._sid = null
    this._bail = false
    const store_opts = conf.get('storage')
    const opts = Object.assign(store_opts, this.options.storage)
    store(opts)
    if (!opts.path) {
      if (opts.backend === 'memdown') {
        this[kNode] = generateId()
        opts.path = path.join(
          os.tmpdir()
        , `skyring-${this[kNode]}`
        )
      } else {
        const err = new Error('storage.path must be set with non memdown backends')
        err.code = 'ENOSTORAGE'
        throw err
      }
    }
    const backend = opts.backend === 'memdown'
      ? new (require(opts.backend))
      : encode(require(opts.backend)(opts.path), {valueEncoding: 'json'})

    debug('storage path', opts)
    this[kNode] = generateId(store_opts.path)
    this.nats = nats.createClient(this.options.nats)
    this.transports = new Transports(this.options.transports)
    this[storage] = levelup(backend, opts, (err) => {
      store('storage backend ready', store_opts)
      debug('node id', this[kNode])
      this.recover(() => {
        this.nats.publish('skyring:node', {
          node: this[kNode]
        , type: EVENT_STATUS.READY
        }, cb)
      })
    })

  }

  get id() {
    return this[kNode]
  }

  /**
   * Sets a new time instance. If The timer has lapsed, it will be executed immediately
   * @method module:skyring/lib/timer#create
   * @param {String} id A unique Id of the time
   * @param {Object} body Configuration options for the timer instance
   * @param {Number} body.timeout the time in milliseconds from now the timer should execute. This must be in the range: 0 < timeout < 2^31 - 1.
   * @param {String} body.data The data to be assicated with the timer, when it is executed
   * @param {Number} [body.created=Date.now()] timestamp when the timer is created. if not set, will default to now
   * @param {Object} callback Options for the outbound transport for the timer when it executes
   * @param {String} callback.transport The transport type ( http, etc )
   * @param {String} transport.method The method the transport should use when executing the timer
   * @param {String} transport.uri The target uri for the transport when the timer executes
   * @param {Nodeback} callback
   * @example
const crypto = require('crypto')
id = crypto.createHash('sha1')
           .update(crypto.randomBytes(10))
           .digest('hex')

const options = {
  timeout: 4000
, data: "this is a payload"
, callback: {
    transport: 'http'
  , method: 'put'
  , uri: 'http://api.domain.com/callback'
  }
}

timers.create(id, options, (err) => {
  if (err) throw err
})
   **/
  create(id, body, cb) {
    const payload = body
    const transport = this.transports.get(payload.callback.transport)
    if (!transport) {
      const err = new Error(`Unknown transport ${payload.callback.transport}`)
      err.code = 'ENOTRANSPORT'
      setImmediate(cb, err)
      return null
    }
    if (this.has( id )) {
      const err = new Error(`Timer with id ${id} already exists`)
      err.code = 'EKEYEXISTS'
      setImmediate(cb, err)
      return null
    }
    const now = Date.now()
    const created = payload.created || now
    const elapsed = now - created
    if(now > created + payload.timeout) {
      debug('executing stale timer')
      setImmediate(
        transport.exec.bind(transport)
      , payload.callback.method
      , payload.callback.uri
      , payload.data
      , id
      , this
      )

      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.EXEC
      , timer: id
      , node: this[kNode]
      , executed: Date.now()
      , created: created
      , payload: payload
      }, noop)

      cb(null, id)
      return null
    }

    const data = {
      created: created
    , id: id
    , payload: payload
    , timer: null
    }

    this[storage].put(id, data, (err) => {

      /* istanbul ignore if */
      if (err) {
        console.error(err)
        cb(err, null)
        return null
      }

      debug('setting timer', id)
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.CREATED
      , timer: id
      , node: this[kNode]
      , created: data.created
      , payload: payload
       }, noop)

       data.timer = setTimeout(
        transport.exec.bind(transport)
      , payload.timeout - elapsed
      , payload.callback.method
      , payload.callback.uri
      , payload.data
      , id
      , this
      ).unref()
      this.set(id, data)
      cb(null, id)
      return null
    })
  }

  /**
   * Clears the respective timer from storage and publishes a success event via nats
   * @method module:skyring/lib/timer#success
   * @param {String} id the is of the time to acknowledge as delivered successfully
   * @param {Nodeback} [callback] Callback to execute when the acknowledge is complete
   * @example timers.success('2e2f6dad-9678-4caf-bc41-8e62ca07d551')
   **/
  success(id, cb = noop) {
    this[kRemove](id, (err) => {
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.SUCCESS
      , timer: id
      , node: this[kNode]
      }, cb)
    })
  }

  /**
   * Clears the respective timer from storage and publishes a failure event via nats
   * @method module:skyring/lib/timer#failure
   * @param {String} id the is of the time to acknowledge as delivered successfully
   * @param {Error} error The error object to send with event objects
   * @param {Nodeback} [callback] Callback to execute when the acknowledge is complete
   * @example
const error = Error('Remote server unavailable')
error.code = 'ENOREMOTE'
timers.failure('2e2f6dad-9678-4caf-bc41-8e62ca07d551', error)
   **/
  failure(id, error, cb = noop) {
    this[kRemove](id, (err) => {
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.FAIL
      , timer: id
      , node: this[kNode]
      , message: error.message
      , stack: error.stack
      , error: error.code || error.name
      }, cb)
    })
  }

  /**
   * Clears the respective timer from storage and publishes a cancelled event via nats
   * @method module:skyring/lib/timer#cancelled
   * @param {String} id the is of the time to acknowledge as delivered successfully
   * @param {Nodeback} [callback] Callback to execute when the acknowledge is complete
   * @example timers.cancel('2e2f6dad-9678-4caf-bc41-8e62ca07d551')
   **/
  cancel(id, cb = noop) {
    this[kRemove](id, (err) => {
      if (err) return cb(err)
      this.nats.publish('skyring:events', {
        type: EVENT_STATUS.CANCELLED
      , timer: id
      , node: this[kNode]
      }, cb)
    })
  }

  [kRemove](id, cb = noop) {
    this[storage].del(id, (err) => {
      if (err) return console.error('unable to purge %s', id, err)
      store('%s purged from storage', id, this.options.storage)
    })
    const rec = this.get(id)

    if(!rec) {
      const err = new Error('Not Found')
      err.code = 'ENOENT'
      setImmediate(cb, err)
      return null
    }

    clearTimeout(rec.timer)
    this.delete(id)
    setImmediate(cb)
    debug('timer cleared', id)
    return null
  }

  rebalance(opts, node, cb = noop) {
    const size = this.size
    const batch = this[storage].batch()

    if(!size) return

    rebalance('node %s begin rebalance; timers: %d', this[kNode], size)
    this.nats.publish('skyring:node', {
      node: this[kNode]
    , type: EVENT_STATUS.REBALANCE
    }, noop)


    const records = this.values()
    const run = ( obj ) => {
      if (node.owns(obj.id)) return

      clearTimeout(obj.timer)
      this.delete(obj.id)
      batch.del(obj.id)

      const data = Object.assign({}, obj.payload, {
        id: obj.id
      , created: obj.created
      })

      rebalance('node %s no longer the owner of %s', this[kNode], obj.id)

      this.nats.publish('skyring:events', {
        node: this[kNode]
      , type: EVENT_STATUS.EVICT
      , timer: obj.id
      }, noop)

      cb(data)
    }

    for(var record of records) {
      run(record)
    }

    batch.write(() => {
      store('node %s rebalance batch delete complete', this[kNode])
    })
  }

  recover(cb = noop) {
    this.nats.publish('skyring:node', {
      node: this[kNode]
    , type: EVENT_STATUS.RECOVERY
    }, noop)

    const fn = (data) => {
      store('recover', data.key)
      const out = Object.assign({}, data.value.payload, {
        id: data.value.id
      , created: data.value.created
      })
      this.create(data.key, out, debug)
    }

    const stream = this[storage].createReadStream()

    stream
    .on('data', fn)
    .once('close', function () {
      debug('recover stream close')
      stream.removeListener('data', fn)
      cb && cb()
    })
  }

  /**
   * Updates a timer inplace
   * @method module:skyring/lib/timer#update
   * @param {String} id A unique Id of the time
   * @param {Object} body Configuration options for the timer instance
   * @param {Number} body.timeout Duration in milisecods to delay execution of the timer
   * @param {String} body.data The data to be assicated with the timer, when it is executed
   * @param {Object} callback Options for the outbound transport for the timer when it executes
   * @param {String} callback.transport The transport type ( http, etc )
   * @param {String} transport.method The method the transport should use when executing the timer
   * @param {String} transport.uri The target uri for the transport when the timer executes
   * @param {Nodeback} callback
   * @example timers.update('0dc5a555-d0f6-49a0-b336-5befb0437288', {
  timeout: 4000
, data: "this is a payload"
, callback: {
    transport: 'http'
  , method: 'put'
  , uri: 'http://api.domain.com/callback'
  }
})
   **/
  update(id, body, cb) {
    this[kRemove](id, (err) => {
      if (err) return cb(err)
      debug('updating timer', id)
      this.create(id, body, cb)
    })
  }

  close(cb){
    this[storage].close(cb)
  }

  disconnect(cb = noop) {
    this[storage].close(noop)
    this.transports[shutdown](() => {
      this.nats.publish('skyring:node', {
        node: this[kNode]
      , type: EVENT_STATUS.SHUTDOWN
      }, noop)

      this.nats.drainSubscription(this._sid, (err) => {
        if (err) return cb(err)
        this.nats.quit(cb)
      })
    })
  }

  /**
   * Triggers timers to be purged from this node canceling all locally pending timers,
   * and distributing them in the ring. It is assumed this node is no longer a ring member
   * @method module:skyring/lib/timer#shutdown
   * @param {Nodeback} callback Node style callback to execute when the function is complete
   **/
  shutdown(cb) {
    const size = this.size
    if (!size) {
      this[storage].close()
      return this.transports[shutdown](() => {
        this.nats.publish('skyring:node', {
          node: this[kNode]
        , type: EVENT_STATUS.SHUTDOWN
        }, noop)

        this.nats.drainSubscription(this._sid, (err) => {
          if (err) return cb(err)
          this.nats.quit(cb)
        })
      })
    }

    let sent = 0
    let acks = 0

    const batch = this[storage].batch()

    this.nats.unsubscribe(this._sid)
    this._sid = null

    const run = (obj) => {
      clearTimeout(obj.timer)
      batch.del(obj.id)
      const data = Object.assign({}, obj.payload, {
        id: obj.id
      , created: obj.created
      , count: ++sent
      })

      this.nats.request(REBALANCE_SUB, data, (reply) => {
        if (++acks === size) {
          return batch.write(() => {
            store('batch delete finished')
            this.disconnect(cb)
          })
        }
        rebalance( '%s of %s processed', acks, data.count, data.id)
      })
    }

    this.nats.publish('skyring:node', {
      node: this[kNode]
    , type: EVENT_STATUS.PURGE
    }, noop)

    for(let record of this.values()) {
      run(record)
    }

    this.clear()
  }

  /**
   * Starts an internal nats queue
   * @method module:skyring/lib/timer#watch
   * @param {String} key The name of the nats queue to create
   * @param {Nodeback} callback Node style callback to execute when the function has finished execution
   **/
  watch(key, cb) {
    if (this._bail) return
    const opts = { queue: key }
    this._sid = this.nats.subscribe(REBALANCE_SUB, opts, (data, reply) => {
      if (reply) this.nats.publish(reply, {node: this[kNode], timer: data.id})
      if(this._bail) return
      cb(null, data)
    })
    return this._sid
  }
}

module.exports = Timer