Electric 集合

Electric 集合

Electric collections 提供了 TanStack DB 与 ElectricSQL 之间的无缝集成,通过 Electric 的同步引擎实现与 Postgres 数据库的实时数据同步。

概述

@tanstack/electric-db-collection 包允许您创建集合,这些集合可以

  • 通过 Electric shapes 自动从 Postgres 同步数据
  • 支持通过事务匹配进行乐观更新,并在出错时自动回滚
  • 通过可自定义的 mutation handlers 处理持久化

安装

bash
npm install @tanstack/electric-db-collection @tanstack/react-db
npm install @tanstack/electric-db-collection @tanstack/react-db

基本用法

typescript
import { createCollection } from '@tanstack/react-db'
import { electricCollectionOptions } from '@tanstack/electric-db-collection'

const todosCollection = createCollection(
  electricCollectionOptions({
    shapeOptions: {
      url: '/api/todos',
    },
    getKey: (item) => item.id,
  })
)
import { createCollection } from '@tanstack/react-db'
import { electricCollectionOptions } from '@tanstack/electric-db-collection'

const todosCollection = createCollection(
  electricCollectionOptions({
    shapeOptions: {
      url: '/api/todos',
    },
    getKey: (item) => item.id,
  })
)

配置选项

electricCollectionOptions 函数接受以下选项

必选选项

  • shapeOptions: ElectricSQL ShapeStream 的配置

    • url: 到 Electric 的代理 URL
  • getKey: 从 item 中提取唯一 key 的函数

Optional (可选)

  • id: 集合的唯一标识符
  • schema: 用于验证 item 的 schema。任何兼容 Standard Schema 的 schema
  • sync: 自定义同步配置

持久化处理器

  • onInsert: 在 insert 操作之前调用的 handler
  • onUpdate: 在 update 操作之前调用的 handler
  • onDelete: 在 delete 操作之前调用的 handler

持久化处理器

可以定义 handlers 来在 mutations 上运行。它们对于将 mutations 发送到后端并在 Electric 提供相应的事务后进行确认非常有用。在确认之前,TanStack DB 会阻止集合的数据同步,以防止竞争条件。为了避免任何延迟,使用匹配策略非常重要。

最可靠的策略是后端在响应中包含事务 ID (txid),允许客户端将每个 mutation 与 Electric 的事务标识符进行匹配,以进行精确确认。如果没有提供策略,客户端 mutations 将在三秒后自动确认。

typescript
const todosCollection = createCollection(
  electricCollectionOptions({
    id: 'todos',
    schema: todoSchema,
    getKey: (item) => item.id,
    shapeOptions: {
      url: '/api/todos',
      params: { table: 'todos' },
    },
    
    onInsert: async ({ transaction }) => {
      const newItem = transaction.mutations[0].modified
      const response = await api.todos.create(newItem)
      
      return { txid: response.txid }
    },
    
    // you can also implement onUpdate and onDelete handlers
  })
)
const todosCollection = createCollection(
  electricCollectionOptions({
    id: 'todos',
    schema: todoSchema,
    getKey: (item) => item.id,
    shapeOptions: {
      url: '/api/todos',
      params: { table: 'todos' },
    },
    
    onInsert: async ({ transaction }) => {
      const newItem = transaction.mutations[0].modified
      const response = await api.todos.create(newItem)
      
      return { txid: response.txid }
    },
    
    // you can also implement onUpdate and onDelete handlers
  })
)

在后端,您可以通过直接查询 Postgres 来提取事务的 txid

ts
async function generateTxId(tx) {
  // The ::xid cast strips off the epoch, giving you the raw 32-bit value
  // that matches what PostgreSQL sends in logical replication streams
  // (and then exposed through Electric which we'll match against
  // in the client).
  const result = await tx.execute(
    sql`SELECT pg_current_xact_id()::xid::text as txid`
  )
  const txid = result.rows[0]?.txid

  if (txid === undefined) {
    throw new Error(`Failed to get transaction ID`)
  }

  return parseInt(txid as string, 10)
}
async function generateTxId(tx) {
  // The ::xid cast strips off the epoch, giving you the raw 32-bit value
  // that matches what PostgreSQL sends in logical replication streams
  // (and then exposed through Electric which we'll match against
  // in the client).
  const result = await tx.execute(
    sql`SELECT pg_current_xact_id()::xid::text as txid`
  )
  const txid = result.rows[0]?.txid

  if (txid === undefined) {
    throw new Error(`Failed to get transaction ID`)
  }

  return parseInt(txid as string, 10)
}

Electric Proxy 示例

Electric 通常部署在代理服务器后面,该服务器处理 shape 配置、身份验证和授权。这提供了更好的安全性,并允许您控制用户可以访问的数据,而无需将 Electric 暴露给客户端。

