框架
版本
防抖器 API 参考
节流器 API 参考
速率限制器 API 参考
队列 API 参考
批处理器 API 参考

异步批处理指南

批处理指南》中的所有核心概念同样适用于异步批处理。

何时使用异步批处理

虽然同步批处理在许多用例中效果很好,但异步批处理提供了额外的功能,在以下情况下特别有用:

  • 您需要捕获并使用批处理执行的返回值
  • 您的批处理涉及异步操作(API 调用、数据库操作、文件 I/O)
  • 您需要具有可配置错误行为的高级错误处理
  • 您希望单独跟踪成功/错误统计信息
  • 您需要监控批处理何时正在主动执行

TanStack Pacer 中的异步批处理

TanStack Pacer 通过 AsyncBatcher 类和 asyncBatch 函数提供异步批处理。与同步版本不同,异步批处理程序处理 Promises 并提供强大的错误处理功能。

使用 asyncBatch 的基本用法

asyncBatch 函数提供了一种创建异步批处理函数的方法

ts
import { asyncBatch } from '@tanstack/pacer'

const processAsyncBatch = asyncBatch<number>(
  async (items) => {
    // Process the batch asynchronously
    const results = await Promise.all(
      items.map(item => processApiCall(item))
    )
    return results
  },
  {
    maxSize: 3,
    wait: 2000,
    onSuccess: (results, batch, batcher) => {
      console.log('Batch completed successfully:', results)
      console.log('Processed batch:', batch)
      console.log('Total successes:', batcher.store.state.successCount)
    },
    onError: (error, batch, batcher) => {
      console.error('Batch failed:', error)
      console.log('Failed batch:', batch)
      console.log('Total errors:', batcher.store.state.errorCount)
    }
  }
)

// Add items to be batched
processAsyncBatch(1)
processAsyncBatch(2)
processAsyncBatch(3) // Triggers batch processing
import { asyncBatch } from '@tanstack/pacer'

const processAsyncBatch = asyncBatch<number>(
  async (items) => {
    // Process the batch asynchronously
    const results = await Promise.all(
      items.map(item => processApiCall(item))
    )
    return results
  },
  {
    maxSize: 3,
    wait: 2000,
    onSuccess: (results, batch, batcher) => {
      console.log('Batch completed successfully:', results)
      console.log('Processed batch:', batch)
      console.log('Total successes:', batcher.store.state.successCount)
    },
    onError: (error, batch, batcher) => {
      console.error('Batch failed:', error)
      console.log('Failed batch:', batch)
      console.log('Total errors:', batcher.store.state.errorCount)
    }
  }
)

// Add items to be batched
processAsyncBatch(1)
processAsyncBatch(2)
processAsyncBatch(3) // Triggers batch processing

注意: 在使用 React 时,建议使用 useAsyncBatchedCallback hook 而不是 asyncBatch 函数,以更好地与 React 的生命周期集成并自动清理。

使用 AsyncBatcher 类的进阶用法

为了更精确地控制异步批处理行为,请直接使用 AsyncBatcher 类。

ts
import { AsyncBatcher } from '@tanstack/pacer'

const batcher = new AsyncBatcher<number>(
  async (items) => {
    // Process the batch asynchronously
    const results = await Promise.all(
      items.map(item => processApiCall(item))
    )
    return results
  },
  {
    maxSize: 5,
    wait: 3000,
    onSuccess: (results, batch, batcher) => {
      console.log('Batch succeeded:', results)
      console.log('Processed batch:', batch)
    },
    onError: (error, batch, batcher) => {
      console.error('Batch failed:', error)
      console.log('Failed batch:', batch)
    }
  }
)

// Access current state via TanStack Store
console.log(batcher.store.state.successCount) // Number of successful batch executions
console.log(batcher.store.state.errorCount) // Number of failed batch executions
console.log(batcher.store.state.isExecuting) // Whether a batch is currently executing
console.log(batcher.store.state.lastResult) // Result from most recent batch

// Add items to the batch
batcher.addItem(1)
batcher.addItem(2)

// Control batch execution
batcher.stop()  // Stop processing
batcher.start() // Resume processing
import { AsyncBatcher } from '@tanstack/pacer'

const batcher = new AsyncBatcher<number>(
  async (items) => {
    // Process the batch asynchronously
    const results = await Promise.all(
      items.map(item => processApiCall(item))
    )
    return results
  },
  {
    maxSize: 5,
    wait: 3000,
    onSuccess: (results, batch, batcher) => {
      console.log('Batch succeeded:', results)
      console.log('Processed batch:', batch)
    },
    onError: (error, batch, batcher) => {
      console.error('Batch failed:', error)
      console.log('Failed batch:', batch)
    }
  }
)

