查询集合

查询集合

查询集合可实现 TanStack DB 和 TanStack Query 之间的无缝集成,能够自动同步本地数据库和远程数据源。

概述

@tanstack/query-db-collection 包允许您创建集合,这些集合可以:

  • 通过 TanStack Query 自动与远程数据同步
  • 支持乐观更新,并在出错时自动回滚
  • 通过可定制的突变处理器处理持久化
  • 提供直接写入功能,用于直接写入同步存储

安装

bash
npm install @tanstack/query-db-collection @tanstack/query-core @tanstack/db
npm install @tanstack/query-db-collection @tanstack/query-core @tanstack/db

基本用法

typescript
import { QueryClient } from '@tanstack/query-core'
import { createCollection } from '@tanstack/db'
import { queryCollectionOptions } from '@tanstack/query-db-collection'

const queryClient = new QueryClient()

const todosCollection = createCollection(
  queryCollectionOptions({
    queryKey: ['todos'],
    queryFn: async () => {
      const response = await fetch('/api/todos')
      return response.json()
    },
    queryClient,
    getKey: (item) => item.id,
  })
)
import { QueryClient } from '@tanstack/query-core'
import { createCollection } from '@tanstack/db'
import { queryCollectionOptions } from '@tanstack/query-db-collection'

const queryClient = new QueryClient()

const todosCollection = createCollection(
  queryCollectionOptions({
    queryKey: ['todos'],
    queryFn: async () => {
      const response = await fetch('/api/todos')
      return response.json()
    },
    queryClient,
    getKey: (item) => item.id,
  })
)

配置选项

queryCollectionOptions 函数接受以下选项:

必选选项

  • queryKey: TanStack Query 的查询键
  • queryFn: 从服务器获取数据的函数
  • queryClient: TanStack Query 客户端实例
  • getKey: 从项中提取唯一键的函数

