李成笔记网

专注域名、站长SEO知识分享与实战技巧

使用 Cloudflare Durable Objects 在边缘网络实现有状态应用

使用 Cloudflare Durable Objects 是在边缘实现有状态应用的关键。传统的 Cloudflare Workers 是无状态的,意味着每次请求都是独立的,Worker 本身不存储跨请求的持久化数据。Durable Objects 改变了这一点,它为每个“对象实例”提供了强一致性的、持久化的存储,并且代码(Worker 逻辑)与该存储紧密结合。


可以把 Durable Object 理解为一个具有持久化状态的、可寻址的 Actor(类似于 Actor 模型中的概念)。每个 Durable Object 实例都有一个唯一的 ID,并且在任何给定时间,该 ID 只会激活在一个地方,确保了对该对象状态的访问是单线程的、串行的,从而避免了并发冲突。


**Durable Objects 的核心特性:**


1. **持久化存储 (Persistent Storage):** 每个 Durable Object 实例都可以访问一个私有的、事务性的键值存储 API (`this.state.storage`)。数据写入后会持久保存,直到被显式删除。

2. **强一致性 (Strong Consistency):** 对单个 Durable Object 实例的读写操作是强一致性的。一旦写入成功,后续的读取操作(针对同一个对象实例)保证能看到最新的数据。这与 Workers KV 的最终一致性不同。

3. **单点协调 (Single Point of Coordination):** 对于一个给定的 Durable Object ID,Cloudflare 会确保只有一个实例在运行。所有针对该 ID 的请求都会被路由到这个唯一的实例,这使得它非常适合作为协调中心(例如,聊天室、游戏会话、文档锁)。

4. **代码与状态的共存 (Colocation of Code and State):** Durable Object 的代码(在 `fetch` 处理程序中定义)与其状态存储在同一个位置执行,这减少了访问状态的延迟。

5. **全局可访问性与管辖权控制 (Global Accessibility & Jurisdictional Control):**

* 虽然 Durable Object 实例在物理上可能存在于某个特定的 Cloudflare 数据中心,但它们可以通过其 ID 从全球任何地方的 Worker 访问。

* 您可以指定一个“管辖权 (Jurisdiction)”来限制 Durable Object 的状态存储在特定的地理区域(例如,“EU”),以满足数据本地化要求。


**如何使用 Durable Objects 实现有状态的边缘应用:**


实现过程通常涉及两个主要部分:


1. **Durable Object 类定义:** 这是实际处理状态和逻辑的地方。

2. **普通的 Cloudflare Worker (Orchestrating Worker):** 这个 Worker 负责根据请求确定需要哪个 Durable Object 实例,获取该实例的“存根 (stub)”,并将请求转发给它。


**步骤详解:**


**1. 定义 Durable Object 类 (例如,`MyDurableObject.js`)**


```javascript

// MyDurableObject.js

export class MyDurableObject {

constructor(state, env) {

this.state = state; // `state` 对象提供了对存储的访问 (this.state.storage)

this.env = env; // `env` 对象包含了绑定的环境变量、KV、R2 等

this.value = 0; // 可以在构造函数中初始化内存中的状态


// 从持久化存储中加载初始状态(如果存在)

//
this.state.blockConcurrencyWhile() 确保在异步操作完成前,不会处理其他请求

this.state.blockConcurrencyWhile(async () => {

let storedValue = await this.state.storage.get("currentValue");

if (storedValue !== undefined) {

this.value = storedValue;

}

});

}


// 每个 Durable Object 都必须实现一个 fetch 方法

// 这个方法处理发送到此 Durable Object 实例的请求

async fetch(request) {

const url = new URL(request.url);

let currentValue = this.value; // 从内存中读取


switch (url.pathname) {

case "/increment":

currentValue++;

this.value = currentValue; // 更新内存中的值

await this.state.storage.put("currentValue", currentValue); // 持久化到存储

return new Response(`Value incremented to: ${currentValue}`);

case "/decrement":

currentValue--;

this.value = currentValue;

await this.state.storage.put("currentValue", currentValue);

return new Response(`Value decremented to: ${currentValue}`);

case "/get":

return new Response(`Current value: ${currentValue}`);

case "/reset":

currentValue = 0;

this.value = currentValue;

await this.state.storage.put("currentValue", currentValue);

// 或者删除: await this.state.storage.delete("currentValue");

return new Response("Value reset to 0");

default:

return new Response("Not found", { status: 404 });

}

}


// 示例:使用事务来确保原子性操作

async atomicUpdate(amount) {

await this.state.storage.transaction(async (txn) => {

let val = await txn.get("currentValue") || 0;

val += amount;

if (val < 0) {

// 事务可以被回滚

txn.rollback();

throw new Error("Value cannot be negative after transaction.");

}

await txn.put("currentValue", val);

this.value = val; // 更新内存状态

});

return this.value;

}

}

```


**2. 配置 `wrangler.toml`**


你需要告诉 Cloudflare 你的 Durable Object 类以及如何绑定它。


```toml

name = "my-stateful-worker"

main = "src/worker.js" # 这是你的普通 Worker (Orchestrating Worker)

compatibility_date = "YYYY-MM-DD" # 使用一个较新的日期


# Durable Objects 配置

# 每个 [durable_objects.bindings] 条目定义了一个绑定

# 你可以在 Worker 代码中通过 `env.COUNTER_DO` 来访问它

[[durable_objects.bindings]]

name = "COUNTER_DO" # Worker 中使用的绑定名称

class_name = "MyDurableObject" # Durable Object 类的名称 (需要与导出的类名匹配)

# script_name = "my-durable-object-script" # 如果 DO 在不同的 Worker 脚本中,指定脚本名


# 如果你的 Durable Object 类在同一个 worker.js 文件中,

# 并且你使用了 ES模块格式,通常不需要 script_name。

# Cloudflare 会自动找到它。

# 如果 MyDurableObject 类在 worker.js 中定义并导出,

# 确保你的 worker.js 看起来像这样:

# export { MyDurableObject } from "./MyDurableObject.js"; // 如果在单独文件

# export default { ... } // 你的普通 Worker

```