// Access current state via TanStack Store
console.log(batcher.store.state.successCount) // Number of successful batch executions
console.log(batcher.store.state.errorCount) // Number of failed batch executions
console.log(batcher.store.state.isExecuting) // Whether a batch is currently executing
console.log(batcher.store.state.lastResult) // Result from most recent batch

// Add items to the batch
batcher.addItem(1)
batcher.addItem(2)

// Control batch execution
batcher.stop()  // Stop processing
batcher.start() // Resume processing

与同步批处理的关键区别

1. 返回值处理

与返回 void 的同步批处理程序不同,异步版本允许您捕获并使用批处理函数的返回值。

ts
const batcher = new AsyncBatcher<string>(
  async (items) => {
    const results = await processBatch(items)
    return results
  },
  {
    maxSize: 5,
    onSuccess: (results, batch, batcher) => {
      // Handle the returned results
      console.log('Batch results:', results)
      console.log('Processed batch:', batch)
    }
  }
)
const batcher = new AsyncBatcher<string>(
  async (items) => {
    const results = await processBatch(items)
    return results
  },
  {
    maxSize: 5,
    onSuccess: (results, batch, batcher) => {
      // Handle the returned results
      console.log('Batch results:', results)
      console.log('Processed batch:', batch)
    }
  }
)

2. 错误处理

异步批处理程序提供全面的错误处理功能。

ts
const batcher = new AsyncBatcher<number>(
  async (items) => {
    // This might throw an error
    const results = await riskyBatchOperation(items)
    return results
  },
  {
    maxSize: 3,
    onError: (error, batch, batcher) => {
      // Handle batch errors
      console.error('Batch processing failed:', error)
      console.log('Items that failed:', batch)
      console.log('Total error count:', batcher.store.state.errorCount)
    },
    throwOnError: false, // Don't throw errors, just handle them
    onSuccess: (results, batch, batcher) => {
      console.log('Batch succeeded:', results)
      console.log('Processed batch:', batch)
      console.log('Total success count:', batcher.store.state.successCount)
    },
    onSettled: (batch, batcher) => {
      // Called after every batch (success or failure)
      console.log('Batch settled:', batch)
      console.log('Total batches:', batcher.store.state.settleCount)
    }
  }
)
const batcher = new AsyncBatcher<number>(
  async (items) => {
    // This might throw an error
    const results = await riskyBatchOperation(items)
    return results
  },
  {
    maxSize: 3,
    onError: (error, batch, batcher) => {
      // Handle batch errors
      console.error('Batch processing failed:', error)
      console.log('Items that failed:', batch)
      console.log('Total error count:', batcher.store.state.errorCount)
    },
    throwOnError: false, // Don't throw errors, just handle them
    onSuccess: (results, batch, batcher) => {
      console.log('Batch succeeded:', results)
      console.log('Processed batch:', batch)
      console.log('Total success count:', batcher.store.state.successCount)
    },
    onSettled: (batch, batcher) => {
      // Called after every batch (success or failure)
      console.log('Batch settled:', batch)
      console.log('Total batches:', batcher.store.state.settleCount)
    }
  }
)

3. 执行状态跟踪

异步批处理程序会跟踪批处理正在执行的时间。

ts
const batcher = new AsyncBatcher<number>(
  async (items) => {
    console.log('Starting batch execution...')
    const results = await longRunningBatchOperation(items)
    console.log('Batch execution completed')
    return results
  },
  {
    maxSize: 5,
    onItemsChange: (batcher) => {
      console.log('Is executing:', batcher.store.state.isExecuting)
      console.log('Items in queue:', batcher.store.state.size)
    }
  }
)
const batcher = new AsyncBatcher<number>(
  async (items) => {
    console.log('Starting batch execution...')
    const results = await longRunningBatchOperation(items)
    console.log('Batch execution completed')
    return results
  },
  {
    maxSize: 5,
    onItemsChange: (batcher) => {
      console.log('Is executing:', batcher.store.state.isExecuting)
      console.log('Items in queue:', batcher.store.state.size)
    }
  }
)

4. 不同的回调函数

AsyncBatcher 支持这些特定于异步的回调函数:

  • onSuccess:在每次成功的批次执行后调用,提供结果、已处理的批次项和批处理程序实例。
  • onError:在批次执行失败时调用,提供错误、失败的批次项和批处理程序实例。
  • onSettled:在每次批次执行(成功或失败)后调用,提供已处理的批次项和批处理程序实例。
  • onExecute:在每次批次执行后调用,提供已处理的批次项和批处理程序实例(与同步批处理程序相同)。
  • onItemsChange:在添加项目或处理批次时调用。