以下是使用 TanStack Starter 的代理实现示例

js
import { createServerFileRoute } from "@tanstack/react-start/server"
import { ELECTRIC_PROTOCOL_QUERY_PARAMS } from "@electric-sql/client"

// Electric URL
const baseUrl = 'http://.../v1/shape'

const serve = async ({ request }: { request: Request }) => {
  // ...check user authorization  
  const url = new URL(request.url)
  const originUrl = new URL(baseUrl)

  // passthrough parameters from electric client
  url.searchParams.forEach((value, key) => {
    if (ELECTRIC_PROTOCOL_QUERY_PARAMS.includes(key)) {
      originUrl.searchParams.set(key, value)
    }
  })

  // set shape parameters 
  // full spec: https://github.com/electric-sql/electric/blob/main/website/electric-api.yaml
  originUrl.searchParams.set("table", "todos")
  // Where clause to filter rows in the table (optional).
  // originUrl.searchParams.set("where", "completed = true")
  
  // Select the columns to sync (optional)
  // originUrl.searchParams.set("columns", "id,text,completed")

  const response = await fetch(originUrl)
  const headers = new Headers(response.headers)
  headers.delete("content-encoding")
  headers.delete("content-length")

  return new Response(response.body, {
    status: response.status,
    statusText: response.statusText,
    headers,
  })
}

export const ServerRoute = createServerFileRoute("/api/todos").methods({
  GET: serve,
})
import { createServerFileRoute } from "@tanstack/react-start/server"
import { ELECTRIC_PROTOCOL_QUERY_PARAMS } from "@electric-sql/client"

// Electric URL
const baseUrl = 'http://.../v1/shape'

const serve = async ({ request }: { request: Request }) => {
  // ...check user authorization  
  const url = new URL(request.url)
  const originUrl = new URL(baseUrl)

  // passthrough parameters from electric client
  url.searchParams.forEach((value, key) => {
    if (ELECTRIC_PROTOCOL_QUERY_PARAMS.includes(key)) {
      originUrl.searchParams.set(key, value)
    }
  })

  // set shape parameters 
  // full spec: https://github.com/electric-sql/electric/blob/main/website/electric-api.yaml
  originUrl.searchParams.set("table", "todos")
  // Where clause to filter rows in the table (optional).
  // originUrl.searchParams.set("where", "completed = true")
  
  // Select the columns to sync (optional)
  // originUrl.searchParams.set("columns", "id,text,completed")

  const response = await fetch(originUrl)
  const headers = new Headers(response.headers)
  headers.delete("content-encoding")
  headers.delete("content-length")

  return new Response(response.body, {
    status: response.status,
    statusText: response.statusText,
    headers,
  })
}

export const ServerRoute = createServerFileRoute("/api/todos").methods({
  GET: serve,
})

带有显式事务的乐观更新

对于更高级的用例,您可以创建自定义 actions,这些 actions 可以跨集合事务性地执行多个 mutations。在这种情况下,您需要使用 utils.awaitTxId() 显式等待事务 ID。

typescript
const addTodoAction = createOptimisticAction({
  onMutate: ({ text }) => {
    // optimistically insert with a temporary ID
    const tempId = crypto.randomUUID()
    todosCollection.insert({
      id: tempId,
      text,
      completed: false,
      created_at: new Date(),
    })
    
    // ... mutate other collections
  },
  
  mutationFn: async ({ text }) => {
    const response = await api.todos.create({
      data: { text, completed: false }
    })
    
    await todosCollection.utils.awaitTxId(response.txid)
  }
})
const addTodoAction = createOptimisticAction({
  onMutate: ({ text }) => {
    // optimistically insert with a temporary ID
    const tempId = crypto.randomUUID()
    todosCollection.insert({
      id: tempId,
      text,
      completed: false,
      created_at: new Date(),
    })
    
    // ... mutate other collections
  },
  
  mutationFn: async ({ text }) => {
    const response = await api.todos.create({
      data: { text, completed: false }
    })
    
    await todosCollection.utils.awaitTxId(response.txid)
  }
})

工具方法

该集合通过 collection.utils 提供这些工具方法

  • awaitTxId(txid, timeout?): 手动等待特定的事务 ID 同步
typescript
todosCollection.utils.awaitTxId(12345)
todosCollection.utils.awaitTxId(12345)

当您需要确保 mutation 已同步后再进行其他操作时,此功能非常有用。

我们的合作伙伴
Code Rabbit
Electric
Prisma
订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。

订阅 Bytes

您的每周 JavaScript 资讯。每周一免费发送给超过 10 万开发者。

Bytes

无垃圾邮件。您可以随时取消订阅。