框架
版本
防抖器 API 参考
节流器 API 参考
速率限制器 API 参考
队列 API 参考
批处理器 API 参考

排队指南

速率限制节流防抖 不同,后者会在执行过于频繁时丢弃执行,而队列可以配置为确保所有操作都得到处理。它们提供了一种管理和控制操作流程而不丢失任何请求的方式。这使得它们成为数据丢失不可接受的场景的理想选择。排队还可以设置为最大大小,这对于防止内存泄漏或其他问题非常有用。本指南将介绍 TanStack Pacer 的排队概念。

排队概念

排队确保所有操作最终都能得到处理,即使它们进来的速度比处理它们的速度快。与丢弃多余操作的其他执行控制技术不同,排队将操作缓冲到一个有序列表中,并根据特定规则进行处理。这使得排队成为 TanStack Pacer 中唯一“无损”的执行控制技术,除非指定了 maxSize,这可能导致在缓冲区满时拒绝项目。

排队可视化

text
Queuing (processing one item every 2 ticks)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️  ⬇️  ⬇️
Queue:       [ABC]   [BC]    [BCDE]    [DE]    [E]    []
Executed:     ✅     ✅       ✅        ✅      ✅     ✅
             [=================================================================]
             ^ Unlike rate limiting/throttling/debouncing,
               ALL calls are eventually processed in order

             [Items queue up]   [Process steadily]   [Empty]
              when busy          one by one           queue
Queuing (processing one item every 2 ticks)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️  ⬇️  ⬇️
Queue:       [ABC]   [BC]    [BCDE]    [DE]    [E]    []
Executed:     ✅     ✅       ✅        ✅      ✅     ✅
             [=================================================================]
             ^ Unlike rate limiting/throttling/debouncing,
               ALL calls are eventually processed in order

             [Items queue up]   [Process steadily]   [Empty]
              when busy          one by one           queue

何时使用排队

当您需要确保所有操作都得到处理时,排队尤其重要,即使这意味着会引入一些延迟。这使其成为数据一致性和完整性比即时执行更重要的场景的理想选择。当使用 maxSize 时,它还可以充当缓冲区,防止系统被过多的待处理操作淹没。

何时不使用排队

当以下情况时,排队可能不是最佳选择:

  • 即时反馈比处理每个操作更重要
  • 您只关心最新值(请使用 防抖
  • 您想将操作分组(请使用 批量处理

提示

如果您目前正在使用速率限制、节流或防抖,但发现丢弃的操作导致了问题,那么排队很可能就是您需要的解决方案。

TanStack Pacer 中的排队

TanStack Pacer 通过简单的 queue 函数和更强大的 Queuer 类提供排队功能。虽然其他执行控制技术通常偏爱基于函数的 API,但排队通常受益于基于类的 API 提供的额外控制。

使用 queue 进行基本用法

queue 函数提供了一种简单的方法来创建一个始终运行的队列,该队列会在添加项目时处理它们。

ts
import { queue } from '@tanstack/pacer'

// Create a queue that processes items every second
const processItems = queue<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    wait: 1000,
    maxSize: 10, // Optional: limit queue size to prevent memory or time issues
    onItemsChange: (queuer) => {
      console.log('Current queue:', queuer.peekAllItems())
    }
  }
)

// Add items to be processed
processItems(1) // Processed immediately
processItems(2) // Processed after 1 second
processItems(3) // Processed after 2 seconds
import { queue } from '@tanstack/pacer'

// Create a queue that processes items every second
const processItems = queue<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    wait: 1000,
    maxSize: 10, // Optional: limit queue size to prevent memory or time issues
    onItemsChange: (queuer) => {
      console.log('Current queue:', queuer.peekAllItems())
    }
  }
)

// Add items to be processed
processItems(1) // Processed immediately
processItems(2) // Processed after 1 second
processItems(3) // Processed after 2 seconds

虽然 queue 函数易于使用,但它仅通过 addItem 方法提供了一个基本的始终运行的队列。对于大多数用例,您会希望使用 Queuer 类提供的额外控制和功能。

使用 Queuer 类进行高级用法

Queuer 类提供了对队列行为和处理的完全控制。

ts
import { Queuer } from '@tanstack/pacer'

// Create a queue that processes items every second
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    wait: 1000, // Wait 1 second between processing items
    maxSize: 5, // Optional: limit queue size to prevent memory or time issues
    onItemsChange: (queuer) => {
      console.log('Current queue:', queuer.peekAllItems())
    }
  }
)

// Start processing
queue.start()

// Add items to be processed
queue.addItem(1)
queue.addItem(2)
queue.addItem(3)

// Items will be processed one at a time with 1 second delay between each
// Output:
// Processing: 1 (immediately)
// Processing: 2 (after 1 second)
// Processing: 3 (after 2 seconds)
import { Queuer } from '@tanstack/pacer'

