A collection options creator is a factory function that generates the configuration options for a TanStack DB collection. It provides a standardized way to integrate different sync engines and data sources into TanStack DB's reactive, sync-first architecture.
Collection options creators follow a consistent pattern.
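That pattern can be sketched as a factory skeleton. The types below are simplified stand-ins for illustration; the real `CollectionConfig` and `SyncConfig` types come from `@tanstack/db`:

```typescript
// Simplified stand-ins for the @tanstack/db types (illustrative only)
interface MinimalSyncParams<T> {
  begin: () => void
  write: (op: { type: 'insert' | 'update' | 'delete'; value: T }) => void
  commit: () => void
  markReady: () => void
}

interface MinimalConfig<T extends object> {
  id?: string
  getKey: (item: T) => string | number
}

// The factory: takes engine-specific config, returns collection options
function myCollectionOptions<T extends object>(config: MinimalConfig<T>) {
  const sync = (params: MinimalSyncParams<T>) => {
    // ...connect to the sync engine, write initial data, then...
    params.markReady()
    // ...hand back a cleanup function
    return () => {
      // close connections, clear timers
    }
  }
  return {
    id: config.id,
    getKey: config.getKey,
    sync: { sync },
    utils: {}, // engine-specific helpers can be exposed here
  }
}
```

The returned object is what `createCollection` consumes; real creators also wire up mutation handlers and utilities.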
You should create a custom collection when you need to integrate a specific sync engine or data source.
Note: if you are simply calling an API and returning data, use the Query Collection instead.
Every collection options creator must fulfill a set of core responsibilities.
First, define a configuration interface that extends or includes the standard collection properties:
// Pattern A: User provides handlers (Query / ElectricSQL style)
interface MyCollectionConfig<TItem extends object> {
// Your sync engine specific options
connectionUrl: string
apiKey?: string
// Standard collection properties
id?: string
schema?: StandardSchemaV1
getKey: (item: TItem) => string | number
sync?: SyncConfig<TItem>
rowUpdateMode?: 'partial' | 'full'
// User provides mutation handlers
onInsert?: InsertMutationFn<TItem>
onUpdate?: UpdateMutationFn<TItem>
onDelete?: DeleteMutationFn<TItem>
}
// Pattern B: Built-in handlers (Trailbase style)
interface MyCollectionConfig<TItem extends object>
extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
// Your sync engine specific options
recordApi: MyRecordApi<TItem>
connectionUrl: string
rowUpdateMode?: 'partial' | 'full'
// Note: onInsert/onUpdate/onDelete are implemented by your collection creator
}
The sync function is the core of your collection. Among its responsibilities, it must return a cleanup function so that resources can be properly garbage collected:
const sync: SyncConfig<T>['sync'] = (params) => {
const { begin, write, commit, markReady, collection } = params
// 1. Initialize connection to your sync engine
const connection = initializeConnection(config)
// 2. Set up real-time subscription FIRST (prevents race conditions)
const eventBuffer: Array<any> = []
let isInitialSyncComplete = false
connection.subscribe((event) => {
if (!isInitialSyncComplete) {
// Buffer events during initial sync to prevent race conditions
eventBuffer.push(event)
return
}
// Process real-time events
begin()
switch (event.type) {
case 'insert':
write({ type: 'insert', value: event.data })
break
case 'update':
write({ type: 'update', value: event.data })
break
case 'delete':
write({ type: 'delete', value: event.data })
break
}
commit()
})
// 3. Perform initial data fetch
async function initialSync() {
try {
const data = await fetchInitialData()
begin() // Start a transaction
for (const item of data) {
write({
type: 'insert',
value: item
})
}
commit() // Commit the transaction
// 4. Process buffered events
isInitialSyncComplete = true
if (eventBuffer.length > 0) {
begin()
for (const event of eventBuffer) {
// Deduplicate if necessary based on your sync engine
write({ type: event.type, value: event.data })
}
commit()
eventBuffer.splice(0)
}
} catch (error) {
console.error('Initial sync failed:', error)
throw error
} finally {
// ALWAYS call markReady, even on error
markReady()
}
}
initialSync()
// 4. Return cleanup function
return () => {
connection.close()
// Clean up any timers, intervals, or other resources
}
}
Understanding the transaction lifecycle is essential for a correct implementation.
The sync process follows the lifecycle shown above: initialize the connection, set up the subscription, perform the initial fetch, flush buffered events, mark the collection ready, and return a cleanup function.
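The `begin`/`write`/`commit`/`markReady` sequence can be illustrated with a toy harness that records the calls. The harness itself is illustrative; only the call order mirrors the real `SyncConfig` API:

```typescript
type Op<T> = { type: 'insert' | 'update' | 'delete'; value: T }

// Toy harness: collects writes between begin() and commit() into batches,
// mirroring how a collection applies each committed transaction atomically.
function makeHarness<T>() {
  const batches: Array<Array<Op<T>>> = []
  let current: Array<Op<T>> = []
  let ready = false
  return {
    begin: () => { current = [] },
    write: (op: Op<T>) => { current.push(op) },
    commit: () => { batches.push(current) },
    markReady: () => { ready = true },
    batches,
    isReady: () => ready,
  }
}

const h = makeHarness<{ id: string }>()
h.begin()                                       // start a transaction
h.write({ type: 'insert', value: { id: '1' } })
h.write({ type: 'insert', value: { id: '2' } })
h.commit()                                      // both rows land together
h.markReady()                                   // collection leaves the loading state
```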
Preventing race conditions: many sync engines begin delivering real-time events before the initial sync completes. Your implementation must deduplicate subscription events that represent the same data already received in the initial sync. Consider how your engine identifies rows (stable keys, versions, or transaction IDs).
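One common deduplication approach — a sketch, assuming events carry a stable key — is to drop buffered insert events whose keys were already written during the initial snapshot:

```typescript
type BufferedEvent<T> = { type: 'insert' | 'update' | 'delete'; key: string; data: T }

// Drop buffered 'insert' events for keys already written during the
// initial snapshot; replay everything else in order.
function dedupeBufferedEvents<T>(
  buffered: Array<BufferedEvent<T>>,
  initialKeys: Set<string>
): Array<BufferedEvent<T>> {
  return buffered.filter(
    (e) => !(e.type === 'insert' && initialKeys.has(e.key))
  )
}

const initial = new Set(['1', '2'])
const events: Array<BufferedEvent<{ id: string }>> = [
  { type: 'insert', key: '2', data: { id: '2' } }, // duplicate of snapshot row
  { type: 'insert', key: '3', data: { id: '3' } }, // genuinely new
  { type: 'update', key: '1', data: { id: '1' } }, // newer than snapshot, keep
]
const replay = dedupeBufferedEvents(events, initial)
```

Engines that expose versions or transaction IDs allow stricter checks than key membership alone.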
If your sync engine returns data with different types than your application uses, provide conversion functions for the specific fields that need them:
// Example config supplying parse/serialize maps (note: these are runtime
// functions, so they belong on the config object, not in the interface)
const config = {
// ... other config
// Only specify conversions for fields that need type conversion
parse: {
created_at: (ts: number) => new Date(ts * 1000), // timestamp -> Date
updated_at: (ts: number) => new Date(ts * 1000), // timestamp -> Date
metadata: (str: string) => JSON.parse(str) // JSON string -> object
},
serialize: {
created_at: (date: Date) => Math.floor(date.valueOf() / 1000), // Date -> timestamp
updated_at: (date: Date) => Math.floor(date.valueOf() / 1000), // Date -> timestamp
metadata: (obj: object) => JSON.stringify(obj) // object -> JSON string
}
}
Examples of such type conversions:
// Firebase Timestamp to Date
parse: {
createdAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
updatedAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
}
// PostGIS geometry to GeoJSON
parse: {
location: (wkb: string) => parseWKBToGeoJSON(wkb)
}
// JSON string to object with error handling
parse: {
metadata: (str: string) => {
try {
return JSON.parse(str)
} catch {
return {}
}
}
}
There are two distinct patterns for handling mutations in a collection options creator.
Pattern A: the user provides the mutation handlers in the config, and your collection creator passes them through.
interface MyCollectionConfig<TItem extends object> {
// ... other config
// User provides these handlers
onInsert?: InsertMutationFn<TItem>
onUpdate?: UpdateMutationFn<TItem>
onDelete?: DeleteMutationFn<TItem>
}
export function myCollectionOptions<TItem extends object>(
config: MyCollectionConfig<TItem>
) {
return {
// ... other options
rowUpdateMode: config.rowUpdateMode || 'partial',
// Pass through user-provided handlers (possibly with additional logic)
onInsert: config.onInsert ? async (params) => {
const result = await config.onInsert!(params)
// Additional sync coordination logic
return result
} : undefined
}
}
Pattern B: your collection creator implements the handlers directly against the sync engine's API.
interface MyCollectionConfig<TItem extends object>
extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
// ... sync engine specific config
// Note: onInsert/onUpdate/onDelete are NOT in the config
}
export function myCollectionOptions<TItem extends object>(
config: MyCollectionConfig<TItem>
) {
return {
// ... other options
rowUpdateMode: config.rowUpdateMode || 'partial',
// Implement handlers using sync engine APIs
onInsert: async ({ transaction }) => {
// Handle provider-specific batch limits (e.g., Firestore's 500 limit)
const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
for (const chunk of chunks) {
const ids = await config.recordApi.createBulk(
chunk.map(m => serialize(m.modified))
)
await awaitIds(ids)
}
return transaction.mutations.map(m => m.key)
},
onUpdate: async ({ transaction }) => {
const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
for (const chunk of chunks) {
await Promise.all(
chunk.map(m =>
config.recordApi.update(m.key, serialize(m.changes))
)
)
}
await awaitIds(transaction.mutations.map(m => String(m.key)))
}
}
}
Many providers enforce batch size limits (Firestore: 500 operations, DynamoDB: 25, etc.), so chunk large transactions accordingly.
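The `chunkArray` helper used in the Pattern B example is not provided by `@tanstack/db`; a minimal version might look like this:

```typescript
// Split an array into chunks of at most `size` items, e.g. to respect
// a provider limit such as Firestore's 500 writes per batch.
function chunkArray<T>(items: Array<T>, size: number): Array<Array<T>> {
  const chunks: Array<Array<T>> = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}
```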
Choose Pattern A when users need to supply their own persistence API; choose Pattern B when your sync engine handles writes directly.
Collections support two row update modes: 'partial' and 'full'.
Configure this in your sync configuration:
sync: {
sync: syncFn,
rowUpdateMode: 'full' // or 'partial'
}
For complete, production-ready examples, see the collection packages in the TanStack DB repository:
From the Query Collection
From the Trailbase Collection
From the Electric Collection
Below is a complete example of a WebSocket-based collection options creator that demonstrates the full round-trip flow.
import type {
CollectionConfig,
SyncConfig,
InsertMutationFnParams,
UpdateMutationFnParams,
DeleteMutationFnParams,
UtilsRecord
} from '@tanstack/db'
interface WebSocketMessage<T> {
type: 'insert' | 'update' | 'delete' | 'sync' | 'transaction' | 'ack'
data?: T | T[]
mutations?: Array<{
type: 'insert' | 'update' | 'delete'
data: T
id?: string
}>
transactionId?: string
id?: string
}
interface WebSocketCollectionConfig<TItem extends object>
extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete' | 'sync'> {
url: string
reconnectInterval?: number
// Note: onInsert/onUpdate/onDelete are handled by the WebSocket connection
// Users don't provide these handlers
}
interface WebSocketUtils extends UtilsRecord {
reconnect: () => void
getConnectionState: () => 'connected' | 'disconnected' | 'connecting'
}
export function webSocketCollectionOptions<TItem extends object>(
config: WebSocketCollectionConfig<TItem>
): CollectionConfig<TItem> & { utils: WebSocketUtils } {
let ws: WebSocket | null = null
let reconnectTimer: NodeJS.Timeout | null = null
let connectionState: 'connected' | 'disconnected' | 'connecting' = 'disconnected'
// Track pending transactions awaiting acknowledgment
const pendingTransactions = new Map<string, {
resolve: () => void
reject: (error: Error) => void
timeout: NodeJS.Timeout
}>()
const sync: SyncConfig<TItem>['sync'] = (params) => {
const { begin, write, commit, markReady } = params
function connect() {
connectionState = 'connecting'
ws = new WebSocket(config.url)
ws.onopen = () => {
connectionState = 'connected'
// Request initial sync
ws.send(JSON.stringify({ type: 'sync' }))
}
ws.onmessage = (event) => {
const message: WebSocketMessage<TItem> = JSON.parse(event.data)
switch (message.type) {
case 'sync':
// Initial sync with array of items
begin()
if (Array.isArray(message.data)) {
for (const item of message.data) {
write({ type: 'insert', value: item })
}
}
commit()
markReady()
break
case 'insert':
case 'update':
case 'delete':
// Real-time updates from other clients
begin()
write({
type: message.type,
value: message.data as TItem
})
commit()
break
case 'ack':
// Server acknowledged our transaction
if (message.transactionId) {
const pending = pendingTransactions.get(message.transactionId)
if (pending) {
clearTimeout(pending.timeout)
pendingTransactions.delete(message.transactionId)
pending.resolve()
}
}
break
case 'transaction':
// Server sending back the actual data after processing our transaction
if (message.mutations) {
begin()
for (const mutation of message.mutations) {
write({
type: mutation.type,
value: mutation.data
})
}
commit()
}
break
}
}
ws.onerror = (error) => {
console.error('WebSocket error:', error)
connectionState = 'disconnected'
}
ws.onclose = () => {
connectionState = 'disconnected'
// Auto-reconnect
if (!reconnectTimer) {
reconnectTimer = setTimeout(() => {
reconnectTimer = null
connect()
}, config.reconnectInterval || 5000)
}
}
}
// Start connection
connect()
// Return cleanup function
return () => {
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
if (ws) {
ws.close()
ws = null
}
}
}
// Helper function to send transaction and wait for server acknowledgment
const sendTransaction = async (
params: InsertMutationFnParams<TItem> | UpdateMutationFnParams<TItem> | DeleteMutationFnParams<TItem>
): Promise<void> => {
if (ws?.readyState !== WebSocket.OPEN) {
throw new Error('WebSocket not connected')
}
const transactionId = crypto.randomUUID()
// Convert all mutations in the transaction to the wire format
const mutations = params.transaction.mutations.map(mutation => ({
type: mutation.type,
id: mutation.key,
data: mutation.type === 'delete' ? undefined :
mutation.type === 'update' ? mutation.changes :
mutation.modified
}))
// Send the entire transaction at once
ws.send(JSON.stringify({
type: 'transaction',
transactionId,
mutations
}))
// Wait for server acknowledgment
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
pendingTransactions.delete(transactionId)
reject(new Error(`Transaction ${transactionId} timed out`))
}, 10000) // 10 second timeout
pendingTransactions.set(transactionId, {
resolve,
reject,
timeout
})
})
}
// All mutation handlers use the same transaction sender
const onInsert = async (params: InsertMutationFnParams<TItem>) => {
await sendTransaction(params)
}
const onUpdate = async (params: UpdateMutationFnParams<TItem>) => {
await sendTransaction(params)
}
const onDelete = async (params: DeleteMutationFnParams<TItem>) => {
await sendTransaction(params)
}
return {
id: config.id,
schema: config.schema,
getKey: config.getKey,
sync: { sync },
onInsert,
onUpdate,
onDelete,
utils: {
reconnect: () => {
// closing the socket triggers the auto-reconnect logic in onclose;
// connect() itself is scoped inside the sync function
if (ws) ws.close()
},
getConnectionState: () => connectionState
}
}
}
import { createCollection } from '@tanstack/react-db'
import { webSocketCollectionOptions } from './websocket-collection'
const todos = createCollection(
webSocketCollectionOptions({
url: 'ws://:8080/todos',
getKey: (todo) => todo.id,
schema: todoSchema
// Note: No onInsert/onUpdate/onDelete - handled by WebSocket automatically
})
)
// Use the collection
todos.insert({ id: '1', text: 'Buy milk', completed: false })
// Access utilities
todos.utils.getConnectionState() // 'connected'
todos.utils.reconnect() // Force reconnect
A key challenge in sync-first applications is knowing when to drop optimistic state. When a user makes a change:
1. The change is applied optimistically to the local collection
2. The mutation handler persists it to the backend
3. The backend processes the change and syncs it back
4. The optimistic state is dropped in favor of the synced data
The key question: how do you know when step 4 has happened?
Many providers offer built-in ways to wait for sync completion:
// Firebase
await waitForPendingWrites(firestore)
// Custom WebSocket
await websocket.waitForAck(transactionId)
ElectricSQL returns transaction IDs that you can track:
// Track seen transaction IDs
const seenTxids = new Store<Set<number>>(new Set())
// In sync, track txids from incoming messages
if (message.headers.txids) {
message.headers.txids.forEach(txid => {
seenTxids.setState(prev => new Set([...prev, txid]))
})
}
// Mutation handlers return txids and wait for them
const wrappedOnInsert = async (params) => {
const result = await config.onInsert!(params)
// Wait for the txid to appear in synced data
if (result.txid) {
await awaitTxId(result.txid)
}
return result
}
// Utility function to wait for a txid
const awaitTxId = (txId: number): Promise<boolean> => {
if (seenTxids.state.has(txId)) return Promise.resolve(true)
return new Promise((resolve) => {
const unsubscribe = seenTxids.subscribe(() => {
if (seenTxids.state.has(txId)) {
unsubscribe()
resolve(true)
}
})
})
}
Trailbase tracks when specific record IDs have been synced back:
// Track synced IDs with timestamps
const seenIds = new Store(new Map<string, number>())
// In sync, mark IDs as seen
write({ type: 'insert', value: item })
seenIds.setState(prev => new Map(prev).set(item.id, Date.now()))
// Wait for specific IDs after mutations
const wrappedOnInsert = async (params) => {
const ids = await config.recordApi.createBulk(items)
// Wait for all IDs to be synced back
await awaitIds(ids)
}
const awaitIds = (ids: string[]): Promise<void> => {
const allSynced = ids.every(id => seenIds.state.has(id))
if (allSynced) return Promise.resolve()
return new Promise((resolve) => {
const unsubscribe = seenIds.subscribe(() => {
if (ids.every(id => seenIds.state.has(id))) {
unsubscribe()
resolve()
}
})
})
}
Track version numbers or timestamps to detect whether the synced data is current:
// Track latest sync timestamp
let lastSyncTime = 0
// In mutations, record when the operation was sent
const wrappedOnUpdate = async (params) => {
const mutationTime = Date.now()
await config.onUpdate(params)
// Wait for sync to catch up
await waitForSync(mutationTime)
}
const waitForSync = (afterTime: number): Promise<void> => {
if (lastSyncTime > afterTime) return Promise.resolve()
return new Promise((resolve) => {
const check = setInterval(() => {
if (lastSyncTime > afterTime) {
clearInterval(check)
resolve()
}
}, 100)
})
}
The Query Collection simply refetches all data after a mutation:
const wrappedOnInsert = async (params) => {
// Perform the mutation
await config.onInsert(params)
// Refetch the entire collection
await refetch()
// The refetch will trigger sync with fresh data,
// automatically dropping optimistic state
}
Test your collection options creator with unit tests against a mock sync engine, integration tests against a real backend, and checks that optimistic state is dropped once writes sync back.
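For example, a unit test can drive the sync function with a fake connection and assert on the resulting writes and cleanup — a sketch with no test framework assumed (all names here are illustrative):

```typescript
type Op = { type: 'insert' | 'update' | 'delete'; value: { id: string } }

// Fake connection standing in for a real sync engine
function makeFakeConnection() {
  let handler: ((e: Op) => void) | undefined
  return {
    closed: false,
    subscribe(cb: (e: Op) => void) { handler = cb },
    emit(e: Op) { if (handler) handler(e) },
    close() { this.closed = true },
  }
}

// A minimal sync function of the shape used throughout this guide
function makeSync(conn: ReturnType<typeof makeFakeConnection>) {
  return (params: {
    begin: () => void
    write: (op: Op) => void
    commit: () => void
    markReady: () => void
  }) => {
    conn.subscribe((e) => { params.begin(); params.write(e); params.commit() })
    params.markReady()
    return () => conn.close()
  }
}

// Drive it: every emitted event should become exactly one committed write,
// and cleanup should close the connection.
const conn = makeFakeConnection()
const writes: Array<Op> = []
let ready = false
const cleanup = makeSync(conn)({
  begin: () => {},
  write: (op) => writes.push(op),
  commit: () => {},
  markReady: () => { ready = true },
})
conn.emit({ type: 'insert', value: { id: '1' } })
cleanup()
```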
Creating a collection options creator lets you integrate any sync engine into TanStack DB's powerful sync-first architecture. Following the patterns shown here gives you a robust, type-safe integration with an excellent developer experience.