Electric collections provide seamless integration between TanStack DB and ElectricSQL, enabling real-time data sync with your Postgres database through Electric's sync engine.
The @tanstack/electric-db-collection package lets you create collections that sync with Postgres through Electric.
npm install @tanstack/electric-db-collection @tanstack/react-db
import { createCollection } from '@tanstack/react-db'
import { electricCollectionOptions } from '@tanstack/electric-db-collection'

const todosCollection = createCollection(
  electricCollectionOptions({
    shapeOptions: {
      url: '/api/todos',
    },
    getKey: (item) => item.id,
  })
)
The electricCollectionOptions function accepts the following options:
shapeOptions: Configuration for the ElectricSQL ShapeStream
getKey: Function that extracts the unique key from an item
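You can define handlers that run on mutations. They are useful for sending mutations to the backend and confirming them once Electric delivers the corresponding transaction. Until a mutation is confirmed, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delay, it is important to use a matching strategy.
The most reliable strategy is for the backend to include the transaction ID (txid) in its response, which lets the client match each mutation against Electric's transaction identifiers for precise confirmation. If no strategy is provided, client mutations are confirmed automatically after three seconds.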
const todosCollection = createCollection(
  electricCollectionOptions({
    id: 'todos',
    schema: todoSchema,
    getKey: (item) => item.id,
    shapeOptions: {
      url: '/api/todos',
      params: { table: 'todos' },
    },
    onInsert: async ({ transaction }) => {
      const newItem = transaction.mutations[0].modified
      const response = await api.todos.create(newItem)
      return { txid: response.txid }
    },
    // you can also implement onUpdate and onDelete handlers
  })
)
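To make the matching idea more concrete, the following is a minimal, self-contained sketch of how a client could pair a txid returned by the backend with txids observed on a sync stream. It is not the library's implementation: `TxidTracker`, `markSeen`, and `awaitTxid` are hypothetical names invented for illustration, and the timeout values are arbitrary.

```typescript
// Illustrative sketch only; all names here are hypothetical, not TanStack DB APIs.
type Txid = number

class TxidTracker {
  private seen = new Set<Txid>()
  private waiters = new Map<Txid, Array<() => void>>()

  // Call this when the sync stream reports that a transaction has arrived.
  markSeen(txid: Txid): void {
    this.seen.add(txid)
    const pending = this.waiters.get(txid) ?? []
    this.waiters.delete(txid)
    for (const notify of pending) notify()
  }

  // Resolves to true once the txid has been seen, or false if timeoutMs elapses first.
  awaitTxid(txid: Txid, timeoutMs: number): Promise<boolean> {
    if (this.seen.has(txid)) return Promise.resolve(true)
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => resolve(false), timeoutMs)
      const list = this.waiters.get(txid) ?? []
      list.push(() => {
        clearTimeout(timer)
        resolve(true)
      })
      this.waiters.set(txid, list)
    })
  }
}

// Usage: the backend responded with txid 9, and the stream delivers it shortly after.
const exampleTracker = new TxidTracker()
const confirmed = exampleTracker.awaitTxid(9, 1000)
exampleTracker.markSeen(9)
confirmed.then((ok) => console.log('txid 9 confirmed:', ok))
```

The point of the sketch is that a mutation is confirmed exactly when its own transaction shows up in the stream, rather than after a fixed delay.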
On the backend, you can extract the txid for a transaction by querying Postgres directly.
async function generateTxId(tx) {
  // The ::xid cast strips off the epoch, giving you the raw 32-bit value
  // that matches what PostgreSQL sends in logical replication streams
  // (and then exposed through Electric which we'll match against
  // in the client).
  const result = await tx.execute(
    sql`SELECT pg_current_xact_id()::xid::text as txid`
  )
  const txid = result.rows[0]?.txid

  if (txid === undefined) {
    throw new Error(`Failed to get transaction ID`)
  }

  return parseInt(txid as string, 10)
}
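As a worked illustration of what the `::xid` cast does, the sketch below reproduces the same arithmetic in TypeScript. It assumes the 64-bit value returned by `pg_current_xact_id()` is laid out as the epoch in the high 32 bits and the transaction counter in the low 32 bits; `xid8ToXid` is a hypothetical helper, not part of any library.

```typescript
// Hypothetical helper: keep only the low 32 bits of a 64-bit transaction ID,
// mirroring what the ::xid cast does on the Postgres side.
function xid8ToXid(xid8: bigint): number {
  return Number(BigInt.asUintN(32, xid8))
}

// Example: epoch 3, 32-bit transaction counter 12345.
const epoch = BigInt(3)
const counter = BigInt(12345)
const fullId = (epoch << BigInt(32)) | counter

console.log(fullId.toString()) // the 64-bit value, epoch included
console.log(xid8ToXid(fullId)) // 12345
```

Because the result always fits in 32 bits, it is safe to hold it in a JavaScript number, which is why `parseInt` is sufficient in the function above.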
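Electric is typically deployed behind a proxy server that handles shape configuration, authentication, and authorization. This provides better security and lets you control what data users can access without exposing Electric to the client.
Here is an example proxy implementation using TanStack Start: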
import { createServerFileRoute } from "@tanstack/react-start/server"
import { ELECTRIC_PROTOCOL_QUERY_PARAMS } from "@electric-sql/client"

// Electric URL
const baseUrl = 'http://.../v1/shape'

const serve = async ({ request }: { request: Request }) => {
  // ...check user authorization
  const url = new URL(request.url)
  const originUrl = new URL(baseUrl)

  // passthrough parameters from electric client
  url.searchParams.forEach((value, key) => {
    if (ELECTRIC_PROTOCOL_QUERY_PARAMS.includes(key)) {
      originUrl.searchParams.set(key, value)
    }
  })

  // set shape parameters
  // full spec: https://github.com/electric-sql/electric/blob/main/website/electric-api.yaml
  originUrl.searchParams.set("table", "todos")
  // Where clause to filter rows in the table (optional).
  // originUrl.searchParams.set("where", "completed = true")
  // Select the columns to sync (optional)
  // originUrl.searchParams.set("columns", "id,text,completed")

  const response = await fetch(originUrl)

  // the body is re-streamed, so the upstream encoding and length headers no longer apply
  const headers = new Headers(response.headers)
  headers.delete("content-encoding")
  headers.delete("content-length")

  return new Response(response.body, {
    status: response.status,
    statusText: response.statusText,
    headers,
  })
}

export const ServerRoute = createServerFileRoute("/api/todos").methods({
  GET: serve,
})
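The parameter filtering in the proxy can be isolated into a small pure function, which makes the security property easier to see: only allow-listed protocol parameters are forwarded, and the shape definition is always set on the server. This is a sketch under assumptions: `buildOriginUrl`, the host names, and `exampleAllowList` are hypothetical stand-ins (in the proxy above the allow-list comes from `ELECTRIC_PROTOCOL_QUERY_PARAMS`).

```typescript
// Hypothetical helper: build the upstream Electric URL from an incoming request URL.
function buildOriginUrl(
  requestUrl: string,
  electricUrl: string,
  allowedParams: readonly string[],
  table: string
): URL {
  const incoming = new URL(requestUrl)
  const origin = new URL(electricUrl)

  // Forward only the allow-listed protocol parameters sent by the client.
  incoming.searchParams.forEach((value, key) => {
    if (allowedParams.includes(key)) {
      origin.searchParams.set(key, value)
    }
  })

  // The shape itself is chosen by the server, never by the client.
  origin.searchParams.set('table', table)
  return origin
}

// Illustrative allow-list only; a real proxy would use the constant exported by the Electric client.
const exampleAllowList = ['offset', 'handle', 'live', 'cursor']

const exampleOrigin = buildOriginUrl(
  'https://app.example.com/api/todos?offset=-1&table=users&where=1%3D1',
  'http://electric.example.internal/v1/shape',
  exampleAllowList,
  'todos'
)

console.log(exampleOrigin.searchParams.get('table')) // todos
console.log(exampleOrigin.searchParams.has('where')) // false
```

In the usage above, the client's attempt to request a different table or inject a where clause is dropped, while its `offset` parameter passes through.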
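For more advanced use cases, you can create custom actions that perform multiple mutations across collections transactionally. In this case, you need to explicitly await the transaction ID using utils.awaitTxId().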
import { createOptimisticAction } from '@tanstack/react-db'

const addTodoAction = createOptimisticAction({
  onMutate: ({ text }) => {
    // optimistically insert with a temporary ID
    const tempId = crypto.randomUUID()
    todosCollection.insert({
      id: tempId,
      text,
      completed: false,
      created_at: new Date(),
    })

    // ... mutate other collections
  },

  mutationFn: async ({ text }) => {
    const response = await api.todos.create({
      data: { text, completed: false }
    })

    // wait until Electric has synced the backend transaction
    await todosCollection.utils.awaitTxId(response.txid)
  }
})
The collection provides these utility methods via collection.utils:
todosCollection.utils.awaitTxId(12345)
This is useful when you need to ensure a mutation has been synced before proceeding with other operations.