// Create a queue that processes items every second
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    wait: 1000, // Wait 1 second between processing items
    maxSize: 5, // Optional: limit queue size to prevent memory or time issues
    onItemsChange: (queuer) => {
      console.log('Current queue:', queuer.peekAllItems())
    }
  }
)

// Start processing
queue.start()

// Add items to be processed
queue.addItem(1)
queue.addItem(2)
queue.addItem(3)

// Items will be processed one at a time with 1 second delay between each
// Output:
// Processing: 1 (immediately)
// Processing: 2 (after 1 second)
// Processing: 3 (after 2 seconds)

队列类型和排序

TanStack Pacer 的 Queuer 之所以独特,是因为它可以通过基于位置的 API 来适应不同的用例。同一个 Queuer 可以充当传统的队列、栈或双端队列,所有这些都通过相同的统一接口实现。

FIFO 队列(先进先出)

默认行为是按照添加的顺序处理项目。这是最常见的队列类型,遵循“先添加的项目先处理”的原则。在使用 maxSize 时,如果队列已满,新项目将被拒绝。

ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    addItemsTo: 'back', // default
    getItemsFrom: 'front', // default
  }
)
queue.addItem(1) // [1]
queue.addItem(2) // [1, 2]
// Processes: 1, then 2
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    addItemsTo: 'back', // default
    getItemsFrom: 'front', // default
  }
)
queue.addItem(1) // [1]
queue.addItem(2) // [1, 2]
// Processes: 1, then 2

LIFO 栈(后进先出)

通过将“back”指定为添加和检索项目的位置,queuer 的行为类似于堆栈。在堆栈中,最近添加的项目是第一个被处理的项目。在使用 maxSize 时,如果堆栈已满,新项目将被拒绝。

ts
const stack = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    addItemsTo: 'back', // default
    getItemsFrom: 'back', // override default for stack behavior
  }
)
stack.addItem(1) // [1]
stack.addItem(2) // [1, 2]
// Items will process in order: 2, then 1

stack.getNextItem('back') // get next item from back of queue instead of front
const stack = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    addItemsTo: 'back', // default
    getItemsFrom: 'back', // override default for stack behavior
  }
)
stack.addItem(1) // [1]
stack.addItem(2) // [1, 2]
// Items will process in order: 2, then 1

stack.getNextItem('back') // get next item from back of queue instead of front

优先级队列

优先级队列通过允许项目根据其优先级而不是仅仅插入顺序进行排序,为队列排序增加了另一个维度。每个项目都被分配一个优先级值,队列会自动按优先级顺序维护项目。在使用 maxSize 时,如果队列已满,优先级较低的项目可能会被拒绝。

ts
const priorityQueue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    getPriority: (n) => n // Higher numbers have priority
  }
)
priorityQueue.addItem(1) // [1]
priorityQueue.addItem(3) // [3, 1]
priorityQueue.addItem(2) // [3, 2, 1]
// Processes: 3, 2, then 1
const priorityQueue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    getPriority: (n) => n // Higher numbers have priority
  }
)
priorityQueue.addItem(1) // [1]
priorityQueue.addItem(3) // [3, 1]
priorityQueue.addItem(2) // [3, 2, 1]
// Processes: 3, 2, then 1

启动和停止

Queuer 类通过 start()stop() 方法支持启动和停止处理。默认情况下,队列会自动开始处理。您可以将 started: false 设置为使队列初始暂停,从而允许您执行以下操作:

  1. 稍后使用 start() 开始处理
  2. 通过以事件驱动的方式调用 getNextItem() 来手动处理项目
ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    started: false // Start paused
  }
)

queue.start() // Begin processing items
queue.stop()  // Pause processing

// Manually process items while the queue is stopped (run it your own way)
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    started: false // Start paused
  }
)

queue.start() // Begin processing items
queue.stop()  // Pause processing

// Manually process items while the queue is stopped (run it your own way)
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item
queue.getNextItem() // Get next item

附加功能

Queuer 提供了几个有用的队列管理方法。

ts
// Queue inspection
queue.peekNextItem()           // View next item without removing it
queue.store.state.size         // Get current queue size
queue.store.state.isEmpty      // Check if queue is empty
queue.store.state.isFull       // Check if queue has reached maxSize
queue.peekAllItems()           // Get copy of all queued items

// Queue manipulation
queue.clear()                  // Remove all items
queue.reset()                  // Reset to initial state
queue.store.state.executionCount // Get number of processed items
queue.flush()                  // Flush all pending items immediately

