使用 Cloudflare Durable Objects 是在边缘实现有状态应用的关键。传统的 Cloudflare Workers 是无状态的,意味着每次请求都是独立的,Worker 本身不存储跨请求的持久化数据。Durable Objects 改变了这一点,它为每个“对象实例”提供了强一致性的、持久化的存储,并且代码(Worker 逻辑)与该存储紧密结合。
可以把 Durable Object 理解为一个具有持久化状态的、可寻址的 Actor(类似于 Actor 模型中的概念)。每个 Durable Object 实例都有一个唯一的 ID,并且在任何给定时间,该 ID 只会激活在一个地方,确保了对该对象状态的访问是单线程的、串行的,从而避免了并发冲突。
**Durable Objects 的核心特性:**
1. **持久化存储 (Persistent Storage):** 每个 Durable Object 实例都可以访问一个私有的、事务性的键值存储 API (`this.state.storage`)。数据写入后会持久保存,直到被显式删除。
2. **强一致性 (Strong Consistency):** 对单个 Durable Object 实例的读写操作是强一致性的。一旦写入成功,后续的读取操作(针对同一个对象实例)保证能看到最新的数据。这与 Workers KV 的最终一致性不同。
3. **单点协调 (Single Point of Coordination):** 对于一个给定的 Durable Object ID,Cloudflare 会确保只有一个实例在运行。所有针对该 ID 的请求都会被路由到这个唯一的实例,这使得它非常适合作为协调中心(例如,聊天室、游戏会话、文档锁)。
4. **代码与状态的共存 (Colocation of Code and State):** Durable Object 的代码(在 `fetch` 处理程序中定义)与其状态存储在同一个位置执行,这减少了访问状态的延迟。
5. **全局可访问性与管辖权控制 (Global Accessibility & Jurisdictional Control):**
* 虽然 Durable Object 实例在物理上可能存在于某个特定的 Cloudflare 数据中心,但它们可以通过其 ID 从全球任何地方的 Worker 访问。
* 您可以指定一个“管辖权 (Jurisdiction)”来限制 Durable Object 的状态存储在特定的地理区域(例如,“EU”),以满足数据本地化要求。
**如何使用 Durable Objects 实现有状态的边缘应用:**
实现过程通常涉及两个主要部分:
1. **Durable Object 类定义:** 这是实际处理状态和逻辑的地方。
2. **普通的 Cloudflare Worker (Orchestrating Worker):** 这个 Worker 负责根据请求确定需要哪个 Durable Object 实例,获取该实例的“存根 (stub)”,并将请求转发给它。
**步骤详解:**
**1. 定义 Durable Object 类 (例如,`MyDurableObject.js`)**
```javascript
// MyDurableObject.js
export class MyDurableObject {
constructor(state, env) {
this.state = state; // `state` 对象提供了对存储的访问 (this.state.storage)
this.env = env; // `env` 对象包含了绑定的环境变量、KV、R2 等
this.value = 0; // 可以在构造函数中初始化内存中的状态
// 从持久化存储中加载初始状态(如果存在)
//
this.state.blockConcurrencyWhile() 确保在异步操作完成前,不会处理其他请求
this.state.blockConcurrencyWhile(async () => {
let storedValue = await this.state.storage.get("currentValue");
if (storedValue !== undefined) {
this.value = storedValue;
}
});
}
// 每个 Durable Object 都必须实现一个 fetch 方法
// 这个方法处理发送到此 Durable Object 实例的请求
async fetch(request) {
const url = new URL(request.url);
let currentValue = this.value; // 从内存中读取
switch (url.pathname) {
case "/increment":
currentValue++;
this.value = currentValue; // 更新内存中的值
await this.state.storage.put("currentValue", currentValue); // 持久化到存储
return new Response(`Value incremented to: ${currentValue}`);
case "/decrement":
currentValue--;
this.value = currentValue;
await this.state.storage.put("currentValue", currentValue);
return new Response(`Value decremented to: ${currentValue}`);
case "/get":
return new Response(`Current value: ${currentValue}`);
case "/reset":
currentValue = 0;
this.value = currentValue;
await this.state.storage.put("currentValue", currentValue);
// 或者删除: await this.state.storage.delete("currentValue");
return new Response("Value reset to 0");
default:
return new Response("Not found", { status: 404 });
}
}
// 示例:使用事务来确保原子性操作
async atomicUpdate(amount) {
await this.state.storage.transaction(async (txn) => {
let val = await txn.get("currentValue") || 0;
val += amount;
if (val < 0) {
// 事务可以被回滚
txn.rollback();
throw new Error("Value cannot be negative after transaction.");
}
await txn.put("currentValue", val);
this.value = val; // 更新内存状态
});
return this.value;
}
}
```
**2. 配置 `wrangler.toml`**
你需要告诉 Cloudflare 你的 Durable Object 类以及如何绑定它。
```toml
name = "my-stateful-worker"
main = "src/worker.js" # 这是你的普通 Worker (Orchestrating Worker)
compatibility_date = "YYYY-MM-DD" # 使用一个较新的日期
# Durable Objects 配置
# 每个 [durable_objects.bindings] 条目定义了一个绑定
# 你可以在 Worker 代码中通过 `env.COUNTER_DO` 来访问它
[[durable_objects.bindings]]
name = "COUNTER_DO" # Worker 中使用的绑定名称
class_name = "MyDurableObject" # Durable Object 类的名称 (需要与导出的类名匹配)
# script_name = "my-durable-object-script" # 如果 DO 在不同的 Worker 脚本中,指定脚本名
# 如果你的 Durable Object 类在同一个 worker.js 文件中,
# 并且你使用了 ES模块格式,通常不需要 script_name。
# Cloudflare 会自动找到它。
# 如果 MyDurableObject 类在 worker.js 中定义并导出,
# 确保你的 worker.js 看起来像这样:
# export { MyDurableObject } from "./MyDurableObject.js"; // 如果在单独文件
# export default { ... } // 你的普通 Worker
```
如果 `MyDurableObject` 类定义在 `src/worker.js` 内部或从 `src/worker.js` 中导出,`script_name` 通常可以省略。如果它在另一个独立的 Worker 脚本中,则需要指定 `script_name`。
**3. 编写普通的 Cloudflare Worker (Orchestrating Worker) (例如,`src/worker.js`)**
这个 Worker 接收外部请求,决定使用哪个 Durable Object 实例,并与之通信。
```javascript
// src/worker.js
// 如果 MyDurableObject 在单独的文件中,像这样导入并导出它
// export { MyDurableObject } from "./MyDurableObject.js";
// 如果 MyDurableObject 定义在这个文件中,确保它被导出
export class MyDurableObject { /* ... Class definition from above ... */ }
export default {
async fetch(request, env, ctx) {
try {
const url = new URL(request.url);
const pathSegments = url.pathname.split('/').filter(Boolean); // e.g., /counter/my-counter-instance/increment
if (pathSegments.length < 2 || pathSegments[0] !== 'counter') {
return new Response("Invalid path. Use /counter/:id/:action", { status: 400 });
}
const counterIdName = pathSegments[1]; // 例如 "my-counter-instance"
const actionPath = "/" + (pathSegments.slice(2).join('/') || "get"); // 例如 "/increment" or "/get"
// 1. 获取 Durable Object ID
// idFromName() 确保对于相同的名称,你总是得到相同的 ID 对象。
// 这个 ID 本身不是字符串,而是一个 DurableObjectId 对象。
let id = env.COUNTER_DO.idFromName(counterIdName);
// 2. 获取 Durable Object 存根 (stub)
// stub 是一个客户端对象,用于与 Durable Object 实例通信。
let stub = env.COUNTER_DO.get(id);
// 3. 将请求转发给 Durable Object 实例
// 你可以创建一个新的请求,或者修改并转发原始请求。
// 这里我们构建一个新的 URL,只包含动作路径。
const doUrl = new URL(request.url);
doUrl.pathname = actionPath;
// 将原始请求的方法、头部和主体(如果存在)转发给 DO
// 注意:如果原始请求是 GET/HEAD,则 body 为 null
return await stub.fetch(doUrl.toString(), {
method: request.method,
headers: request.headers,
body: request.body,
});
} catch (e) {
return new Response(e.message || "Internal Server Error", { status: 500 });
}
},
};
```
**工作流程:**
1. 用户向你的 Worker URL 发起请求,例如 `
https://your-worker.your-account.workers.dev/counter/game-lobby-123/increment`。
2. `Orchestrating Worker` (`src/worker.js`) 的 `fetch` 方法被调用。
3. 它从 URL 中解析出 `counterIdName` ("game-lobby-123") 和 `actionPath` ("/increment")。
4. 它使用 `env.COUNTER_DO.idFromName("game-lobby-123")` 获取一个唯一的 `DurableObjectId`。
5. 然后使用 `env.COUNTER_DO.get(id)` 获取该 Durable Object 实例的存根 (`stub`)。
* 如果这是第一次访问这个 ID,Cloudflare 会在某个边缘节点上创建一个新的 `MyDurableObject` 实例,并运行其构造函数。
* 如果该 ID 的实例已存在(可能在内存中,也可能需要从存储中唤醒),Cloudflare 会将请求路由到现有的实例。
6. `Orchestrating Worker` 通过 `stub.fetch(...)` 将一个包含路径 `/increment` 的新请求发送给 `MyDurableObject` 实例。
7. `MyDurableObject` 实例的 `fetch` 方法被调用,它处理 `/increment` 路径,更新其内部状态 (`this.value`),并将状态持久化到 `this.state.storage`。
8. `MyDurableObject` 返回一个响应。
9. `Orchestrating Worker` 将此响应返回给原始用户。
**应用场景示例:**
* **实时协作应用:** 每个文档或白板可以是一个 Durable Object,处理编辑、光标位置等。
* **聊天室/消息传递:** 每个聊天室是一个 Durable Object,负责接收和广播消息给连接的客户端(通常通过 WebSockets,Durable Objects 支持 WebSockets)。
* **游戏服务器:** 每个游戏会话或游戏大厅可以是一个 Durable Object,管理游戏状态和玩家互动。
* **购物车:** 每个用户的购物车可以是一个 Durable Object,在用户浏览时保持其状态。
* **分布式锁或信号量:** 用于协调跨多个 Worker 的操作。
* **投票/计数器:** 每个投票项目或计数器是一个 Durable Object。
**关键考虑:**
* **ID 的选择:** 如何命名和生成 Durable Object ID 非常重要。它决定了你的状态是如何分区的。
* **粒度:** 一个 Durable Object 应该管理多少状态?太粗粒度可能导致瓶颈,太细粒度可能导致管理开销。
* **事务:** 使用 `
this.state.storage.transaction()` 来执行原子性的多步更新,确保数据一致性。
* **`blockConcurrencyWhile()`:** 在构造函数或 `fetch` 处理程序中执行异步初始化或关键操作时,使用它来防止在这些操作完成前处理其他请求,避免竞争条件。
通过这种方式,Durable Objects 使得在 Cloudflare 边缘构建复杂的、有状态的、低延迟的应用程序成为可能。