框架
版本
防抖器 API 参考
节流器 API 参考
速率限制器 API 参考
队列 API 参考
批处理器 API 参考

异步队列指南

注意: 队列指南中的所有核心队列概念同样适用于 AsyncQueuer。AsyncQueuer 在这些概念的基础上扩展了高级功能,例如并发(一次处理多个任务)和强大的错误处理。如果您是队列新手,请先阅读队列指南,了解 FIFO/LIFO、优先级、过期、拒绝和队列管理。本指南侧重于 AsyncQueuer 在异步和并发任务处理方面的独特性和强大之处。

虽然 Queuer 提供了带有时间控制的同步队列,但 AsyncQueuer 专门用于处理并发异步操作。它实现了传统意义上的“任务池”或“工作池”模式,允许同时处理多个操作,同时保持对并发和时机的控制。其实现大部分复制自 Tanner 自 2017 年以来一直服务于 JavaScript 社区的原始任务池工具 Swimmer

异步队列概念

异步队列通过增加并发处理能力来扩展基本队列概念。异步队列不是一次处理一个项,而是可以同时处理多个项,同时仍保持执行的顺序和控制。这在处理 I/O 操作、网络请求或任何大部分时间都在等待而不是消耗 CPU 的任务时特别有用。

异步队列可视化

text
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️
Queue:       [ABC]   [C]    [CDE]    [E]    []
Active:      [A,B]   [B,C]  [C,D]    [D,E]  [E]
Completed:    -       A      B        C      D,E
             [=================================================================]
             ^ Unlike regular queuing, multiple items
               can be processed concurrently

             [Items queue up]   [Process 2 at once]   [Complete]
              when busy         with wait between      all items
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️
Queue:       [ABC]   [C]    [CDE]    [E]    []
Active:      [A,B]   [B,C]  [C,D]    [D,E]  [E]
Completed:    -       A      B        C      D,E
             [=================================================================]
             ^ Unlike regular queuing, multiple items
               can be processed concurrently

             [Items queue up]   [Process 2 at once]   [Complete]
              when busy         with wait between      all items

何时使用异步队列

当您需要以下功能时,异步队列特别有效:

  • 并发处理多个异步操作
  • 控制同时操作的数量
  • 处理具有适当错误处理的基于 Promise 的任务
  • 在最大化吞吐量的同时保持顺序
  • 处理可以并行运行的后台任务

何时不使用异步队列

AsyncQueuer 非常通用,可以在许多情况下使用。如果您不需要并发处理,请改用队列。如果您不需要处理所有排队的执行,请改用节流

如果您想将操作分组,请改用批处理

TanStack Pacer 中的异步队列

TanStack Pacer 通过简单的 asyncQueue 函数和更强大的 AsyncQueuer 类提供异步队列。与核心队列指南一样,支持所有队列类型和排序策略(FIFO、LIFO、优先级等)。

使用 asyncQueue 的基本用法

asyncQueue 函数提供了一种创建始终运行的异步队列的简单方法

ts
import { asyncQueue } from '@tanstack/pacer'

// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2,
    onItemsChange: (queuer) => {
      console.log('Active tasks:', queuer.peekActiveItems().length)
    }
  }
)

// Add items to be processed
processItems(1)
processItems(2)
import { asyncQueue } from '@tanstack/pacer'

// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2,
    onItemsChange: (queuer) => {
      console.log('Active tasks:', queuer.peekActiveItems().length)
    }
  }
)

// Add items to be processed
processItems(1)
processItems(2)

要更全面地控制队列,请直接使用 AsyncQueuer 类。

使用 AsyncQueuer 类的进阶用法

AsyncQueuer 类提供了对异步队列行为的完全控制,包括所有核心队列功能,以及:

  • 并发:一次处理多个项(可通过 concurrency 配置)
  • 异步错误处理:每个任务和全局错误回调,并控制错误传播
  • 活动和待处理任务跟踪:监控哪些任务正在运行,哪些任务已排队
  • 异步专用回调: onSuccessonErroronSettled 等。
ts
import { AsyncQueuer } from '@tanstack/pacer'

const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2, // Process 2 items at once
    wait: 1000,     // Wait 1 second between starting new items
    started: true,  // Start processing immediately
    key: 'data-processor' // Identify this queuer in devtools
  }
)

// Add error and success handlers via options
queue.setOptions({
  onError: (error, item, queuer) => {
    console.error('Task failed:', error)
    console.log('Failed item:', item)
    // You can access queue state here
    console.log('Error count:', queuer.store.state.errorCount)
  },
  onSuccess: (result, item, queuer) => {
    console.log('Task completed:', result)
    console.log('Completed item:', item)
    // You can access queue state here
    console.log('Success count:', queuer.store.state.successCount)
  },
  onSettled: (item, queuer) => {
    // Called after each execution (success or failure)
    console.log('Task settled:', item)
    console.log('Total settled:', queuer.store.state.settledCount)
  }
})

