// farm.js
'use strict'

const DEFAULT_OPTIONS = {
          workerOptions               : {}
        , maxCallsPerWorker           : Infinity
        , maxConcurrentWorkers        : (require('os').cpus() || { length: 1 }).length
        , maxConcurrentCallsPerWorker : 10
        , maxConcurrentCalls          : Infinity
        , maxCallTime                 : Infinity // exceed this and the whole worker is terminated
        , maxRetries                  : Infinity
        , forcedKillTime              : 100
        , autoStart                   : false
        , onChild                     : function() {}
      }

const fork                    = require('./fork')
    , TimeoutError            = require('errno').create('TimeoutError')
    , ProcessTerminatedError  = require('errno').create('ProcessTerminatedError')
    , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')


// Farm constructor: remembers the worker module path and the effective
// options (DEFAULT_OPTIONS overridden by whatever the caller passed in).
function Farm (options, path) {
  const effective = Object.assign({}, DEFAULT_OPTIONS, options)

  this.options     = effective
  this.path        = path
  // incremented by send(), decremented by receive()
  this.activeCalls = 0
}


// Build the function that is handed back to the user as the external API for
// `method` (setup() passes no method name for a single-function export).
// The last argument of every invocation is taken as the callback. Once the
// number of active plus queued calls reaches maxConcurrentCalls the call is
// rejected: through the callback on the next tick if the last argument is a
// function, by throwing otherwise.
Farm.prototype.mkhandle = function (method) {
  return (...args) => {
    if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) {
      const err = new MaxConcurrentCallsError(
        `Too many concurrent calls (active: ${this.activeCalls}, queued: ${this.callQueue.length})`
      )
      const last = args[args.length - 1]

      if (typeof last == 'function')
        return process.nextTick(last.bind(null, err))
      throw err
    }

    const callback = args.pop()
    this.addCall({ method: method, callback: callback, args: args, retries: 0 })
  }
}


// Second-stage initialisation. Builds the external interface (a single handle
// when no method list is given, otherwise an object with one handle per
// method name), resets the scheduling state and, with the autoStart option,
// starts workers until maxConcurrentWorkers are active.
// Returns the interface.
Farm.prototype.setup = function (methods) {
  const iface = methods
    ? methods.reduce((api, name) => {
        api[name] = this.mkhandle(name)
        return api
      }, {})
    : this.mkhandle()

  this.searchStart    = -1
  this.childId        = -1
  this.children       = {}
  this.activeChildren = 0
  this.callQueue      = []

  if (this.options.autoStart) {
    while (this.activeChildren < this.options.maxConcurrentWorkers) {
      this.startChild()
    }
  }

  return iface
}


// When a child exits, deal with the calls it still had outstanding: a call
// that has used up maxRetries is failed with a ProcessTerminatedError, any
// other call is put back at the front of the queue with its retry count
// bumped. The child is then removed via stopChild() and, if anything was
// re-queued, the queue is processed again.
Farm.prototype.onExit = function (childId) {
  // delay this to give any sends a chance to finish
  setTimeout(() => {
    const child = this.children[childId]
    let doQueue = false

    if (child && child.activeCalls) {
      child.calls.forEach((call, i) => {
        if (!call) return // slot already answered and deleted by receive()

        if (call.retries >= this.options.maxRetries) {
          // receive() invokes the callback and does the bookkeeping
          this.receive({
              idx   : i
            , child : childId
            , args  : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
          })
          return
        }

        // The call goes back to the queue, so it is no longer active: send()
        // counted it once and will count it again when it is re-sent. Without
        // this decrement the farm-wide counter grows by one per retry and the
        // maxConcurrentCalls check ends up rejecting calls on an idle farm.
        this.activeCalls--
        // drop the maxCallTime timer armed for the dead worker; send() arms a
        // fresh one (clearTimeout ignores an undefined timer)
        clearTimeout(call.timer)

        call.retries++
        this.callQueue.unshift(call)
        doQueue = true
      })
    }

    this.stopChild(childId)
    if (doQueue) this.processQueue()
  }, 10)
}