查询选项

  • enabled: 查询是否应自动运行(默认值:true
  • refetchInterval: 重新获取间隔(以毫秒为单位)
  • retry: 失败查询的重试配置
  • retryDelay: 重试之间的延迟
  • staleTime: 数据被视为新鲜的时间
  • meta: 可选元数据,将传递给查询函数上下文

集合选项

  • id: 集合的唯一标识符
  • schema: 用于验证项的模式
  • sync: 自定义同步配置
  • startSync: 是否立即开始同步(默认值:true

持久化处理器

  • onInsert: 在插入操作之前调用的处理器
  • onUpdate: 在更新操作之前调用的处理器
  • onDelete: 在删除操作之前调用的处理器

持久化处理器

您可以定义在发生突变时调用的处理器。这些处理器可以将更改持久化到您的后端,并控制操作后查询是否应重新获取。

typescript
const todosCollection = createCollection(
  queryCollectionOptions({
    queryKey: ['todos'],
    queryFn: fetchTodos,
    queryClient,
    getKey: (item) => item.id,
    
    onInsert: async ({ transaction }) => {
      const newItems = transaction.mutations.map(m => m.modified)
      await api.createTodos(newItems)
      // Returning nothing or { refetch: true } will trigger a refetch
      // Return { refetch: false } to skip automatic refetch
    },
    
    onUpdate: async ({ transaction }) => {
      const updates = transaction.mutations.map(m => ({
        id: m.key,
        changes: m.changes
      }))
      await api.updateTodos(updates)
    },
    
    onDelete: async ({ transaction }) => {
      const ids = transaction.mutations.map(m => m.key)
      await api.deleteTodos(ids)
    }
  })
)
const todosCollection = createCollection(
  queryCollectionOptions({
    queryKey: ['todos'],
    queryFn: fetchTodos,
    queryClient,
    getKey: (item) => item.id,
    
    onInsert: async ({ transaction }) => {
      const newItems = transaction.mutations.map(m => m.modified)
      await api.createTodos(newItems)
      // Returning nothing or { refetch: true } will trigger a refetch
      // Return { refetch: false } to skip automatic refetch
    },
    
    onUpdate: async ({ transaction }) => {
      const updates = transaction.mutations.map(m => ({
        id: m.key,
        changes: m.changes
      }))
      await api.updateTodos(updates)
    },
    
    onDelete: async ({ transaction }) => {
      const ids = transaction.mutations.map(m => m.key)
      await api.deleteTodos(ids)
    }
  })
)

控制重新获取行为

默认情况下,在任何持久化处理器(onInsertonUpdateonDelete)成功完成后,查询将自动重新获取,以确保本地状态与服务器状态匹配。

您可以通过返回一个具有 refetch 属性的对象来控制此行为。

typescript
onInsert: async ({ transaction }) => {
  await api.createTodos(transaction.mutations.map(m => m.modified))
  
  // Skip the automatic refetch
  return { refetch: false }
}
onInsert: async ({ transaction }) => {
  await api.createTodos(transaction.mutations.map(m => m.modified))
  
  // Skip the automatic refetch
  return { refetch: false }
}

这在以下情况下很有用:

  • 您确信服务器状态与您发送的内容匹配
  • 您想避免不必要的网络请求
  • 您通过其他机制(如 WebSockets)处理状态更新

工具方法

集合通过 collection.utils 提供这些工具方法。

  • refetch(): 手动触发查询的重新获取

直接写入

直接写入适用于正常查询/突变流程不适合您需求的场景。它们允许您直接写入同步数据存储,绕过乐观更新系统和查询重新获取机制。

理解数据存储

查询集合维护两个数据存储:

  1. 同步数据存储 - 通过 queryFn 与服务器同步的权威状态。
  2. 乐观突变存储 - 在服务器确认之前乐观应用的临时更改。

常规集合操作(插入、更新、删除)会创建乐观突变,这些突变会:

  • 立即应用于 UI
  • 通过持久化处理器发送到服务器
  • 如果服务器请求失败,则自动回滚
  • 查询重新获取时,用服务器数据替换。

直接写入完全绕过此系统,直接写入同步数据存储,使其成为处理来自其他来源的实时更新的理想选择。

何时使用直接写入

应在以下情况下使用直接写入:

  • 您需要同步来自 WebSockets 或服务器发送事件的实时更新
  • 您正在处理大型数据集,重新获取所有数据成本过高
  • 您收到增量更新或服务器计算的字段更新
  • 您需要实现复杂的 Paging 或部分数据加载场景

单独的写入操作

typescript
// Insert a new item directly to the synced data store
todosCollection.utils.writeInsert({ id: '1', text: 'Buy milk', completed: false })

// Update an existing item in the synced data store
todosCollection.utils.writeUpdate({ id: '1', completed: true })

// Delete an item from the synced data store
todosCollection.utils.writeDelete('1')

// Upsert (insert or update) in the synced data store
todosCollection.utils.writeUpsert({ id: '1', text: 'Buy milk', completed: false })
// Insert a new item directly to the synced data store
todosCollection.utils.writeInsert({ id: '1', text: 'Buy milk', completed: false })

// Update an existing item in the synced data store
todosCollection.utils.writeUpdate({ id: '1', completed: true })

// Delete an item from the synced data store
todosCollection.utils.writeDelete('1')

// Upsert (insert or update) in the synced data store
todosCollection.utils.writeUpsert({ id: '1', text: 'Buy milk', completed: false })

这些操作:

  • 直接写入同步数据存储
  • 不创建乐观突变
  • 不触发自动查询重新获取
  • 立即更新 TanStack Query 缓存
  • 立即在 UI 中可见

批量操作

writeBatch 方法允许您原子地执行多个操作。在回调中调用的任何写入操作都将被收集并作为单个事务执行。

typescript
todosCollection.utils.writeBatch(() => {
  todosCollection.utils.writeInsert({ id: '1', text: 'Buy milk' })
  todosCollection.utils.writeInsert({ id: '2', text: 'Walk dog' })
  todosCollection.utils.writeUpdate({ id: '3', completed: true })
  todosCollection.utils.writeDelete('4')
})
todosCollection.utils.writeBatch(() => {
  todosCollection.utils.writeInsert({ id: '1', text: 'Buy milk' })
  todosCollection.utils.writeInsert({ id: '2', text: 'Walk dog' })
  todosCollection.utils.writeUpdate({ id: '3', completed: true })
  todosCollection.utils.writeDelete('4')
})

真实世界示例:WebSocket 集成

typescript
// Handle real-time updates from WebSocket without triggering full refetches
ws.on('todos:update', (changes) => {
  todosCollection.utils.writeBatch(() => {
    changes.forEach(change => {
      switch (change.type) {
        case 'insert':
          todosCollection.utils.writeInsert(change.data)
          break
        case 'update':
          todosCollection.utils.writeUpdate(change.data)
          break
        case 'delete':
          todosCollection.utils.writeDelete(change.id)
          break
      }
    })
  })
})
// Handle real-time updates from WebSocket without triggering full refetches
ws.on('todos:update', (changes) => {
  todosCollection.utils.writeBatch(() => {
    changes.forEach(change => {
      switch (change.type) {
        case 'insert':
          todosCollection.utils.writeInsert(change.data)
          break
        case 'update':
          todosCollection.utils.writeUpdate(change.data)
          break
        case 'delete':
          todosCollection.utils.writeDelete(change.id)
          break
      }
    })
  })
})

示例:增量更新

typescript
// Handle server responses after mutations without full refetch
const createTodo = async (todo) => {
  // Optimistically add the todo
  const tempId = crypto.randomUUID()
  todosCollection.insert({ ...todo, id: tempId })
  
  try {
    // Send to server
    const serverTodo = await api.createTodo(todo)
    
    // Sync the server response (with server-generated ID and timestamps)
    // without triggering a full collection refetch
    todosCollection.utils.writeBatch(() => {
      todosCollection.utils.writeDelete(tempId)
      todosCollection.utils.writeInsert(serverTodo)
    })
  } catch (error) {
    // Rollback happens automatically
    throw error
  }
}
// Handle server responses after mutations without full refetch
const createTodo = async (todo) => {
  // Optimistically add the todo
  const tempId = crypto.randomUUID()
  todosCollection.insert({ ...todo, id: tempId })
  
  try {
    // Send to server
    const serverTodo = await api.createTodo(todo)
    
    // Sync the server response (with server-generated ID and timestamps)
    // without triggering a full collection refetch
    todosCollection.utils.writeBatch(() => {
      todosCollection.utils.writeDelete(tempId)
      todosCollection.utils.writeInsert(serverTodo)
    })
  } catch (error) {
    // Rollback happens automatically
    throw error
  }
}

示例:大型数据集分页

typescript
// Load additional pages without refetching existing data
const loadMoreTodos = async (page) => {
  const newTodos = await api.getTodos({ page, limit: 50 })
  
  // Add new items without affecting existing ones
  todosCollection.utils.writeBatch(() => {
    newTodos.forEach(todo => {
      todosCollection.utils.writeInsert(todo)
    })
  })
}
// Load additional pages without refetching existing data
const loadMoreTodos = async (page) => {
  const newTodos = await api.getTodos({ page, limit: 50 })
  
  // Add new items without affecting existing ones
  todosCollection.utils.writeBatch(() => {
    newTodos.forEach(todo => {
      todosCollection.utils.writeInsert(todo)
    })
  })
}

重要行为

完整的状态同步

查询集合将 queryFn 的结果视为集合的完整状态。这意味着:

  • 集合中存在但查询结果中不存在的项将被删除。
  • 查询结果中存在但集合中不存在的项将被插入。
  • 同时存在的项如果不同,将被更新。

空数组行为

queryFn 返回一个空数组时,集合中的所有项都将被删除。这是因为集合将空数组解释为“服务器没有项”。

typescript
// This will delete all items in the collection
queryFn: async () => []
// This will delete all items in the collection
queryFn: async () => []

处理部分/增量获取

由于查询集合期望 queryFn 返回完整状态,因此您可以通过合并新数据与现有数据来处理部分获取。

typescript
const todosCollection = createCollection(
  queryCollectionOptions({
    queryKey: ['todos'],
    queryFn: async ({ queryKey }) => {
      // Get existing data from cache
      const existingData = queryClient.getQueryData(queryKey) || []
      
      // Fetch only new/updated items (e.g., changes since last sync)
      const lastSyncTime = localStorage.getItem('todos-last-sync')
      const newData = await fetch(`/api/todos?since=${lastSyncTime}`).then(r => r.json())
      
      // Merge new data with existing data
      const existingMap = new Map(existingData.map(item => [item.id, item]))
      
      // Apply updates and additions
      newData.forEach(item => {
        existingMap.set(item.id, item)
      })
      
      // Handle deletions if your API provides them
      if (newData.deletions) {
        newData.deletions.forEach(id => existingMap.delete(id))
      }
      
      // Update sync time
      localStorage.setItem('todos-last-sync', new Date().toISOString())
      
      // Return the complete merged state
      return Array.from(existingMap.values())
    },
    queryClient,
    getKey: (item) => item.id,
  })
)
const todosCollection = createCollection(
  queryCollectionOptions({
    queryKey: ['todos'],
    queryFn: async ({ queryKey }) => {
      // Get existing data from cache
      const existingData = queryClient.getQueryData(queryKey) || []
      
      // Fetch only new/updated items (e.g., changes since last sync)
      const lastSyncTime = localStorage.getItem('todos-last-sync')
      const newData = await fetch(`/api/todos?since=${lastSyncTime}`).then(r => r.json())
      
      // Merge new data with existing data
      const existingMap = new Map(existingData.map(item => [item.id, item]))
      
      // Apply updates and additions
      newData.forEach(item => {
        existingMap.set(item.id, item)
      })
      
      // Handle deletions if your API provides them
      if (newData.deletions) {
        newData.deletions.forEach(id => existingMap.delete(id))
      }
      
      // Update sync time
      localStorage.setItem('todos-last-sync', new Date().toISOString())
      
      // Return the complete merged state
      return Array.from(existingMap.values())
    },
    queryClient,
    getKey: (item) => item.id,
  })
)

此模式允许您:

  • 只从您的 API 获取增量更改
  • 将这些更改与现有数据合并
  • 返回集合期望的完整状态
  • 避免每次获取所有数据的性能开销

直接写入和查询同步

直接写入会立即更新集合,同时也会更新 TanStack Query 缓存。但是,它们不会阻止正常的查询同步行为。如果您的 queryFn 返回的数据与您的直接写入冲突,查询数据将优先。

为了正确处理此问题:

  1. 在使用直接写入时,在持久化处理器中使用 { refetch: false }
  2. 设置适当的 staleTime 以防止不必要的重新获取。
  3. 设计您的 queryFn 以了解增量更新(例如,只获取新数据)。

完整的直接写入 API 参考

所有直接写入方法都可以在 collection.utils 上找到。

  • writeInsert(data): 直接插入一个或多个项。
  • writeUpdate(data): 直接更新一个或多个项。
  • writeDelete(keys): 直接删除一个或多个项。
  • writeUpsert(data): 直接插入或更新一个或多个项。
  • writeBatch(callback): 原子地执行多个操作。
  • refetch(): 手动触发查询的重新获取
我们的合作伙伴
Code Rabbit
Electric
Prisma
订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。

订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。