// Add items to be processed
queue.addItem(1)
queue.addItem(2)
import { AsyncQueuer } from '@tanstack/pacer'

const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2, // Process 2 items at once
    wait: 1000,     // Wait 1 second between starting new items
    started: true,  // Start processing immediately
    key: 'data-processor' // Identify this queuer in devtools
  }
)

// Add error and success handlers via options
queue.setOptions({
  onError: (error, item, queuer) => {
    console.error('Task failed:', error)
    console.log('Failed item:', item)
    // You can access queue state here
    console.log('Error count:', queuer.store.state.errorCount)
  },
  onSuccess: (result, item, queuer) => {
    console.log('Task completed:', result)
    console.log('Completed item:', item)
    // You can access queue state here
    console.log('Success count:', queuer.store.state.successCount)
  },
  onSettled: (item, queuer) => {
    // Called after each execution (success or failure)
    console.log('Task settled:', item)
    console.log('Total settled:', queuer.store.state.settledCount)
  }
})

// Add items to be processed
queue.addItem(1)
queue.addItem(2)

异步专用功能

支持所有队列类型和排序策略(FIFO、LIFO、优先级等)——有关详细信息,请参阅队列指南。AsyncQueuer 增加了:

  • 并发:可以同时处理多个项,由 concurrency 选项控制(可以是动态的)。
  • 异步错误处理:使用 onErroronSuccessonSettled 进行强大的错误和结果跟踪。
  • 活动和待处理任务跟踪:使用 peekActiveItems()peekPendingItems() 监控队列状态。
  • 异步过期和拒绝:项可以像在核心队列指南中一样过期或被拒绝,但带有异步专用回调。

示例:优先异步队列

ts
const priorityQueue = new AsyncQueuer(
  async (item: { value: string; priority: number }) => {
    // Process each item asynchronously
    return await processTask(item.value)
  },
  {
    concurrency: 2,
    getPriority: (item) => item.priority // Higher numbers have priority
  }
)

priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
const priorityQueue = new AsyncQueuer(
  async (item: { value: string; priority: number }) => {
    // Process each item asynchronously
    return await processTask(item.value)
  },
  {
    concurrency: 2,
    getPriority: (item) => item.priority // Higher numbers have priority
  }
)

priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low

示例:错误处理

ts
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    if (item < 0) throw new Error('Negative item')
    return await processTask(item)
  },
  {
    onError: (error, item, queuer) => {
      console.error('Task failed:', error)
      console.log('Failed item:', item)
      // You can access queue state here
      console.log('Error count:', queuer.store.state.errorCount)
    },
    throwOnError: true, // Will throw errors even with onError handler
    onSuccess: (result, item, queuer) => {
      console.log('Task succeeded:', result)
      console.log('Succeeded item:', item)
      // You can access queue state here
      console.log('Success count:', queuer.store.state.successCount)
    },
    onSettled: (item, queuer) => {
      // Called after each execution (success or failure)
      console.log('Task settled:', item)
      console.log('Total settled:', queuer.store.state.settledCount)
    }
  }
)

queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    if (item < 0) throw new Error('Negative item')
    return await processTask(item)
  },
  {
    onError: (error, item, queuer) => {
      console.error('Task failed:', error)
      console.log('Failed item:', item)
      // You can access queue state here
      console.log('Error count:', queuer.store.state.errorCount)
    },
    throwOnError: true, // Will throw errors even with onError handler
    onSuccess: (result, item, queuer) => {
      console.log('Task succeeded:', result)
      console.log('Succeeded item:', item)
      // You can access queue state here
      console.log('Success count:', queuer.store.state.successCount)
    },
    onSettled: (item, queuer) => {
      // Called after each execution (success or failure)
      console.log('Task settled:', item)
      console.log('Total settled:', queuer.store.state.settledCount)
    }
  }
)

queue.addItem(-1) // Will trigger error handling
queue.addItem(2)

示例:动态并发

ts
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    return await processTask(item)
  },
  {
    // Dynamic concurrency based on system load
    concurrency: (queuer) => {
      return Math.max(1, 4 - queuer.store.state.activeItems.length)
    },
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    return await processTask(item)
  },
  {
    // Dynamic concurrency based on system load
    concurrency: (queuer) => {
      return Math.max(1, 4 - queuer.store.state.activeItems.length)
    },
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)

队列管理和监控

