与 速率限制、节流 和 防抖 不同,后者会在执行过于频繁时丢弃执行,而队列可以配置为确保所有操作都得到处理。它们提供了一种管理和控制操作流程而不丢失任何请求的方式。这使得它们成为数据丢失不可接受的场景的理想选择。排队还可以设置为最大大小,这对于防止内存泄漏或其他问题非常有用。本指南将介绍 TanStack Pacer 的排队概念。
排队确保所有操作最终都能得到处理,即使它们进来的速度比处理它们的速度快。与丢弃多余操作的其他执行控制技术不同,排队将操作缓冲到一个有序列表中,并根据特定规则进行处理。这使得排队成为 TanStack Pacer 中唯一“无损”的执行控制技术,除非指定了 maxSize,这可能导致在缓冲区满时拒绝项目。
Queuing (processing one item every 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [BC] [BCDE] [DE] [E] []
Executed: ✅ ✅ ✅ ✅ ✅ ✅
[=================================================================]
^ Unlike rate limiting/throttling/debouncing,
ALL calls are eventually processed in order
[Items queue up] [Process steadily] [Empty]
when busy one by one queue
Queuing (processing one item every 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [BC] [BCDE] [DE] [E] []
Executed: ✅ ✅ ✅ ✅ ✅ ✅
[=================================================================]
^ Unlike rate limiting/throttling/debouncing,
ALL calls are eventually processed in order
[Items queue up] [Process steadily] [Empty]
when busy one by one queue
当您需要确保所有操作都得到处理时,排队尤其重要,即使这意味着会引入一些延迟。这使其成为数据一致性和完整性比即时执行更重要的场景的理想选择。当使用 maxSize 时,它还可以充当缓冲区,防止系统被过多的待处理操作淹没。
当以下情况时,排队可能不是最佳选择:
提示
如果您目前正在使用速率限制、节流或防抖,但发现丢弃的操作导致了问题,那么排队很可能就是您需要的解决方案。
TanStack Pacer 通过简单的 queue 函数和更强大的 Queuer 类提供排队功能。虽然其他执行控制技术通常偏爱基于函数的 API,但排队通常受益于基于类的 API 提供的额外控制。
queue
函数提供了一种简单的方法来创建一个始终运行的队列,该队列会在添加项目时处理它们。
import { queue } from '@tanstack/pacer'
// Create a queue that processes items every second
const processItems = queue<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
wait: 1000,
maxSize: 10, // Optional: limit queue size to prevent memory or time issues
onItemsChange: (queuer) => {
console.log('Current queue:', queuer.peekAllItems())
}
}
)
// Add items to be processed
processItems(1) // Processed immediately
processItems(2) // Processed after 1 second
processItems(3) // Processed after 2 seconds
import { queue } from '@tanstack/pacer'
// Create a queue that processes items every second
const processItems = queue<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
wait: 1000,
maxSize: 10, // Optional: limit queue size to prevent memory or time issues
onItemsChange: (queuer) => {
console.log('Current queue:', queuer.peekAllItems())
}
}
)
// Add items to be processed
processItems(1) // Processed immediately
processItems(2) // Processed after 1 second
processItems(3) // Processed after 2 seconds
虽然 queue
函数易于使用,但它仅通过 addItem
方法提供了一个基本的始终运行的队列。对于大多数用例,您会希望使用 Queuer
类提供的额外控制和功能。
Queuer
类提供了对队列行为和处理的完全控制。
import { Queuer } from '@tanstack/pacer'
// Create a queue that processes items every second
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
wait: 1000, // Wait 1 second between processing items
maxSize: 5, // Optional: limit queue size to prevent memory or time issues
onItemsChange: (queuer) => {
console.log('Current queue:', queuer.peekAllItems())
}
}
)
// Start processing
queue.start()
// Add items to be processed
queue.addItem(1)
queue.addItem(2)
queue.addItem(3)
// Items will be processed one at a time with 1 second delay between each
// Output:
// Processing: 1 (immediately)
// Processing: 2 (after 1 second)
// Processing: 3 (after 2 seconds)
import { Queuer } from '@tanstack/pacer'
// Create a queue that processes items every second
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
wait: 1000, // Wait 1 second between processing items
maxSize: 5, // Optional: limit queue size to prevent memory or time issues
onItemsChange: (queuer) => {
console.log('Current queue:', queuer.peekAllItems())
}
}
)
// Start processing
queue.start()
// Add items to be processed
queue.addItem(1)
queue.addItem(2)
queue.addItem(3)
// Items will be processed one at a time with 1 second delay between each
// Output:
// Processing: 1 (immediately)
// Processing: 2 (after 1 second)
// Processing: 3 (after 2 seconds)
TanStack Pacer 的 Queuer 之所以独特,是因为它可以通过基于位置的 API 来适应不同的用例。同一个 Queuer 可以充当传统的队列、栈或双端队列,所有这些都通过相同的统一接口实现。
默认行为是按照添加的顺序处理项目。这是最常见的队列类型,遵循“先添加的项目先处理”的原则。在使用 maxSize 时,如果队列已满,新项目将被拒绝。
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
addItemsTo: 'back', // default
getItemsFrom: 'front', // default
}
)
queue.addItem(1) // [1]
queue.addItem(2) // [1, 2]
// Processes: 1, then 2
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
addItemsTo: 'back', // default
getItemsFrom: 'front', // default
}
)
queue.addItem(1) // [1]
queue.addItem(2) // [1, 2]
// Processes: 1, then 2
通过将“back”指定为添加和检索项目的位置,queuer 的行为类似于堆栈。在堆栈中,最近添加的项目是第一个被处理的项目。在使用 maxSize 时,如果堆栈已满,新项目将被拒绝。
const stack = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
addItemsTo: 'back', // default
getItemsFrom: 'back', // override default for stack behavior
}
)
stack.addItem(1) // [1]
stack.addItem(2) // [1, 2]
// Items will process in order: 2, then 1
stack.getNextItem('back') // get next item from back of queue instead of front
const stack = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
addItemsTo: 'back', // default
getItemsFrom: 'back', // override default for stack behavior
}
)
stack.addItem(1) // [1]
stack.addItem(2) // [1, 2]
// Items will process in order: 2, then 1
stack.getNextItem('back') // get next item from back of queue instead of front
优先级队列通过允许项目根据其优先级而不是仅仅插入顺序进行排序,为队列排序增加了另一个维度。每个项目都被分配一个优先级值,队列会自动按优先级顺序维护项目。在使用 maxSize 时,如果队列已满,优先级较低的项目可能会被拒绝。
const priorityQueue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
getPriority: (n) => n // Higher numbers have priority
}
)
priorityQueue.addItem(1) // [1]
priorityQueue.addItem(3) // [3, 1]
priorityQueue.addItem(2) // [3, 2, 1]
// Processes: 3, 2, then 1
const priorityQueue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
getPriority: (n) => n // Higher numbers have priority
}
)
priorityQueue.addItem(1) // [1]
priorityQueue.addItem(3) // [3, 1]
priorityQueue.addItem(2) // [3, 2, 1]
// Processes: 3, 2, then 1
Queuer
类通过 start()
和 stop()
方法支持启动和停止处理。默认情况下,队列会自动开始处理。您可以将 started: false
设置为使队列初始暂停,从而允许您执行以下操作:
start()
开始处理getNextItem()
来手动处理项目const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
started: false // Start paused
}
)
queue.start() // Begin processing items
queue.stop() // Pause processing
// Manually process items while the queue is stopped (run it your own way)
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
started: false // Start paused
}
)
queue.start() // Begin processing items
queue.stop() // Pause processing
// Manually process items while the queue is stopped (run it your own way)
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
Queuer 提供了几个有用的队列管理方法。
// Queue inspection
queue.peekNextItem() // View next item without removing it
queue.store.state.size // Get current queue size
queue.store.state.isEmpty // Check if queue is empty
queue.store.state.isFull // Check if queue has reached maxSize
queue.peekAllItems() // Get copy of all queued items
// Queue manipulation
queue.clear() // Remove all items
queue.reset() // Reset to initial state
queue.store.state.executionCount // Get number of processed items
queue.flush() // Flush all pending items immediately
// Event handling (use the onItemsChange option, not a method)
// Example:
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
onItemsChange: (queuer) => {
console.log('Processed:', queuer.peekAllItems())
}
}
)
// Queue inspection
queue.peekNextItem() // View next item without removing it
queue.store.state.size // Get current queue size
queue.store.state.isEmpty // Check if queue is empty
queue.store.state.isFull // Check if queue has reached maxSize
queue.peekAllItems() // Get copy of all queued items
// Queue manipulation
queue.clear() // Remove all items
queue.reset() // Reset to initial state
queue.store.state.executionCount // Get number of processed items
queue.flush() // Flush all pending items immediately
// Event handling (use the onItemsChange option, not a method)
// Example:
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
onItemsChange: (queuer) => {
console.log('Processed:', queuer.peekAllItems())
}
}
)
Queuer 支持项目自动过期,这些项目在队列中停留时间过长。这对于防止处理过时数据或为排队操作设置超时非常有用。
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
expirationDuration: 5000, // Items expire after 5 seconds
onExpire: (item, queuer) => {
console.log('Item expired:', item)
}
}
)
// Or use a custom expiration check
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
getIsExpired: (item, addedAt) => {
// Custom expiration logic
return Date.now() - addedAt > 5000
},
onExpire: (item, queuer) => {
console.log('Item expired:', item)
}
}
)
// Check expiration statistics
console.log(queue.store.state.expirationCount) // Number of items that have expired
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
expirationDuration: 5000, // Items expire after 5 seconds
onExpire: (item, queuer) => {
console.log('Item expired:', item)
}
}
)
// Or use a custom expiration check
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
getIsExpired: (item, addedAt) => {
// Custom expiration logic
return Date.now() - addedAt > 5000
},
onExpire: (item, queuer) => {
console.log('Item expired:', item)
}
}
)
// Check expiration statistics
console.log(queue.store.state.expirationCount) // Number of items that have expired
过期功能对于以下方面特别有用:
当队列达到最大大小时(由 maxSize
选项设置),新项目将被拒绝。Queuer 提供了处理和监视这些拒绝的方法。
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
maxSize: 2, // Only allow 2 items in queue
onReject: (item, queuer) => {
console.log('Queue is full. Item rejected:', item)
}
}
)
queue.addItem(1) // Accepted
queue.addItem(2) // Accepted
queue.addItem(3) // Rejected, triggers onReject callback
console.log(queue.store.state.rejectionCount) // 1
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
maxSize: 2, // Only allow 2 items in queue
onReject: (item, queuer) => {
console.log('Queue is full. Item rejected:', item)
}
}
)
queue.addItem(1) // Accepted
queue.addItem(2) // Accepted
queue.addItem(3) // Rejected, triggers onReject callback
console.log(queue.store.state.rejectionCount) // 1
您可以在创建队列时预填充初始项目。
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
initialItems: [1, 2, 3],
started: true // Start processing immediately
}
)
// Queue starts with [1, 2, 3] and begins processing
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
initialItems: [1, 2, 3],
started: true // Start processing immediately
}
)
// Queue starts with [1, 2, 3] and begins processing
可以使用 setOptions()
在创建后修改 Queuer 的选项。此外,一些选项支持通过接收 queuer 实例的回调函数来动态设置值。
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
wait: 1000,
started: false
}
)
// Change configuration
queue.setOptions({
wait: 500, // Process items twice as fast
started: true // Start processing
})
// Access current state
console.log(queue.store.state.size) // Current queue size
console.log(queue.store.state.isRunning) // Whether queue is running
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
wait: 1000,
started: false
}
)
// Change configuration
queue.setOptions({
wait: 500, // Process items twice as fast
started: true // Start processing
})
// Access current state
console.log(queue.store.state.size) // Current queue size
console.log(queue.store.state.isRunning) // Whether queue is running
Queuer 中的几个选项通过接收 queuer 实例的回调函数来支持动态值。
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
// Dynamic wait time based on queue size
wait: (queuer) => {
return queuer.store.state.size > 10 ? 2000 : 1000
}
}
)
const queue = new Queuer<number>(
(item) => {
// Process each item
console.log('Processing:', item)
},
{
// Dynamic wait time based on queue size
wait: (queuer) => {
return queuer.store.state.size > 10 ? 2000 : 1000
}
}
)
以下选项支持动态值
这允许复杂队列行为适应运行时条件。
queuer 支持刷新项目以立即处理它们。
const queue = new Queuer(processFn, { wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
console.log(queue.store.state.size) // 2
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.size) // 0 (items were processed)
// Or flush a specific number of items
queue.addItem('item3')
queue.addItem('item4')
queue.addItem('item5')
queue.flush(2) // Process only 2 items
console.log(queue.store.state.size) // 1 (one item remaining)
const queue = new Queuer(processFn, { wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
console.log(queue.store.state.size) // 2
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.size) // 0 (items were processed)
// Or flush a specific number of items
queue.addItem('item3')
queue.addItem('item4')
queue.addItem('item5')
queue.flush(2) // Process only 2 items
console.log(queue.store.state.size) // 1 (one item remaining)
Queuer
类使用 TanStack Store 进行响应式状态管理,提供对队列状态、处理统计信息和并发任务跟踪的实时访问。所有状态都存储在 TanStack Store 中,可以通过 queuer.store.state
访问,尽管如果您使用的是 React 或 Solid 等框架适配器,则不应在此处读取状态。相反,您将从 queuer.state
中读取状态,并在 useQueuer
挂钩的第三个参数中提供一个选择器回调,以选择加入状态跟踪,如下所示。
框架适配器支持 selector
参数,允许您指定哪些状态更改将触发重新渲染。这通过防止在发生不相关状态更改时进行不必要的重新渲染来优化性能。
默认情况下,queuer.state
为空({}
),因为选择器默认为空。 这就是响应式状态(来自 TanStack Store useStore
)存储的位置。您必须通过提供选择器函数来选择加入状态跟踪。
// Default behavior - no reactive state subscriptions
const queue = useQueuer(processFn, { wait: 1000, maxSize: 10 })
console.log(queue.state) // {}
// Opt-in to re-render when size changes
const queue = useQueuer(
processFn,
{ wait: 1000, maxSize: 10 },
(state) => ({ size: state.size })
)
console.log(queue.state.size) // Reactive value
// Multiple state properties
const queue = useQueuer(
processFn,
{ wait: 1000, maxSize: 10 },
(state) => ({
size: state.size,
executionCount: state.executionCount,
status: state.status
})
)
// Default behavior - no reactive state subscriptions
const queue = useQueuer(processFn, { wait: 1000, maxSize: 10 })
console.log(queue.state) // {}
// Opt-in to re-render when size changes
const queue = useQueuer(
processFn,
{ wait: 1000, maxSize: 10 },
(state) => ({ size: state.size })
)
console.log(queue.state.size) // Reactive value
// Multiple state properties
const queue = useQueuer(
processFn,
{ wait: 1000, maxSize: 10 },
(state) => ({
size: state.size,
executionCount: state.executionCount,
status: state.status
})
)
您可以在创建 queuer 时提供初始状态值。这通常用于从持久存储中恢复状态。
// Load initial state from localStorage
const savedState = localStorage.getItem('queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const queue = new Queuer(processFn, {
wait: 1000,
maxSize: 10,
initialState
})
// Load initial state from localStorage
const savedState = localStorage.getItem('queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const queue = new Queuer(processFn, {
wait: 1000,
maxSize: 10,
initialState
})
Store 是响应式的并支持订阅
const queue = new Queuer(processFn, { wait: 1000, maxSize: 10 })
// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
const queue = new Queuer(processFn, { wait: 1000, maxSize: 10 })
// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
注意: 在使用框架适配器时,这是不必要的,因为底层
useStore
挂钩已经这样做了。您还可以根据需要从 TanStack Store 导入和使用useStore
,将queuer.store.state
转换为具有自定义选择器的响应式状态。
QueuerState
包括:
每个框架适配器都在 queuer 类之上构建了方便的挂钩和函数。useQueuer
、useQueuedState
和 useQueuedValue
等挂钩是小型包装器,可以减少您在自己的代码中为某些常见用例所需的样板代码。
有关异步排队的说明,请参阅 异步排队指南。
您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。