注意: 队列指南中的所有核心队列概念同样适用于 AsyncQueuer。AsyncQueuer 在这些概念的基础上扩展了高级功能,例如并发(一次处理多个任务)和强大的错误处理。如果您是队列新手,请先阅读队列指南,了解 FIFO/LIFO、优先级、过期、拒绝和队列管理。本指南侧重于 AsyncQueuer 在异步和并发任务处理方面的独特性和强大之处。
虽然 Queuer 提供了带有时间控制的同步队列,但 AsyncQueuer 专门用于处理并发异步操作。它实现了传统意义上的“任务池”或“工作池”模式,允许同时处理多个操作,同时保持对并发和时机的控制。其实现大部分复制自 Tanner 自 2017 年以来一直服务于 JavaScript 社区的原始任务池工具 Swimmer。
异步队列通过增加并发处理能力来扩展基本队列概念。异步队列不是一次处理一个项,而是可以同时处理多个项,同时仍保持执行的顺序和控制。这在处理 I/O 操作、网络请求或任何大部分时间都在等待而不是消耗 CPU 的任务时特别有用。
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [C] [CDE] [E] []
Active: [A,B] [B,C] [C,D] [D,E] [E]
Completed: - A B C D,E
[=================================================================]
^ Unlike regular queuing, multiple items
can be processed concurrently
[Items queue up] [Process 2 at once] [Complete]
when busy with wait between all items
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [C] [CDE] [E] []
Active: [A,B] [B,C] [C,D] [D,E] [E]
Completed: - A B C D,E
[=================================================================]
^ Unlike regular queuing, multiple items
can be processed concurrently
[Items queue up] [Process 2 at once] [Complete]
when busy with wait between all items
当您需要以下功能时,异步队列特别有效:
AsyncQueuer 非常通用,可以在许多情况下使用。如果您不需要并发处理,请改用队列。如果您不需要处理所有排队的执行,请改用节流。
如果您想将操作分组,请改用批处理。
TanStack Pacer 通过简单的 asyncQueue 函数和更强大的 AsyncQueuer 类提供异步队列。与核心队列指南一样,支持所有队列类型和排序策略(FIFO、LIFO、优先级等)。
asyncQueue
函数提供了一种创建始终运行的异步队列的简单方法
import { asyncQueue } from '@tanstack/pacer'
// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2,
onItemsChange: (queuer) => {
console.log('Active tasks:', queuer.peekActiveItems().length)
}
}
)
// Add items to be processed
processItems(1)
processItems(2)
import { asyncQueue } from '@tanstack/pacer'
// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2,
onItemsChange: (queuer) => {
console.log('Active tasks:', queuer.peekActiveItems().length)
}
}
)
// Add items to be processed
processItems(1)
processItems(2)
要更全面地控制队列,请直接使用 AsyncQueuer 类。
AsyncQueuer
类提供了对异步队列行为的完全控制,包括所有核心队列功能,以及:
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2, // Process 2 items at once
wait: 1000, // Wait 1 second between starting new items
started: true, // Start processing immediately
key: 'data-processor' // Identify this queuer in devtools
}
)
// Add error and success handlers via options
queue.setOptions({
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
onSuccess: (result, item, queuer) => {
console.log('Task completed:', result)
console.log('Completed item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
})
// Add items to be processed
queue.addItem(1)
queue.addItem(2)
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2, // Process 2 items at once
wait: 1000, // Wait 1 second between starting new items
started: true, // Start processing immediately
key: 'data-processor' // Identify this queuer in devtools
}
)
// Add error and success handlers via options
queue.setOptions({
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
onSuccess: (result, item, queuer) => {
console.log('Task completed:', result)
console.log('Completed item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
})
// Add items to be processed
queue.addItem(1)
queue.addItem(2)
支持所有队列类型和排序策略(FIFO、LIFO、优先级等)——有关详细信息,请参阅队列指南。AsyncQueuer 增加了:
const priorityQueue = new AsyncQueuer(
async (item: { value: string; priority: number }) => {
// Process each item asynchronously
return await processTask(item.value)
},
{
concurrency: 2,
getPriority: (item) => item.priority // Higher numbers have priority
}
)
priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
const priorityQueue = new AsyncQueuer(
async (item: { value: string; priority: number }) => {
// Process each item asynchronously
return await processTask(item.value)
},
{
concurrency: 2,
getPriority: (item) => item.priority // Higher numbers have priority
}
)
priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
if (item < 0) throw new Error('Negative item')
return await processTask(item)
},
{
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
throwOnError: true, // Will throw errors even with onError handler
onSuccess: (result, item, queuer) => {
console.log('Task succeeded:', result)
console.log('Succeeded item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
}
)
queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
if (item < 0) throw new Error('Negative item')
return await processTask(item)
},
{
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
throwOnError: true, // Will throw errors even with onError handler
onSuccess: (result, item, queuer) => {
console.log('Task succeeded:', result)
console.log('Succeeded item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
}
)
queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
return await processTask(item)
},
{
// Dynamic concurrency based on system load
concurrency: (queuer) => {
return Math.max(1, 4 - queuer.store.state.activeItems.length)
},
// Dynamic wait time based on queue size
wait: (queuer) => {
return queuer.store.state.size > 10 ? 2000 : 1000
}
}
)
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
return await processTask(item)
},
{
// Dynamic concurrency based on system load
concurrency: (queuer) => {
return Math.max(1, 4 - queuer.store.state.activeItems.length)
},
// Dynamic wait time based on queue size
wait: (queuer) => {
return queuer.store.state.size > 10 ? 2000 : 1000
}
}
)
AsyncQueuer 提供所有核心队列指南中的队列管理和监控方法,以及特定于异步的方法。
有关队列管理的更多信息,请参阅队列指南。
AsyncQueuer 支持与核心队列相同的过期和拒绝功能。
有关详细信息和示例,请参阅队列指南。
异步队列支持刷新项以立即处理它们。
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)
// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)
// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
AsyncQueuer
类使用 TanStack Store 进行响应式状态管理,提供对队列状态、处理统计和并发任务跟踪的实时访问。所有状态都存储在 TanStack Store 中,可以通过 asyncQueuer.store.state 访问。但是,如果您使用的是 React 或 Solid 等框架适配器,则不应从此访问状态。而应从 asyncQueuer.state 读取状态,并在 useAsyncQueuer hook 的第三个参数中提供一个选择器回调函数来选择加入状态跟踪,如下所示。
框架适配器支持一个 selector 参数,允许您指定哪些状态更改将触发重新渲染。这通过防止不相关的状态更改导致不必要的重新渲染来优化性能。
默认情况下,asyncQueuer.state 为空({}),因为选择器默认为空。 这是 TanStack Store useStore 的响应式状态存储的地方。您必须通过提供一个选择器函数来选择加入状态跟踪。
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}
// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value
// Multiple state properties
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({
activeItems: state.activeItems,
successCount: state.successCount,
errorCount: state.errorCount
})
)
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}
// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value
// Multiple state properties
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({
activeItems: state.activeItems,
successCount: state.successCount,
errorCount: state.errorCount
})
)
您可以在创建异步队列时提供初始状态值。
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const queue = new AsyncQueuer(processFn, {
concurrency: 2,
wait: 1000,
initialState
})
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const queue = new AsyncQueuer(processFn, {
concurrency: 2,
wait: 1000,
initialState
})
Store 是响应式的并支持订阅
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
注意:在使用框架适配器时,这是不必要的,因为底层的 useStore hook 已经完成了此操作。您也可以根据需要从 TanStack Store 导入并使用 useStore 将 queuer.store.state 转换为带有自定义选择器的响应式状态。
AsyncQueuerState
包含核心队列指南中的所有属性,以及:
每个框架适配器都围绕异步队列类构建了便捷的 hooks 和函数。像 useAsyncQueuer 或 useAsyncQueuedState 这样的 hooks 是小型包装器,可以减少您在常见用例中编写样板代码的工作量。
有关核心队列概念和同步队列,请参阅队列指南。
您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。