// Event handling (use the onItemsChange option, not a method)
// Example:
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    onItemsChange: (queuer) => {
      console.log('Processed:', queuer.peekAllItems())
    }
  }
)
// Queue inspection
queue.peekNextItem()           // View next item without removing it
queue.store.state.size         // Get current queue size
queue.store.state.isEmpty      // Check if queue is empty
queue.store.state.isFull       // Check if queue has reached maxSize
queue.peekAllItems()           // Get copy of all queued items

// Queue manipulation
queue.clear()                  // Remove all items
queue.reset()                  // Reset to initial state
queue.store.state.executionCount // Get number of processed items
queue.flush()                  // Flush all pending items immediately

// Event handling (use the onItemsChange option, not a method)
// Example:
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    onItemsChange: (queuer) => {
      console.log('Processed:', queuer.peekAllItems())
    }
  }
)

项目过期

Queuer 支持项目自动过期,这些项目在队列中停留时间过长。这对于防止处理过时数据或为排队操作设置超时非常有用。

ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    expirationDuration: 5000, // Items expire after 5 seconds
    onExpire: (item, queuer) => {
      console.log('Item expired:', item)
    }
  }
)

// Or use a custom expiration check
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    getIsExpired: (item, addedAt) => {
      // Custom expiration logic
      return Date.now() - addedAt > 5000
    },
    onExpire: (item, queuer) => {
      console.log('Item expired:', item)
    }
  }
)

// Check expiration statistics
console.log(queue.store.state.expirationCount) // Number of items that have expired
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    expirationDuration: 5000, // Items expire after 5 seconds
    onExpire: (item, queuer) => {
      console.log('Item expired:', item)
    }
  }
)

// Or use a custom expiration check
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    getIsExpired: (item, addedAt) => {
      // Custom expiration logic
      return Date.now() - addedAt > 5000
    },
    onExpire: (item, queuer) => {
      console.log('Item expired:', item)
    }
  }
)

// Check expiration statistics
console.log(queue.store.state.expirationCount) // Number of items that have expired

过期功能对于以下方面特别有用:

  • 防止处理过时数据
  • 为排队操作实现超时
  • 通过自动删除旧项目来管理内存使用
  • 处理仅在有限时间内有效的数据

拒绝处理

当队列达到最大大小时(由 maxSize 选项设置),新项目将被拒绝。Queuer 提供了处理和监视这些拒绝的方法。

ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    maxSize: 2, // Only allow 2 items in queue
    onReject: (item, queuer) => {
      console.log('Queue is full. Item rejected:', item)
    }
  }
)

queue.addItem(1) // Accepted
queue.addItem(2) // Accepted
queue.addItem(3) // Rejected, triggers onReject callback

console.log(queue.store.state.rejectionCount) // 1
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    maxSize: 2, // Only allow 2 items in queue
    onReject: (item, queuer) => {
      console.log('Queue is full. Item rejected:', item)
    }
  }
)

queue.addItem(1) // Accepted
queue.addItem(2) // Accepted
queue.addItem(3) // Rejected, triggers onReject callback

console.log(queue.store.state.rejectionCount) // 1

初始项目

您可以在创建队列时预填充初始项目。

ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    initialItems: [1, 2, 3],
    started: true // Start processing immediately
  }
)

// Queue starts with [1, 2, 3] and begins processing
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    initialItems: [1, 2, 3],
    started: true // Start processing immediately
  }
)

// Queue starts with [1, 2, 3] and begins processing

动态配置

可以使用 setOptions() 在创建后修改 Queuer 的选项。此外,一些选项支持通过接收 queuer 实例的回调函数来动态设置值。

ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    wait: 1000,
    started: false
  }
)

// Change configuration
queue.setOptions({
  wait: 500, // Process items twice as fast
  started: true // Start processing
})

// Access current state
console.log(queue.store.state.size) // Current queue size
console.log(queue.store.state.isRunning) // Whether queue is running
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    wait: 1000,
    started: false
  }
)

// Change configuration
queue.setOptions({
  wait: 500, // Process items twice as fast
  started: true // Start processing
})

// Access current state
console.log(queue.store.state.size) // Current queue size
console.log(queue.store.state.isRunning) // Whether queue is running

动态选项

Queuer 中的几个选项通过接收 queuer 实例的回调函数来支持动态值。

