文档
CodeRabbit
Cloudflare
AG Grid
Netlify
Neon
WorkOS
Clerk
Convex
Electric
PowerSync
Sentry
Railway
Prisma
Strapi
Unkey
CodeRabbit
Cloudflare
AG Grid
Netlify
Neon
WorkOS
Clerk
Convex
Electric
PowerSync
Sentry
Railway
Prisma
Strapi
Unkey
防抖器 API 参考
节流器 API 参考
速率限制器 API 参考
队列 API 参考
批处理器 API 参考
指南

批处理指南

批量处理是一种强大的技术,用于将多个操作组合在一起并作为一个单元进行处理。与队列不同,后者确保每个操作都单独处理,批量处理会收集项目并以可配置的组的形式处理它们,从而提高效率并减少开销。本指南涵盖 TanStack Pacer 的批量处理概念。

批量处理概念

批量处理会随着时间的推移收集项目,或者直到达到一定大小,然后一次性处理所有项目。这对于批量处理比逐个处理更有效的情况非常理想。批量处理可以由以下因素触发:

  • 达到最大批量大小
  • 等待最大时间
  • 自定义逻辑(例如,特殊项目或条件)

批量处理可视化

text
Batching (processing every 3 items or every 2 seconds)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️  ⬇️  ⬇️
Batch:       [ABC]   []      [DE]      []      [FGH]  []
Executed:     ✅             ✅         ✅
             [===============================]
             ^ Items are grouped and processed together

             [Items accumulate]   [Process batch]   [Empty]
                in batch           as group         batch

何时使用批量处理

批量处理最适合以下情况:

  • 以组的形式处理项目更有效(例如,网络请求、数据库写入)
  • 您希望减少昂贵操作的频率
  • 您需要控制处理的速率或大小
  • 您希望将活动爆发减少为更少的操作

何时不使用批量处理