如果 `MyDurableObject` 类定义在 `src/worker.js` 内部或从 `src/worker.js` 中导出,`script_name` 通常可以省略。如果它在另一个独立的 Worker 脚本中,则需要指定 `script_name`。


**3. 编写普通的 Cloudflare Worker (Orchestrating Worker) (例如,`src/worker.js`)**


这个 Worker 接收外部请求,决定使用哪个 Durable Object 实例,并与之通信。


```javascript

// src/worker.js


// 如果 MyDurableObject 在单独的文件中,像这样导入并导出它

// export { MyDurableObject } from "./MyDurableObject.js";


// 如果 MyDurableObject 定义在这个文件中,确保它被导出

export class MyDurableObject { /* ... Class definition from above ... */ }


export default {

async fetch(request, env, ctx) {

try {

const url = new URL(request.url);

const pathSegments = url.pathname.split('/').filter(Boolean); // e.g., /counter/my-counter-instance/increment


if (pathSegments.length < 2 || pathSegments[0] !== 'counter') {

return new Response("Invalid path. Use /counter/:id/:action", { status: 400 });

}


const counterIdName = pathSegments[1]; // 例如 "my-counter-instance"

const actionPath = "/" + (pathSegments.slice(2).join('/') || "get"); // 例如 "/increment" or "/get"


// 1. 获取 Durable Object ID

// idFromName() 确保对于相同的名称,你总是得到相同的 ID 对象。

// 这个 ID 本身不是字符串,而是一个 DurableObjectId 对象。

let id = env.COUNTER_DO.idFromName(counterIdName);


// 2. 获取 Durable Object 存根 (stub)

// stub 是一个客户端对象,用于与 Durable Object 实例通信。

let stub = env.COUNTER_DO.get(id);


// 3. 将请求转发给 Durable Object 实例

// 你可以创建一个新的请求,或者修改并转发原始请求。

// 这里我们构建一个新的 URL,只包含动作路径。

const doUrl = new URL(request.url);

doUrl.pathname = actionPath;


// 将原始请求的方法、头部和主体(如果存在)转发给 DO

// 注意:如果原始请求是 GET/HEAD,则 body 为 null

return await stub.fetch(doUrl.toString(), {

method: request.method,

headers: request.headers,

body: request.body,

});


} catch (e) {

return new Response(e.message || "Internal Server Error", { status: 500 });

}

},

};

```


**工作流程:**


1. 用户向你的 Worker URL 发起请求,例如 `
https://your-worker.your-account.workers.dev/counter/game-lobby-123/increment`。

2. `Orchestrating Worker` (`src/worker.js`) 的 `fetch` 方法被调用。

3. 它从 URL 中解析出 `counterIdName` ("game-lobby-123") 和 `actionPath` ("/increment")。

4. 它使用 `env.COUNTER_DO.idFromName("game-lobby-123")` 获取一个唯一的 `DurableObjectId`。

5. 然后使用 `env.COUNTER_DO.get(id)` 获取该 Durable Object 实例的存根 (`stub`)。

* 如果这是第一次访问这个 ID,Cloudflare 会在某个边缘节点上创建一个新的 `MyDurableObject` 实例,并运行其构造函数。

* 如果该 ID 的实例已存在(可能在内存中,也可能需要从存储中唤醒),Cloudflare 会将请求路由到现有的实例。

6. `Orchestrating Worker` 通过 `stub.fetch(...)` 将一个包含路径 `/increment` 的新请求发送给 `MyDurableObject` 实例。

7. `MyDurableObject` 实例的 `fetch` 方法被调用,它处理 `/increment` 路径,更新其内部状态 (`this.value`),并将状态持久化到 `this.state.storage`。

8. `MyDurableObject` 返回一个响应。

9. `Orchestrating Worker` 将此响应返回给原始用户。


**应用场景示例:**


* **实时协作应用:** 每个文档或白板可以是一个 Durable Object,处理编辑、光标位置等。

* **聊天室/消息传递:** 每个聊天室是一个 Durable Object,负责接收和广播消息给连接的客户端(通常通过 WebSockets,Durable Objects 支持 WebSockets)。

* **游戏服务器:** 每个游戏会话或游戏大厅可以是一个 Durable Object,管理游戏状态和玩家互动。

* **购物车:** 每个用户的购物车可以是一个 Durable Object,在用户浏览时保持其状态。

* **分布式锁或信号量:** 用于协调跨多个 Worker 的操作。

* **投票/计数器:** 每个投票项目或计数器是一个 Durable Object。


**关键考虑:**


* **ID 的选择:** 如何命名和生成 Durable Object ID 非常重要。它决定了你的状态是如何分区的。

* **粒度:** 一个 Durable Object 应该管理多少状态?太粗粒度可能导致瓶颈,太细粒度可能导致管理开销。

* **事务:** 使用 `
this.state.storage.transaction()` 来执行原子性的多步更新,确保数据一致性。

* **`blockConcurrencyWhile()`:** 在构造函数或 `fetch` 处理程序中执行异步初始化或关键操作时,使用它来防止在这些操作完成前处理其他请求,避免竞争条件。


通过这种方式,Durable Objects 使得在 Cloudflare 边缘构建复杂的、有状态的、低延迟的应用程序成为可能。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言