《批处理指南》中的所有核心概念同样适用于异步批处理。
虽然同步批处理在许多用例中效果很好,但异步批处理提供了额外的功能,在以下情况下特别有用:
TanStack Pacer 通过 AsyncBatcher 类和 asyncBatch 函数提供异步批处理。与同步版本不同,异步批处理程序处理 Promises 并提供强大的错误处理功能。
asyncBatch
函数提供了一种创建异步批处理函数的方法
import { asyncBatch } from '@tanstack/pacer'
const processAsyncBatch = asyncBatch<number>(
async (items) => {
// Process the batch asynchronously
const results = await Promise.all(
items.map(item => processApiCall(item))
)
return results
},
{
maxSize: 3,
wait: 2000,
onSuccess: (results, batch, batcher) => {
console.log('Batch completed successfully:', results)
console.log('Processed batch:', batch)
console.log('Total successes:', batcher.store.state.successCount)
},
onError: (error, batch, batcher) => {
console.error('Batch failed:', error)
console.log('Failed batch:', batch)
console.log('Total errors:', batcher.store.state.errorCount)
}
}
)
// Add items to be batched
processAsyncBatch(1)
processAsyncBatch(2)
processAsyncBatch(3) // Triggers batch processing
import { asyncBatch } from '@tanstack/pacer'
const processAsyncBatch = asyncBatch<number>(
async (items) => {
// Process the batch asynchronously
const results = await Promise.all(
items.map(item => processApiCall(item))
)
return results
},
{
maxSize: 3,
wait: 2000,
onSuccess: (results, batch, batcher) => {
console.log('Batch completed successfully:', results)
console.log('Processed batch:', batch)
console.log('Total successes:', batcher.store.state.successCount)
},
onError: (error, batch, batcher) => {
console.error('Batch failed:', error)
console.log('Failed batch:', batch)
console.log('Total errors:', batcher.store.state.errorCount)
}
}
)
// Add items to be batched
processAsyncBatch(1)
processAsyncBatch(2)
processAsyncBatch(3) // Triggers batch processing
注意: 在使用 React 时,建议使用 useAsyncBatchedCallback hook 而不是 asyncBatch 函数,以更好地与 React 的生命周期集成并自动清理。
为了更精确地控制异步批处理行为,请直接使用 AsyncBatcher 类。
import { AsyncBatcher } from '@tanstack/pacer'
const batcher = new AsyncBatcher<number>(
async (items) => {
// Process the batch asynchronously
const results = await Promise.all(
items.map(item => processApiCall(item))
)
return results
},
{
maxSize: 5,
wait: 3000,
onSuccess: (results, batch, batcher) => {
console.log('Batch succeeded:', results)
console.log('Processed batch:', batch)
},
onError: (error, batch, batcher) => {
console.error('Batch failed:', error)
console.log('Failed batch:', batch)
}
}
)
// Access current state via TanStack Store
console.log(batcher.store.state.successCount) // Number of successful batch executions
console.log(batcher.store.state.errorCount) // Number of failed batch executions
console.log(batcher.store.state.isExecuting) // Whether a batch is currently executing
console.log(batcher.store.state.lastResult) // Result from most recent batch
// Add items to the batch
batcher.addItem(1)
batcher.addItem(2)
// Control batch execution
batcher.stop() // Stop processing
batcher.start() // Resume processing
import { AsyncBatcher } from '@tanstack/pacer'
const batcher = new AsyncBatcher<number>(
async (items) => {
// Process the batch asynchronously
const results = await Promise.all(
items.map(item => processApiCall(item))
)
return results
},
{
maxSize: 5,
wait: 3000,
onSuccess: (results, batch, batcher) => {
console.log('Batch succeeded:', results)
console.log('Processed batch:', batch)
},
onError: (error, batch, batcher) => {
console.error('Batch failed:', error)
console.log('Failed batch:', batch)
}
}
)
// Access current state via TanStack Store
console.log(batcher.store.state.successCount) // Number of successful batch executions
console.log(batcher.store.state.errorCount) // Number of failed batch executions
console.log(batcher.store.state.isExecuting) // Whether a batch is currently executing
console.log(batcher.store.state.lastResult) // Result from most recent batch
// Add items to the batch
batcher.addItem(1)
batcher.addItem(2)
// Control batch execution
batcher.stop() // Stop processing
batcher.start() // Resume processing
与返回 void 的同步批处理程序不同,异步版本允许您捕获并使用批处理函数的返回值。
const batcher = new AsyncBatcher<string>(
async (items) => {
const results = await processBatch(items)
return results
},
{
maxSize: 5,
onSuccess: (results, batch, batcher) => {
// Handle the returned results
console.log('Batch results:', results)
console.log('Processed batch:', batch)
}
}
)
const batcher = new AsyncBatcher<string>(
async (items) => {
const results = await processBatch(items)
return results
},
{
maxSize: 5,
onSuccess: (results, batch, batcher) => {
// Handle the returned results
console.log('Batch results:', results)
console.log('Processed batch:', batch)
}
}
)
异步批处理程序提供全面的错误处理功能。
const batcher = new AsyncBatcher<number>(
async (items) => {
// This might throw an error
const results = await riskyBatchOperation(items)
return results
},
{
maxSize: 3,
onError: (error, batch, batcher) => {
// Handle batch errors
console.error('Batch processing failed:', error)
console.log('Items that failed:', batch)
console.log('Total error count:', batcher.store.state.errorCount)
},
throwOnError: false, // Don't throw errors, just handle them
onSuccess: (results, batch, batcher) => {
console.log('Batch succeeded:', results)
console.log('Processed batch:', batch)
console.log('Total success count:', batcher.store.state.successCount)
},
onSettled: (batch, batcher) => {
// Called after every batch (success or failure)
console.log('Batch settled:', batch)
console.log('Total batches:', batcher.store.state.settleCount)
}
}
)
const batcher = new AsyncBatcher<number>(
async (items) => {
// This might throw an error
const results = await riskyBatchOperation(items)
return results
},
{
maxSize: 3,
onError: (error, batch, batcher) => {
// Handle batch errors
console.error('Batch processing failed:', error)
console.log('Items that failed:', batch)
console.log('Total error count:', batcher.store.state.errorCount)
},
throwOnError: false, // Don't throw errors, just handle them
onSuccess: (results, batch, batcher) => {
console.log('Batch succeeded:', results)
console.log('Processed batch:', batch)
console.log('Total success count:', batcher.store.state.successCount)
},
onSettled: (batch, batcher) => {
// Called after every batch (success or failure)
console.log('Batch settled:', batch)
console.log('Total batches:', batcher.store.state.settleCount)
}
}
)
异步批处理程序会跟踪批处理正在执行的时间。
const batcher = new AsyncBatcher<number>(
async (items) => {
console.log('Starting batch execution...')
const results = await longRunningBatchOperation(items)
console.log('Batch execution completed')
return results
},
{
maxSize: 5,
onItemsChange: (batcher) => {
console.log('Is executing:', batcher.store.state.isExecuting)
console.log('Items in queue:', batcher.store.state.size)
}
}
)
const batcher = new AsyncBatcher<number>(
async (items) => {
console.log('Starting batch execution...')
const results = await longRunningBatchOperation(items)
console.log('Batch execution completed')
return results
},
{
maxSize: 5,
onItemsChange: (batcher) => {
console.log('Is executing:', batcher.store.state.isExecuting)
console.log('Items in queue:', batcher.store.state.size)
}
}
)
AsyncBatcher
支持这些特定于异步的回调函数:
异步批处理程序通过 throwOnError 选项提供灵活的错误处理。
const batcher = new AsyncBatcher<number>(
async (items) => {
// This might throw an error
throw new Error('Batch processing failed')
},
{
maxSize: 3,
onError: (error, batch, batcher) => {
console.error('Handling error:', error)
},
throwOnError: true, // Will throw errors even with onError handler
// throwOnError: false, // Will swallow errors (default if onError is provided)
// throwOnError: undefined, // Uses default behavior based on onError presence
}
)
const batcher = new AsyncBatcher<number>(
async (items) => {
// This might throw an error
throw new Error('Batch processing failed')
},
{
maxSize: 3,
onError: (error, batch, batcher) => {
console.error('Handling error:', error)
},
throwOnError: true, // Will throw errors even with onError handler
// throwOnError: false, // Will swallow errors (default if onError is provided)
// throwOnError: undefined, // Uses default behavior based on onError presence
}
)
onError
回调函数接收的是失败的批次项,而不是累积的失败项。与同步批处理程序一样,异步批处理程序支持动态选项。
const batcher = new AsyncBatcher<number>(
async (items) => {
return await processBatch(items)
},
{
// Dynamic batch size based on success rate
maxSize: (batcher) => {
const successRate = batcher.store.state.successCount / Math.max(1, batcher.store.state.settleCount)
return successRate > 0.8 ? 10 : 5 // Larger batches if success rate is high
},
// Dynamic wait time based on error count
wait: (batcher) => {
return batcher.store.state.errorCount > 5 ? 5000 : 2000 // Wait longer if errors are frequent
}
}
)
const batcher = new AsyncBatcher<number>(
async (items) => {
return await processBatch(items)
},
{
// Dynamic batch size based on success rate
maxSize: (batcher) => {
const successRate = batcher.store.state.successCount / Math.max(1, batcher.store.state.settleCount)
return successRate > 0.8 ? 10 : 5 // Larger batches if success rate is high
},
// Dynamic wait time based on error count
wait: (batcher) => {
return batcher.store.state.errorCount > 5 ? 5000 : 2000 // Wait longer if errors are frequent
}
}
)
异步批处理程序支持刷新待处理的批次以立即触发处理。
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 10, wait: 5000 })
batcher.addItem('item1')
batcher.addItem('item2')
console.log(batcher.store.state.isPending) // true
// Flush immediately instead of waiting
const result = await batcher.flush()
console.log('Flush result:', result)
console.log(batcher.store.state.isEmpty) // true (batch was processed)
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 10, wait: 5000 })
batcher.addItem('item1')
batcher.addItem('item2')
console.log(batcher.store.state.isPending) // true
// Flush immediately instead of waiting
const result = await batcher.flush()
console.log('Flush result:', result)
console.log(batcher.store.state.isEmpty) // true (batch was processed)
AsyncBatcher
类使用 TanStack Store 进行响应式状态管理,提供对批处理执行状态、错误跟踪和处理统计信息的实时访问。所有状态都存储在 TanStack Store 中,可以通过 asyncBatcher.store.state 访问。但是,如果您使用的是 React 或 Solid 等框架适配器,则不应在此处读取状态。而是应从 asyncBatcher.state 读取状态,并为 useAsyncBatcher hook 提供第三个参数(选择器回调),以选择加入状态跟踪,如下所示。
框架适配器支持一个 selector 参数,允许您指定哪些状态更改将触发重新渲染。这通过防止不相关的状态更改导致不必要的重新渲染来优化性能。
默认情况下,asyncBatcher.state 为空({}),因为选择器默认为空。 这里存储了来自 TanStack Store 的响应式状态 useStore。您必须通过提供选择器函数来选择加入状态跟踪。
// Default behavior - no reactive state subscriptions
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
console.log(batcher.state) // {}
// Opt-in to re-render when isExecuting changes
const batcher = useAsyncBatcher(
asyncBatchFn,
{ maxSize: 5, wait: 1000 },
(state) => ({ isExecuting: state.isExecuting })
)
console.log(batcher.state.isExecuting) // Reactive value
// Multiple state properties
const batcher = useAsyncBatcher(
asyncBatchFn,
{ maxSize: 5, wait: 1000 },
(state) => ({
isExecuting: state.isExecuting,
successCount: state.successCount,
errorCount: state.errorCount
})
)
// Default behavior - no reactive state subscriptions
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
console.log(batcher.state) // {}
// Opt-in to re-render when isExecuting changes
const batcher = useAsyncBatcher(
asyncBatchFn,
{ maxSize: 5, wait: 1000 },
(state) => ({ isExecuting: state.isExecuting })
)
console.log(batcher.state.isExecuting) // Reactive value
// Multiple state properties
const batcher = useAsyncBatcher(
asyncBatchFn,
{ maxSize: 5, wait: 1000 },
(state) => ({
isExecuting: state.isExecuting,
successCount: state.successCount,
errorCount: state.errorCount
})
)
您可以在创建异步批处理程序时提供初始状态值。这通常用于从持久存储恢复状态。
// Load initial state from localStorage
const savedState = localStorage.getItem('async-batcher-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const batcher = new AsyncBatcher(asyncBatchFn, {
maxSize: 5,
wait: 1000,
initialState
})
// Load initial state from localStorage
const savedState = localStorage.getItem('async-batcher-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const batcher = new AsyncBatcher(asyncBatchFn, {
maxSize: 5,
wait: 1000,
initialState
})
Store 是响应式的并支持订阅
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
// Subscribe to state changes
const unsubscribe = batcher.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
const batcher = new AsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
// Subscribe to state changes
const unsubscribe = batcher.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
注意: 使用框架适配器时不需要这样做,因为底层的 useStore hook 已经完成了这个功能。如果需要,您还可以从 TanStack Store 导入并使用 useStore,在任何需要的地方使用自定义选择器将 batcher.store.state 转换为响应式状态。
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
// you could manually use the `useStore` hook to subscribe to state changes in whatever scope you want
const state = useStore(batcher.store, (state) => ({
successCount: state.successCount,
}))
console.log(state)
const batcher = useAsyncBatcher(asyncBatchFn, { maxSize: 5, wait: 1000 })
// you could manually use the `useStore` hook to subscribe to state changes in whatever scope you want
const state = useStore(batcher.store, (state) => ({
successCount: state.successCount,
}))
console.log(state)
AsyncBatcherState
包括:
异步批处理程序会跟踪在批处理过程中失败的项目。
const batcher = new AsyncBatcher<number>(
async (items) => {
// This might fail for some items
if (items.some(item => item < 0)) {
throw new Error('Negative numbers not allowed')
}
return await processBatch(items)
},
{
maxSize: 3,
onError: (error, batch, batcher) => {
console.log('Failed batch:', batch)
console.log('All failed items:', batcher.peekFailedItems())
}
}
)
const batcher = new AsyncBatcher<number>(
async (items) => {
// This might fail for some items
if (items.some(item => item < 0)) {
throw new Error('Negative numbers not allowed')
}
return await processBatch(items)
},
{
maxSize: 3,
onError: (error, batch, batcher) => {
console.log('Failed batch:', batch)
console.log('All failed items:', batcher.peekFailedItems())
}
}
)
每个框架适配器都提供基于核心异步批处理功能构建的 hook,用于与框架的状态管理系统集成。类似 useAsyncBatcher 的 hook 可用于每个框架。
有关核心批处理概念和同步批处理,请参阅批处理指南。
您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。