AsyncQueuer 提供所有核心队列指南中的队列管理和监控方法,以及特定于异步的方法。

  • peekActiveItems() — 当前正在处理的项
  • peekPendingItems() — 等待处理的项
  • queuer.store.state.successCountqueuer.store.state.errorCountqueuer.store.state.settledCount — 执行统计
  • queuer.store.state.activeItems — 当前正在处理的项数组
  • queuer.store.state.size — 当前队列大小
  • start()stop()clear()reset()flush() 等。

有关队列管理的更多信息,请参阅队列指南

任务过期和拒绝

AsyncQueuer 支持与核心队列相同的过期和拒绝功能。

  • 使用 expirationDurationgetIsExpiredonExpire 来处理任务过期。
  • 使用 maxSizeonReject 来处理队列溢出。

有关详细信息和示例,请参阅队列指南

刷新队列项

异步队列支持刷新项以立即处理它们。

ts
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })

queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3

// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)

// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })

queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3

// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)

// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)

状态管理

AsyncQueuer 类使用 TanStack Store 进行响应式状态管理,提供对队列状态、处理统计和并发任务跟踪的实时访问。所有状态都存储在 TanStack Store 中,可以通过 asyncQueuer.store.state 访问。但是,如果您使用的是 React 或 Solid 等框架适配器,则不应从此访问状态。而应从 asyncQueuer.state 读取状态,并在 useAsyncQueuer hook 的第三个参数中提供一个选择器回调函数来选择加入状态跟踪,如下所示。

状态选择器(框架适配器)

框架适配器支持一个 selector 参数,允许您指定哪些状态更改将触发重新渲染。这通过防止不相关的状态更改导致不必要的重新渲染来优化性能。

默认情况下,asyncQueuer.state 为空({}),因为选择器默认为空。 这是 TanStack Store useStore 的响应式状态存储的地方。您必须通过提供一个选择器函数来选择加入状态跟踪。

ts
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}

// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
  processFn, 
  { concurrency: 2, wait: 1000 },
  (state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value

// Multiple state properties
const queue = useAsyncQueuer(
  processFn,
  { concurrency: 2, wait: 1000 },
  (state) => ({
    activeItems: state.activeItems,
    successCount: state.successCount,
    errorCount: state.errorCount
  })
)
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}

// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
  processFn, 
  { concurrency: 2, wait: 1000 },
  (state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value

// Multiple state properties
const queue = useAsyncQueuer(
  processFn,
  { concurrency: 2, wait: 1000 },
  (state) => ({
    activeItems: state.activeItems,
    successCount: state.successCount,
    errorCount: state.errorCount
  })
)

初始状态

您可以在创建异步队列时提供初始状态值。

ts
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const queue = new AsyncQueuer(processFn, {
  concurrency: 2,
  wait: 1000,
  initialState
})
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const queue = new AsyncQueuer(processFn, {
  concurrency: 2,
  wait: 1000,
  initialState
})

订阅状态更改

Store 是响应式的并支持订阅

ts
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()

注意:在使用框架适配器时,这是不必要的,因为底层的 useStore hook 已经完成了此操作。您也可以根据需要从 TanStack Store 导入并使用 useStorequeuer.store.state 转换为带有自定义选择器的响应式状态。

可用状态属性

AsyncQueuerState 包含核心队列指南中的所有属性,以及:

  • activeItems:当前正在处理的项数组
  • addItemCount:调用 addItem 的次数(用于归约计算)
  • errorCount:导致错误的函数执行次数。
  • expirationCount:由于过期而被从队列中移除的项数
  • isEmpty:队列是否没有要处理的项(项数组为空)
  • isFull:queuer 是否已达到最大容量
  • isIdle:queuer 当前是否未处理任何项目
  • isRunning:queuer 是否处于活动状态并会自动处理项目
  • items:当前等待处理的项目数组
  • itemTimestamps:项添加到队列的时间戳,用于过期跟踪
  • lastResult:最近一次成功函数执行的结果。
  • pendingTick:queuer 是否有待处理的下一项处理超时
  • rejectionCount:从队列中拒绝的项数
  • settledCount:已完成执行(成功或出错)的函数执行次数
  • size:当前队列中的项目数
  • status:当前处理状态(“idle” | “running” | “stopped”)
  • successCount:成功完成的函数执行次数。

框架适配器

每个框架适配器都围绕异步队列类构建了便捷的 hooks 和函数。像 useAsyncQueueruseAsyncQueuedState 这样的 hooks 是小型包装器,可以减少您在常见用例中编写样板代码的工作量。


有关核心队列概念和同步队列,请参阅队列指南

我们的合作伙伴
Code Rabbit
Unkey
订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。

订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。