批量处理在以下情况下可能不理想:

  • 必须立即单独处理每个项目(使用队列
  • 您只关心最新的值(使用防抖
提示

如果您发现自己正在进行可以分组的重复调用,批量处理可以帮助您优化性能和资源利用率。

TanStack Pacer 中的批量处理

TanStack Pacer 通过 Batcher 类和简单的 batch 函数提供批量处理。两者都允许您收集项目并以可配置的批次处理它们。

使用 batch 的基本用法

batch 函数提供了一种创建批量处理函数的简单方法

ts
import { batch } from '@tanstack/pacer'

// Create a batcher that processes up to 3 items or every 2 seconds
const processBatch = batch<number>(
  (items) => {
    // Process the batch
    console.log('Processing batch:', items)
  },
  {
    maxSize: 3, // Process when 3 items are collected
    wait: 2000, // Or after 2 seconds, whichever comes first
    onItemsChange: (batcher) => {
      console.log('Current batch:', batcher.peekAllItems())
    }
  }
)

// Add items to be batched
processBatch(1)
processBatch(2)
processBatch(3) // Triggers batch processing
processBatch(4)
// Or wait 2 seconds for the next batch to process

注意:在使用 React 时,为了更好地与 React 的生命周期集成和自动清理,请优先使用 useBatchedCallback 钩子,而不是 batch 函数。

batch 函数返回一个将项目添加到批次中的函数。批次将根据您的配置自动处理。

使用 Batcher 类的进阶用法

Batcher 类提供对批量处理行为的完全控制

ts
import { Batcher } from '@tanstack/pacer'

// Create a batcher that processes up to 5 items or every 3 seconds
const batcher = new Batcher<number>(
  (items) => {
    // Process the batch
    console.log('Processing batch:', items)
  },
  {
    maxSize: 5, // Process when 5 items are collected
    wait: 3000, // Or after 3 seconds
    getShouldExecute: (items, batcher) => items.includes(42), // Custom trigger
    onItemsChange: (batcher) => {
      console.log('Current batch:', batcher.peekAllItems())
    }
  }
)

// Add items to the batch
batcher.addItem(1)
batcher.addItem(2)
batcher.addItem(3)
// ...

// Manually process the current batch
batcher.execute()

// Control batching
batcher.stop()  // Pause batching
batcher.start() // Resume batching

批量处理选项

批量处理选项允许您自定义批次的处理方式和时间

  • maxSize:每个批次的最大项目数(默认值:Infinity
  • wait:在处理批次之前等待的最大时间(毫秒)(默认值:Infinity
  • getShouldExecute:自定义函数,用于确定是否应处理批次
  • onExecute:批次处理后的回调
  • onItemsChange:添加项目或处理批次后的回调
  • onIsRunningChange:批量处理器的运行状态发生变化时的回调
  • started:批量处理器是否立即开始运行(默认值:true

批量处理方法

Batcher 类提供几种用于批量处理管理的方法

ts
batcher.addItem(item)           // Add an item to the batch
batcher.execute()               // Manually process the current batch
batcher.stop()                  // Pause batching
batcher.start()                 // Resume batching
batcher.store.state.size        // Get current batch size
batcher.store.state.isEmpty     // Check if batch is empty
batcher.store.state.isRunning   // Check if batcher is running
batcher.peekAllItems()           // Get all items in the current batch
batcher.store.state.executionCount // Number of batches processed
batcher.store.state.totalItemsProcessed // Number of items processed
batcher.setOptions(opts)        // Update batcher options
batcher.flush()                 // Flush pending batch immediately

自定义批量处理触发器

您可以使用 getShouldExecute 根据自定义逻辑触发批次

ts
const batcher = new Batcher<number>(
  (items) => console.log('Processing batch:', items),
  {
    getShouldExecute: (items) => items.includes(99),
  }
)

batcher.addItem(1)
batcher.addItem(99) // Triggers batch processing immediately

动态配置

您可以在运行时更新批量处理选项

ts
batcher.setOptions({
  maxSize: 10,
  wait: 1000,
})

const options = batcher.getOptions()
console.log(options.maxSize) // 10

状态管理

Batcher 类使用 TanStack Store 进行反应式状态管理,提供对批量处理状态、执行次数和处理状态的实时访问。所有状态都存储在 TanStack Store 中,可以通过 batcher.store.state 访问,但是,如果您正在使用 React 或 Solid 等框架适配器,您将不想从这里读取状态。相反,您将从 batcher.state 读取状态,并提供一个选择器回调作为 useBatcher 钩子的第三个参数,以选择加入状态跟踪,如下所示。

状态选择器(框架适配器)

框架适配器支持以两种方式订阅状态更改

1. 使用 batcher.Subscribe 组件(推荐用于组件树订阅)

使用 Subscribe 组件订阅组件树深处的状态更改,而无需将选择器传递给钩子。这对于希望在子组件中订阅状态非常有用。

tsx
// Default behavior - no reactive state subscriptions at hook level
const batcher = useBatcher(processFn, { maxSize: 5, wait: 1000 })

// Subscribe to state changes deep in component tree using Subscribe component
<batcher.Subscribe selector={(state) => ({ size: state.size })}>
  {(state) => (
    <div>Batch size: {state.size}</div>
  )}
</batcher.Subscribe>

2. 使用 selector 参数(用于钩子级别订阅)

selector 参数允许您指定哪些状态更改将触发钩子级别上的反应式更新,从而通过在发生不相关的状态更改时防止不必要的更新来优化性能。

默认情况下,batcher.state 为空({}),因为选择器默认为空。 这是来自 TanStack Store useStore 的反应式状态存储的位置。您必须通过提供选择器函数来选择加入状态跟踪。

ts
// Default behavior - no reactive state subscriptions
const batcher = useBatcher(processFn, { maxSize: 5, wait: 1000 })
console.log(batcher.state) // {}

// Opt-in to re-render when size changes
const batcher = useBatcher(
  processFn, 
  { maxSize: 5, wait: 1000 },
  (state) => ({ size: state.size })
)
console.log(batcher.state.size) // Reactive value

// Multiple state properties
const batcher = useBatcher(
  processFn,
  { maxSize: 5, wait: 1000 },
  (state) => ({
    size: state.size,
    executionCount: state.executionCount,
    status: state.status
  })
)

初始状态

您可以在创建批量处理器时提供初始状态值。这通常用于从持久存储恢复状态

ts
// Load initial state from localStorage
const savedState = localStorage.getItem('batcher-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const batcher = new Batcher(processFn, {
  maxSize: 5,
  wait: 1000,
  initialState
})

订阅状态更改

Store 是响应式的并支持订阅

ts
const batcher = new Batcher(processFn, { maxSize: 5, wait: 1000 })

// Subscribe to state changes
const unsubscribe = batcher.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()

注意:当使用框架适配器时,这是不必要的,因为底层的 useStore 钩子已经执行了此操作。您还可以导入并使用 useStore 来自 TanStack Store,以便将 batcher.store.state 转换为具有自定义选择器的反应式状态,只要您需要即可。

可用状态属性

BatcherState 包括

  • executionCount:已完成的批量处理执行次数
  • isEmpty:批量处理器是否没有要处理的项目(项目数组为空)
  • isPending:批量处理器是否正在等待超时以触发批量处理
  • items:当前排队等待批量处理的项目数组
  • size:批量队列中当前的项目数
  • status:当前处理状态('idle' | 'pending')
  • totalItemsProcessed:所有批次处理的总项目数

刷新待处理批次

批量处理器支持刷新待处理批次以立即触发处理

ts
const batcher = new Batcher(processFn, { maxSize: 10, wait: 5000 })

batcher.addItem('item1')
batcher.addItem('item2')
console.log(batcher.store.state.isPending) // true

// Flush immediately instead of waiting
batcher.flush()
console.log(batcher.store.state.isEmpty) // true (batch was processed)

框架适配器

每个框架适配器都会围绕批量处理器类构建便捷的钩子和函数。像 useBatchercreateBatcher 这样的钩子是小型包装器,可以减少您在一些常见用例中的代码冗余。

对于异步批量处理,请参阅异步批量处理指南