queue.js 2.7 KB

  1. 'use strict'
  2. module.exports = RunQueue
  3. var validate = require('aproba')
  4. function RunQueue (opts) {
  5. validate('Z|O', [opts])
  6. if (!opts) opts = {}
  7. this.finished = false
  8. this.inflight = 0
  9. this.maxConcurrency = opts.maxConcurrency || 1
  10. this.queued = 0
  11. this.queue = []
  12. this.currentPrio = null
  13. this.currentQueue = null
  14. this.Promise = opts.Promise || global.Promise
  15. this.deferred = {}
  16. }
  17. RunQueue.prototype = {}
  18. RunQueue.prototype.run = function () {
  19. if (arguments.length !== 0) throw new Error('RunQueue.run takes no arguments')
  20. var self = this
  21. var deferred = this.deferred
  22. if (!deferred.promise) {
  23. deferred.promise = new this.Promise(function (resolve, reject) {
  24. deferred.resolve = resolve
  25. deferred.reject = reject
  26. self._runQueue()
  27. })
  28. }
  29. return deferred.promise
  30. }
  31. RunQueue.prototype._runQueue = function () {
  32. var self = this
  33. while ((this.inflight < this.maxConcurrency) && this.queued) {
  34. if (!this.currentQueue || this.currentQueue.length === 0) {
  35. // wait till the current priority is entirely processed before
  36. // starting a new one
  37. if (this.inflight) return
  38. var prios = Object.keys(this.queue)
  39. for (var ii = 0; ii < prios.length; ++ii) {
  40. var prioQueue = this.queue[prios[ii]]
  41. if (prioQueue.length) {
  42. this.currentQueue = prioQueue
  43. this.currentPrio = prios[ii]
  44. break
  45. }
  46. }
  47. }
  48. --this.queued
  49. ++this.inflight
  50. var next = this.currentQueue.shift()
  51. var args = next.args || []
  52. // we explicitly construct a promise here so that queue items can throw
  53. // or immediately return to resolve
  54. var queueEntry = new this.Promise(function (resolve) {
  55. resolve(next.cmd.apply(null, args))
  56. })
  57. queueEntry.then(function () {
  58. --self.inflight
  59. if (self.finished) return
  60. if (self.queued <= 0 && self.inflight <= 0) {
  61. self.finished = true
  62. self.deferred.resolve()
  63. }
  64. self._runQueue()
  65. }, function (err) {
  66. self.finished = true
  67. self.deferred.reject(err)
  68. })
  69. }
  70. }
  71. RunQueue.prototype.add = function (prio, cmd, args) {
  72. if (this.finished) throw new Error("Can't add to a finished queue. Create a new queue.")
  73. if (Math.abs(Math.floor(prio)) !== prio) throw new Error('Priorities must be a positive integer value.')
  74. validate('NFA|NFZ', [prio, cmd, args])
  75. prio = Number(prio)
  76. if (!this.queue[prio]) this.queue[prio] = []
  77. ++this.queued
  78. this.queue[prio].push({cmd: cmd, args: args})
  79. // if this priority is higher than the one we're currently processing,
  80. // switch back to processing its queue.
  81. if (this.currentPrio > prio) {
  82. this.currentQueue = this.queue[prio]
  83. this.currentPrio = prio
  84. }
  85. }