'use strict'

/* eslint-disable no-var */

var reusify = require('reusify')

/**
 * Create a callback-based, in-memory work queue.
 *
 * Tasks are kept in a singly linked list (queueHead/queueTail) and handed to
 * `worker` while fewer than `concurrency` tasks are running and the queue is
 * not paused. Task holders are recycled through `reusify`.
 *
 * Can be called as `fastqueue(context, worker, concurrency)` or as
 * `fastqueue(worker, concurrency)`.
 *
 * @param {*} context - `this` value used when calling `worker` and task callbacks.
 * @param {Function} worker - Called as `worker.call(context, value, cb)`;
 *   `cb(err, result)` must be invoked when the task is finished.
 * @param {number} concurrency - Maximum number of tasks running at once.
 * @returns {Object} The queue object (push, unshift, pause, resume, idle,
 *   length, getQueue, kill, killAndDrain, error, running, plus the assignable
 *   hooks drain, saturated and empty).
 * @throws {Error} If `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  // Shift the arguments when no context was given.
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    // 1 is a valid value, so the message must say "equal to or greater than".
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  var queueHead = null
  var queueTail = null
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  /** @returns {number} Number of tasks currently handed to the worker. */
  function running () {
    return _running
  }

  /** Stop starting new tasks; tasks already running are not interrupted. */
  function pause () {
    self.paused = true
  }

  /** @returns {number} Number of tasks waiting in the queue (walks the list). */
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  /** @returns {Array} The values of the waiting tasks, in queue order. */
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  /**
   * Leave the paused state and start as many queued tasks as the
   * concurrency limit allows. Does nothing if the queue is not paused.
   */
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting: go through release() exactly once so that
      // drain() fires a single time when no task is in flight.
      _running++
      release()
      return
    }

    // Only fill the free slots; tasks that were already running when the
    // queue was paused still count against the concurrency limit.
    while (queueHead && _running < self.concurrency) {
      _running++
      release()
    }
  }

  /** @returns {boolean} True when nothing is running and nothing is queued. */
  function idle () {
    return _running === 0 && self.length() === 0
  }

  /**
   * Take a Task holder from the cache and fill it in for `value`.
   * Shared by push() and unshift() so both set up tasks identically,
   * including the currently registered error handler.
   */
  function acquire (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    // Holders are recycled, so this must be assigned on every use.
    current.errorHandler = errorHandler

    return current
  }

  /**
   * Add a task at the end of the queue, or run it right away if a slot is
   * free and the queue is not paused.
   *
   * @param {*} value - Passed to the worker.
   * @param {Function} [done] - Called with `(err, result)` when the task ends.
   */
  function push (value, done) {
    var current = acquire(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Add a task at the front of the queue, or run it right away if a slot is
   * free and the queue is not paused.
   *
   * @param {*} value - Passed to the worker.
   * @param {Function} [done] - Called with `(err, result)` when the task ends.
   */
  function unshift (value, done) {
    var current = acquire(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Called when a running slot becomes available. Recycles `holder` (if any)
   * and either starts the next queued task in the same slot or gives the
   * slot back by decrementing the running counter.
   */
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }

    var next = queueHead

    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          // Taking the last queued task.
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        // Paused: leave the task queued and free the slot.
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  /** Drop all waiting tasks and reset the drain hook to a no-op. */
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  /** Drop all waiting tasks, call the drain hook once, then reset it. */
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  /**
   * Register the handler stored on tasks created from now on; when set, it
   * is called with `(err, value)` each time such a task completes.
   */
  function error (handler) {
    errorHandler = handler
  }
}

function noop () {}

/**
 * Reusable holder for one queued task. `worked` is the completion callback
 * given to the worker; it is bound to this holder through the closure.
 */
function Task () {
  this.value = null
  this.callback = noop
  this.next = null
  this.release = noop
  this.context = null
  this.errorHandler = null

  var self = this

  this.worked = function worked (err, result) {
    // Read everything first and clear the holder before calling user code.
    var callback = self.callback
    var errorHandler = self.errorHandler
    var val = self.value
    self.value = null
    self.callback = noop
    if (self.errorHandler) {
      errorHandler(err, val)
    }
    callback.call(self.context, err, result)
    self.release(self)
  }
}

/**
 * Promise flavour of fastqueue: `worker` returns a thenable, and
 * push()/unshift() return a promise for the task result.
 *
 * Can be called as `queueAsPromised(context, worker, concurrency)` or as
 * `queueAsPromised(worker, concurrency)`.
 *
 * @param {*} context - `this` value used when calling `worker`.
 * @param {Function} worker - Called with the task value; its return value
 *   must expose `.then`.
 * @param {number} concurrency - Maximum number of tasks running at once.
 * @returns {Object} The queue, with promise-returning push/unshift and an
 *   extra drained() method.
 */
function queueAsPromised (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // Adapt the promise-returning worker to the callback contract.
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  /** Call `enqueue(value, cb)` and expose the callback outcome as a promise. */
  function enqueueAsPromise (enqueue, value) {
    var p = new Promise(function (resolve, reject) {
      enqueue(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  /** Queue `value` at the end; resolves or rejects with the task outcome. */
  function push (value) {
    return enqueueAsPromise(pushCb, value)
  }

  /** Queue `value` at the front; resolves or rejects with the task outcome. */
  function unshift (value) {
    return enqueueAsPromise(unshiftCb, value)
  }

  /**
   * @returns {Promise} Resolves the next time the drain hook is called, or
   *   immediately when the queue is already idle.
   */
  function drained () {
    if (queue.idle()) {
      // drain will not fire again for an idle queue, so waiting for it
      // would leave the promise pending forever.
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    var p = new Promise(function (resolve) {
      queue.drain = function () {
        previousDrain()
        resolve()
      }
    })

    return p
  }
}

module.exports = fastqueue
module.exports.promise = queueAsPromised