// Fork a new worker under the next child id, announce it through the onChild
// option, wire up its 'message' and 'exit' events and register it in
// this.children.
Farm.prototype.startChild = function () {
  const id     = ++this.childId
  const forked = fork(this.path, this.options.workerOptions)
  const worker = {
      send        : forked.send
    , child       : forked.child
    , calls       : []
    , activeCalls : 0
    , exitCode    : null
  }

  this.options.onChild(forked.child)

  // only messages tagged as belonging to the farm are handled
  forked.child.on('message', (data) => {
    if (data.owner === 'farm') {
      this.receive(data)
    }
  })

  // remember the exit code so stopChild() knows the process is gone
  forked.child.once('exit', (code) => {
    worker.exitCode = code
    this.onExit(id)
  })

  this.children[id] = worker
  this.activeChildren++
}


// Stop the worker with the given id: send it a 'die' message, schedule a
// SIGKILL after forcedKillTime in case no exit code has been recorded by
// then, and forget about it straight away. Unknown ids are ignored.
Farm.prototype.stopChild = function (childId) {
  const worker = this.children[childId]
  if (!worker) return

  worker.send({ owner: 'farm', event: 'die' })

  const killTimer = setTimeout(() => {
    if (worker.exitCode === null) {
      worker.child.kill('SIGKILL')
    }
  }, this.options.forcedKillTime)
  // the kill timer must not keep the parent process alive on its own
  killTimer.unref()

  delete this.children[childId]
  this.activeChildren--
}


// Handle a reply for a call. `data` names the child (data.child), the slot of
// the call on that child (data.idx) and the arguments for the user's callback
// (data.args). A serialised error in args[0] (marked with $error) is turned
// back into a real Error object, the callback is invoked on the next tick,
// the call is removed from the books, and a worker that has used up its
// maxCallsPerWorker allowance is stopped. Finally the queue is processed.
Farm.prototype.receive = function (data) {
  const { idx, child: childId, args } = data
  const child = this.children[childId]

  if (!child) {
    return console.error(
      'Worker Farm: Received message for unknown child. ' +
      'This is likely as a result of premature child death, ' +
      'the operation will have been re-queued.'
    )
  }

  const call = child.calls[idx]
  if (!call) {
    return console.error(
      'Worker Farm: Received message for unknown index for existing child. ' +
      'This should not happen!'
    )
  }

  // a timer is only armed by send() when a maxCallTime is configured
  if (this.options.maxCallTime !== Infinity) {
    clearTimeout(call.timer)
  }

  const wire = args[0]
  if (wire && wire.$error == '$error') {
    // rebuild the error with the matching built-in type where there is one
    const builtins = new Map([
        [ 'TypeError', TypeError ]
      , [ 'RangeError', RangeError ]
      , [ 'EvalError', EvalError ]
      , [ 'ReferenceError', ReferenceError ]
      , [ 'SyntaxError', SyntaxError ]
      , [ 'URIError', URIError ]
    ])
    const ErrorType = builtins.get(wire.type) || Error
    const revived = new ErrorType(wire.message)

    revived.type = wire.type
    revived.stack = wire.stack

    // Copy any custom properties to pass it on.
    for (const key of Object.keys(wire)) {
      revived[key] = wire[key]
    }

    args[0] = revived
  }

  process.nextTick(() => {
    call.callback.apply(null, args)
  })

  // free the slot; the array keeps its length, so calls.length still counts
  // every call ever sent to this worker
  delete child.calls[idx]
  child.activeCalls--
  this.activeCalls--

  const quotaReached = child.calls.length >= this.options.maxCallsPerWorker
  if (quotaReached && Object.keys(child.calls).length === 0) {
    // this child has finished its run, kill it
    this.stopChild(childId)
  }

  // allow any outstanding calls to be processed
  this.processQueue()
}


// Fired by the maxCallTime timer that send() arms. Every call still pending
// on the worker is answered with a TimeoutError through receive(), then the
// worker itself is stopped. Does nothing if the worker is already gone.
Farm.prototype.childTimeout = function (childId) {
  const child = this.children[childId]
  if (!child) return

  // slots already answered were deleted by receive(), so only live indexes
  // show up here (as strings)
  for (const idx of Object.keys(child.calls)) {
    this.receive({
        idx   : idx
      , child : childId
      , args  : [ new TimeoutError('worker call timed out!') ]
    })
  }

  this.stopChild(childId)
}