错误处理选项

异步批处理程序通过 throwOnError 选项提供灵活的错误处理。

ts
const batcher = new AsyncBatcher<number>(
  async (items) => {
    // This might throw an error
    throw new Error('Batch processing failed')
  },
  {
    maxSize: 3,
    onError: (error, batch, batcher) => {
      console.error('Handling error:', error)
    },
    throwOnError: true, // Will throw errors even with onError handler
    // throwOnError: false, // Will swallow errors (default if onError is provided)
    // throwOnError: undefined, // Uses default behavior based on onError presence
  }
)
const batcher = new AsyncBatcher<number>(
  async (items) => {
    // This might throw an error
    throw new Error('Batch processing failed')
  },
  {
    maxSize: 3,
    onError: (error, batch, batcher) => {
      console.error('Handling error:', error)
    },
    throwOnError: true, // Will throw errors even with onError handler
    // throwOnError: false, // Will swallow errors (default if onError is provided)
    // throwOnError: undefined, // Uses default behavior based on onError presence
  }
)
  • 默认行为:如果未提供 onError 处理程序,则 throwOnErrortrue;如果提供了 onError 处理程序,则为 false
  • 使用 onError 处理程序时:首先调用处理程序,然后如果 throwOnErrortrue,则抛出错误。
  • 错误状态:失败的项目在 failedItems 数组中跟踪,可以通过 peekFailedItems() 访问。onError 回调函数接收的是失败的批次项,而不是累积的失败项。

动态选项

与同步批处理程序一样,异步批处理程序支持动态选项。

ts
const batcher = new AsyncBatcher<number>(
  async (items) => {
    return await processBatch(items)
  },
  {
    // Dynamic batch size based on success rate
    maxSize: (batcher) => {
      const successRate = batcher.store.state.successCount / Math.max(1, batcher.store.state.settleCount)
      return successRate > 0.8 ? 10 : 5 // Larger batches if success rate is high
    },
    // Dynamic wait time based on error count
    wait: (batcher) => {
      return batcher.store.state.errorCount > 5 ? 5000 : 2000 // Wait longer if errors are frequent
    }
  }
)
const batcher = new AsyncBatcher<number>(
  async (items) => {
    return await processBatch(items)
  },
  {
    // Dynamic batch size based on success rate
    maxSize: (batcher) => {
      const successRate = batcher.store.state.successCount / Math.max(1, batcher.store.state.settleCount)
      return successRate > 0.8 ? 10 : 5 // Larger batches if success rate is high
    },
    // Dynamic wait time based on error count
    wait: (batcher) => {
      return batcher.store.state.errorCount > 5 ? 5000 : 2000 // Wait longer if errors are frequent
    }
  }
)

刷新待处理的批次

异步批处理程序支持刷新待处理的批次以立即触发处理。

ts
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 10, wait: 5000 })

batcher.addItem('item1')
batcher.addItem('item2')
console.log(batcher.store.state.isPending) // true

// Flush immediately instead of waiting
const result = await batcher.flush()
console.log('Flush result:', result)
console.log(batcher.store.state.isEmpty) // true (batch was processed)
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 10, wait: 5000 })

batcher.addItem('item1')
batcher.addItem('item2')
console.log(batcher.store.state.isPending) // true

// Flush immediately instead of waiting
const result = await batcher.flush()
console.log('Flush result:', result)
console.log(batcher.store.state.isEmpty) // true (batch was processed)

状态管理

AsyncBatcher 类使用 TanStack Store 进行响应式状态管理,提供对批处理执行状态、错误跟踪和处理统计信息的实时访问。所有状态都存储在 TanStack Store 中,可以通过 asyncBatcher.store.state 访问。但是,如果您使用的是 React 或 Solid 等框架适配器,则不应在此处读取状态。而是应从 asyncBatcher.state 读取状态,并为 useAsyncBatcher hook 提供第三个参数(选择器回调),以选择加入状态跟踪,如下所示。

状态选择器(框架适配器)

框架适配器支持一个 selector 参数,允许您指定哪些状态更改将触发重新渲染。这通过防止不相关的状态更改导致不必要的重新渲染来优化性能。

默认情况下,asyncBatcher.state 为空({}),因为选择器默认为空。 这里存储了来自 TanStack Store 的响应式状态 useStore。您必须通过提供选择器函数来选择加入状态跟踪。

ts
// Default behavior - no reactive state subscriptions
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
console.log(batcher.state) // {}