ts
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)
const queue = new Queuer<number>(
  (item) => {
    // Process each item
    console.log('Processing:', item)
  },
  {
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)

以下选项支持动态值

  • wait: 可以是数字或返回数字的函数

这允许复杂队列行为适应运行时条件。

刷新队列项目

queuer 支持刷新项目以立即处理它们。

ts
const queue = new Queuer(processFn, { wait: 5000 })

queue.addItem('item1')
queue.addItem('item2')
console.log(queue.store.state.size) // 2

// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.size) // 0 (items were processed)

// Or flush a specific number of items
queue.addItem('item3')
queue.addItem('item4')
queue.addItem('item5')
queue.flush(2) // Process only 2 items
console.log(queue.store.state.size) // 1 (one item remaining)
const queue = new Queuer(processFn, { wait: 5000 })

queue.addItem('item1')
queue.addItem('item2')
console.log(queue.store.state.size) // 2

// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.size) // 0 (items were processed)

// Or flush a specific number of items
queue.addItem('item3')
queue.addItem('item4')
queue.addItem('item5')
queue.flush(2) // Process only 2 items
console.log(queue.store.state.size) // 1 (one item remaining)

状态管理

Queuer 类使用 TanStack Store 进行响应式状态管理,提供对队列状态、处理统计信息和并发任务跟踪的实时访问。所有状态都存储在 TanStack Store 中,可以通过 queuer.store.state 访问,尽管如果您使用的是 React 或 Solid 等框架适配器,则不应在此处读取状态。相反,您将从 queuer.state 中读取状态,并在 useQueuer 挂钩的第三个参数中提供一个选择器回调,以选择加入状态跟踪,如下所示。

状态选择器(框架适配器)

框架适配器支持 selector 参数,允许您指定哪些状态更改将触发重新渲染。这通过防止在发生不相关状态更改时进行不必要的重新渲染来优化性能。

默认情况下,queuer.state 为空({}),因为选择器默认为空。 这就是响应式状态(来自 TanStack Store useStore)存储的位置。您必须通过提供选择器函数来选择加入状态跟踪。

ts
// Default behavior - no reactive state subscriptions
const queue = useQueuer(processFn, { wait: 1000, maxSize: 10 })
console.log(queue.state) // {}

// Opt-in to re-render when size changes
const queue = useQueuer(
  processFn, 
  { wait: 1000, maxSize: 10 },
  (state) => ({ size: state.size })
)
console.log(queue.state.size) // Reactive value

// Multiple state properties
const queue = useQueuer(
  processFn,
  { wait: 1000, maxSize: 10 },
  (state) => ({
    size: state.size,
    executionCount: state.executionCount,
    status: state.status
  })
)
// Default behavior - no reactive state subscriptions
const queue = useQueuer(processFn, { wait: 1000, maxSize: 10 })
console.log(queue.state) // {}

// Opt-in to re-render when size changes
const queue = useQueuer(
  processFn, 
  { wait: 1000, maxSize: 10 },
  (state) => ({ size: state.size })
)
console.log(queue.state.size) // Reactive value

// Multiple state properties
const queue = useQueuer(
  processFn,
  { wait: 1000, maxSize: 10 },
  (state) => ({
    size: state.size,
    executionCount: state.executionCount,
    status: state.status
  })
)

初始状态

您可以在创建 queuer 时提供初始状态值。这通常用于从持久存储中恢复状态。

ts
// Load initial state from localStorage
const savedState = localStorage.getItem('queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const queue = new Queuer(processFn, {
  wait: 1000,
  maxSize: 10,
  initialState
})
// Load initial state from localStorage
const savedState = localStorage.getItem('queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const queue = new Queuer(processFn, {
  wait: 1000,
  maxSize: 10,
  initialState
})

订阅状态更改

Store 是响应式的并支持订阅

ts
const queue = new Queuer(processFn, { wait: 1000, maxSize: 10 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()
const queue = new Queuer(processFn, { wait: 1000, maxSize: 10 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()

注意: 在使用框架适配器时,这是不必要的,因为底层 useStore 挂钩已经这样做了。您还可以根据需要从 TanStack Store 导入和使用 useStore,将 queuer.store.state 转换为具有自定义选择器的响应式状态。

可用状态属性

QueuerState 包括:

  • addItemCount:调用 addItem 的次数(用于计算减法)
  • executionCount:由 queuer 处理的项目数
  • expirationCount:由于过期而从队列中移除的项目数
  • isEmpty:queuer 是否没有要处理的项目(items 数组为空)
  • isFull:queuer 是否已达到最大容量
  • isIdle:queuer 当前是否未处理任何项目
  • isRunning:queuer 是否处于活动状态并会自动处理项目
  • items:当前等待处理的项目数组
  • itemTimestamps:项目添加到队列中的时间戳,用于过期跟踪
  • pendingTick:queuer 是否有待处理的下一项处理超时
  • rejectionCount:已拒绝添加到队列的项目数
  • size:当前队列中的项目数
  • status:当前处理状态(“idle” | “running” | “stopped”)

框架适配器

每个框架适配器都在 queuer 类之上构建了方便的挂钩和函数。useQueueruseQueuedStateuseQueuedValue 等挂钩是小型包装器,可以减少您在自己的代码中为某些常见用例所需的样板代码。


有关异步排队的说明,请参阅 异步排队指南

我们的合作伙伴
Code Rabbit
Unkey
订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。

订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。