// Hand a call to the worker with the given id: store it in the worker's next
// free slot, bump the per-worker and farm-wide active counters, post the
// request to the child and, when a maxCallTime is configured, arm the timer
// that will time the worker out.
Farm.prototype.send = function (childId, call) {
  const worker = this.children[childId]
  // push() returns the new length, so the slot just filled is one below it
  const slot   = worker.calls.push(call) - 1

  worker.activeCalls++
  this.activeCalls++

  const message = {
      owner  : 'farm'
    , idx    : slot
    , child  : childId
    , method : call.method
    , args   : call.args
  }
  worker.send(message)

  if (this.options.maxCallTime === Infinity) return

  call.timer = setTimeout(
    this.childTimeout.bind(this, childId),
    this.options.maxCallTime
  )
}


// The ids of all current workers, rotated: every invocation advances the
// starting offset by one (wrapping back to zero past the end), so successive
// callers begin handing out jobs at a different worker.
Farm.prototype.childKeys = function () {
  const ids = Object.keys(this.children)

  this.searchStart = this.searchStart >= ids.length - 1
    ? 0
    : this.searchStart + 1

  const tail = ids.slice(this.searchStart)
  const head = ids.slice(0, this.searchStart)

  return tail.concat(head)
}


// Drain as much of the call queue as the options allow. Invoked whenever a
// call may have become dispatchable (a call was added, a reply arrived, a
// worker exited). At most one new worker is started and each worker with
// spare capacity is given at most one call per invocation; whatever is left
// stays queued. When the farm is ending, end() is re-run so idle workers can
// be shut down.
Farm.prototype.processQueue = function () {
  if (!this.callQueue.length)
    return this.ending && this.end()

  if (this.activeChildren < this.options.maxConcurrentWorkers)
    this.startChild()

  for (const key of this.childKeys()) {
    const childId = +key
    const worker  = this.children[childId]
    const hasRoom = worker.activeCalls < this.options.maxConcurrentCallsPerWorker
      && worker.calls.length < this.options.maxCallsPerWorker

    if (!hasRoom) continue

    this.send(childId, this.callQueue.shift())

    if (!this.callQueue.length)
      return this.ending && this.end()
  }

  if (this.ending)
    this.end()
}


// Queue a call and immediately try to dispatch it. Once the farm is ending
// nothing new is accepted: the call is dropped and end() is run instead.
Farm.prototype.addCall = function (call) {
  if (!this.ending) {
    this.callQueue.push(call)
    this.processQueue()
    return
  }

  return this.end() // don't add anything new to the queue
}


// Shut the farm down: stop every worker that has no calls in flight and, once
// no worker has calls in flight any more, invoke the completion callback on
// the next tick.
//
// this.ending tracks the state: undefined while running, `true` or the
// completion callback while winding down, `false` once the callback has run
// (after which further end() invocations do nothing). processQueue() and
// addCall() re-run end() without arguments while the farm is winding down.
Farm.prototype.end = function (callback) {
  if (this.ending === false)
    return

  if (callback)
    this.ending = callback
  else if (this.ending == null)
    this.ending = true

  let complete = true
  Object.keys(this.children).forEach((id) => {
    const child = this.children[id]
    if (!child)
      return
    if (child.activeCalls)
      complete = false // still busy, a later end() will pick it up
    else
      this.stopChild(id)
  })

  if (complete && typeof this.ending == 'function') {
    process.nextTick(() => {
      // end() can be reached several times before this tick runs (e.g. a
      // handle invoked right after end(cb)), queueing several of these ticks.
      // Only the first may fire: the others would find this.ending already
      // set to false and crash trying to call it.
      const done = this.ending
      if (typeof done != 'function')
        return
      this.ending = false
      done.call(this)
    })
  }
}


module.exports              = Farm
module.exports.TimeoutError = TimeoutError