// Opt-in to re-render when isExecuting changes
const batcher = useAsyncBatcher(
  asyncBatchFn, 
  { maxSize: 5, wait: 1000 },
  (state) => ({ isExecuting: state.isExecuting })
)
console.log(batcher.state.isExecuting) // Reactive value

// Multiple state properties
const batcher = useAsyncBatcher(
  asyncBatchFn,
  { maxSize: 5, wait: 1000 },
  (state) => ({
    isExecuting: state.isExecuting,
    successCount: state.successCount,
    errorCount: state.errorCount
  })
)
// Default behavior - no reactive state subscriptions
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
console.log(batcher.state) // {}

// Opt-in to re-render when isExecuting changes
const batcher = useAsyncBatcher(
  asyncBatchFn, 
  { maxSize: 5, wait: 1000 },
  (state) => ({ isExecuting: state.isExecuting })
)
console.log(batcher.state.isExecuting) // Reactive value

// Multiple state properties
const batcher = useAsyncBatcher(
  asyncBatchFn,
  { maxSize: 5, wait: 1000 },
  (state) => ({
    isExecuting: state.isExecuting,
    successCount: state.successCount,
    errorCount: state.errorCount
  })
)

初始状态

您可以在创建异步批处理程序时提供初始状态值。这通常用于从持久存储恢复状态。

ts
// Load initial state from localStorage
const savedState = localStorage.getItem('async-batcher-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const batcher = new AsyncBatcher(asyncBatchFn, {
  maxSize: 5,
  wait: 1000,
  initialState
})
// Load initial state from localStorage
const savedState = localStorage.getItem('async-batcher-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const batcher = new AsyncBatcher(asyncBatchFn, {
  maxSize: 5,
  wait: 1000,
  initialState
})

订阅状态更改

Store 是响应式的并支持订阅

ts
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })

// Subscribe to state changes
const unsubscribe = batcher.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })

// Subscribe to state changes
const unsubscribe = batcher.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()

注意: 使用框架适配器时不需要这样做,因为底层的 useStore hook 已经完成了这个功能。如果需要,您还可以从 TanStack Store 导入并使用 useStore,在任何需要的地方使用自定义选择器将 batcher.store.state 转换为响应式状态。

ts
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })

// you could manually use the `useStore` hook to subscribe to state changes in whatever scope you want
const state = useStore(batcher.store, (state) => ({
  successCount: state.successCount,
}))

console.log(state)
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })

// you could manually use the `useStore` hook to subscribe to state changes in whatever scope you want
const state = useStore(batcher.store, (state) => ({
  successCount: state.successCount,
}))

console.log(state)

可用状态属性

AsyncBatcherState 包括:

  • errorCount:导致错误的批次执行次数。
  • failedItems:批处理过程中失败的项目数组。
  • isEmpty:批处理程序是否没有要处理的项目(items 数组为空)。
  • isExecuting:当前是否正在异步处理批次。
  • isPending: batcher 是否正在等待超时触发批处理
  • items: 当前排队等待批量处理的项目数组
  • lastResult:最近一次批次执行的结果。
  • settleCount:已完成(成功或失败)的批次执行次数。
  • size: 当前批处理队列中的项目数
  • status:当前处理状态('idle' | 'pending' | 'executing' | 'populated')。
  • successCount:已成功完成的批次执行次数。
  • totalItemsFailed:跨所有批次处理失败的总项目数。
  • totalItemsProcessed:跨所有批次处理的总项目数。

监控失败的项目

异步批处理程序会跟踪在批处理过程中失败的项目。

ts
const batcher = new AsyncBatcher<number>(
  async (items) => {
    // This might fail for some items
    if (items.some(item => item < 0)) {
      throw new Error('Negative numbers not allowed')
    }
    return await processBatch(items)
  },
  {
    maxSize: 3,
    onError: (error, batch, batcher) => {
      console.log('Failed batch:', batch)
      console.log('All failed items:', batcher.peekFailedItems())
    }
  }
)
const batcher = new AsyncBatcher<number>(
  async (items) => {
    // This might fail for some items
    if (items.some(item => item < 0)) {
      throw new Error('Negative numbers not allowed')
    }
    return await processBatch(items)
  },
  {
    maxSize: 3,
    onError: (error, batch, batcher) => {
      console.log('Failed batch:', batch)
      console.log('All failed items:', batcher.peekFailedItems())
    }
  }
)

框架适配器

每个框架适配器都提供基于核心异步批处理功能构建的 hook,用于与框架的状态管理系统集成。类似 useAsyncBatcher 的 hook 可用于每个框架。


有关核心批处理概念和同步批处理,请参阅批处理指南

我们的合作伙伴
Code Rabbit
Unkey
订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。

订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。