前言

众所周知,DAO 层(Data Access Layer)是一个后端开发中常见的概念,之前在用 Next.js 官方博客也建议新项目采用 DAO 层作为数据处理模型。

其实对于一些简单的需求来说 DAO 层并不是很重要,因为可以直接上 ORM 框架,以 TypeScript 代码的方式编写 SQL,还能获得比较完善的类型提示,就没有必要将 SQL 代码单独抽出来了。不过有时候可能需要对 SQL 有更加完整的掌控,这时就需要 DAO 层上场了。

咱平时用的数据库是 PostgreSQL,开源社区里这个数据库主要是两个驱动比较流行:node-postgres(npm 上的包名是 pg)和 Postgres.js。由于后者 TypeScript 支持更好,也支持 Deno,所以咱一般用 Postgres.js,前者相对更底层一些,实现本文的 DAO 层会更简单(直接在事务前后分别执行 BEGINCOMMIT/ROLLBACK 即可),所以如果对 Deno/TypeScript 没有什么需求的话可以考虑选前者。

事务难题

嗯嗯,Postgres.js 虽然好,而且 API 设计肥肠简单,执行一段 SQL 就像下面这样:(你可能会觉得眼熟,因为 Vercel 之前在 Next Conf 上公布 RSC 和 Server Action 的时候也用了类似语法的包——Vercel Postgres)

1
sql`SELECT * FROM users WHERE id = ${id}`

但就是因为它的简单 API 设计导致 DAO 层的封装困难……起初咱以为把一系列数据操作的函数放在一个文件中导出就够了:

1
2
3
export function createUser(username: string, password: string) {
sql`INSERT INTO users(username, password) VALUES(${username}, ${password})`
}

后来发现这样做没法开启事务了,因为 Postgres.js 的事务 API 是这样的:

1
2
3
4
5
const result = await sql.begin(sql => [
sql`update ...`,
sql`update ...`,
sql`insert ...`
])

文档中有这样一句说明:

Postgres.js will reserve a connection for the transaction and supply a scoped sql instance for all transaction uses in the callback function.

什么意思呢,意思就是必须要使用 begin 里面的 sql 才能保证这段 SQL 在正确的连接上执行,因为 Postgres.js 为了简化 API 的同时保证性能,自动在幕后维护了一个连接池,每次调用 sql 函数时会自动从连接池中选取一个连接执行 SQL。而如果不使用 begin 给出的 sql 函数,则有可能下面的 SQL 跑到了另一个连接上去,那自然也不可能在同一个事务中了。

封装

第一版

咱想了一个办法,既然需要用指定的 sql 来执行,那么能不能让 DAO 层拿对象的时候从某个指定的位置拿,在调用 DAO 层函数之前可以改变这个值呢,而且改变完之后必须是一个新的 DAO 对象,否则全局共用同一个位置会造成竟态。大致的 API 设计是这样的:

1
2
3
sql.begin(async (sql) => {
peopleDao.with(sql).create();
})

每次调用 with 函数就可以创建一个拥有传入的 sql 对象的新 DAO,编写 create 的时候只需要从这个地方拿就好了。于是写出来了以下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// deno-lint-ignore-file ban-types
import postgres from "postgres";
import { sql as globalSql } from "./sql.ts";

export function createDao<
T extends Record<string | number | symbol, {
<E extends Record<string, unknown> = {}>(
this: { sql: postgres.Sql<E> | postgres.TransactionSql<E> },
// deno-lint-ignore no-explicit-any
...args: any[]
): unknown;
}>,
E extends Record<string, unknown> = {},
>(
originalDao: T,
): T & {
with: (txSql: postgres.TransactionSql<E>) => T & { sql: postgres.Sql<E> };
sql: postgres.TransactionSql<E> | postgres.Sql<E>;
} {
return {
...originalDao,
sql: globalSql as postgres.Sql<E>,
with: (sql: postgres.TransactionSql<E>) => ({
...originalDao,
sql,
}),
};
}

值得注意的一是有一点点的类型体操,createDao 函数上的类型 T 的约束限定了 DAO 里面的 this 上有一个 sql 对象,这样编写 DAO 层函数的时候可以直接从 this.sql 获得这个对象,而且有完整的类型提示。

第二版

其实上面的封装已经够用了,但咱在使用的过程中发现了一些还是感觉不太满意的地方:

  • 要人工确保在事务中使用 DAO 的时候在每一个上面调用 .with(sql)
  • 虽然上一条可以通过提前调用,然后持有返回的对象来解决,但命名很难受,又是常用变量,命名过长也影响阅读
    1
    2
    3
    sql.begin(async (sql) => {
    const 我该叫什么好??? = peopleDao.with(sql);
    })
  • 就算命名解决好了,对于每一个 DAO 都这样创建一下也很麻烦

没错,偷懒是程序员生产的最大动力。所以咱开发了第二版的封装。

通过类似的方式在 createDAO 之后在创建的 DAO 对象上设置一个 getter,getter 通过 Node.js 的一个 API——AsyncLocalStorage 拿到一个 SQL 对象。

AsyncLocalStorage 可以在不同的异步上下文中保存数据,只有同一个异步上下文才共享数据,因此可以用来保存事务 SQL 对象,不过还需要替换成一个自己实现的 API 来开启事务,这样才能在执行事务之前把 SQL 对象保存到上下文中,并在事务完成后清理上下文。API 使用方式大概是这样:

1
2
3
4
5
6
7
8
9
10
startTransaction(async () => {
await peopleDao.create();
throw new Error("Rollback");
}).then(() => {
console.log("Committed");
}).catch((error) => {
console.log("Rollback", error);
}).finally(() => {
console.log("End");
});

可以看到在使用 DAO 对象的时候已经完全不需要关心是否处于事务当中了。

具体实现代码如下,类型标注部分和第一版实现基本一致,主要是引入了 AsyncLocalStorage 这个 API 来自动获取 SQL 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// deno-lint-ignore-file ban-types no-explicit-any
import postgres from "postgres";
import { sql as globalSql } from "./sql.ts";
import { AsyncLocalStorage } from "node:async_hooks";

const transactionContextStorage = new AsyncLocalStorage<
{ transactionSql?: postgres.TransactionSql }
>();

type UnwrapPromiseArray<T> = T extends any[] ? {
[k in keyof T]: T[k] extends Promise<infer R> ? R : T[k];
}
: T;

export function startTransaction<T, E extends Record<string, unknown> = {}>(
cb: (sql: postgres.TransactionSql<E>) => T | Promise<T>,
): Promise<UnwrapPromiseArray<T>> {
return globalSql.begin<T>((sql) =>
transactionContextStorage.run(
{ transactionSql: sql },
() => cb(sql as postgres.TransactionSql<E>),
)
);
}

export function createDao<
T extends Record<string | number | symbol, {
<E extends Record<string, unknown> = {}>(
this: { sql: postgres.Sql<E> | postgres.TransactionSql<E> },
...args: any[]
): unknown;
}>,
E extends Record<string, unknown> = {},
>(
originalDao: T,
): T & {
readonly sql: postgres.TransactionSql<E> | postgres.Sql<E>;
} {
return {
...originalDao,
get sql() {
return (transactionContextStorage.getStore()?.transactionSql ??
globalSql) as postgres.TransactionSql<E> | postgres.Sql<E>;
},
};
}

这就是咱目前正在使用的 DAO 层封装实现了,暂时还没有遇到什么其他问题,而且这里有一点小巧思,startTransaction 接收的 callback 里面是可以拿到事务 SQL 对象的,而且类型也与 Postgres.js 提供的完全一致,因此如果有什么其他需求也可以拿到这个 SQL 对象直接操作。

另外在查 AsyncLocalStorage API 相关资料的时候发现了一个提案:tc39/proposal-async-context,目前还在 Stage 2,如果通过了的话就意味着浏览器环境也可以做类似的事情了。

如果你想测试一下上面的两种实现或者看看具体使用的示例,点这里展开

DAO 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const peopleDao = createDao({
async create() {
await this
.sql`INSERT INTO people (name, credit) VALUES (${"Alice"}, ${100})`;
await this.sql`INSERT INTO people (name, credit) VALUES (${"Bob"}, ${200})`;
},
async read(name: string) {
const [result] = await this.sql`SELECT * FROM people WHERE name = ${name}`;
return {
name: result.name as string,
credit: result.credit as number,
};
},
async update(name: string, credit: number) {
await this.sql`UPDATE people SET credit = ${credit} WHERE name = ${name}`;
},
});

第一版封装测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sql.begin(async (sql) => {
await peopleDao.with(sql).create()
throw new Error("Rollback");
}).then(() => {
console.log("Committed");
}).catch((error) => {
console.log("Rollback", error);
}).finally(() => {
console.log("End");
});

sql.begin(async (sql) => {
const peopleDao2 = peopleDao.with(sql);
let [aliceCredit, bobCredit] = await Promise.all([
peopleDao2.read("Alice"),
peopleDao2.read("Bob"),
]);
console.log(aliceCredit, bobCredit);
await Promise.all([
peopleDao2.update("Alice", aliceCredit.credit - 10),
peopleDao2.update("Bob", bobCredit.credit + 10),
]);
throw new Error("Rollback");
[aliceCredit, bobCredit] = await Promise.all([
peopleDao2.read("Alice"),
peopleDao2.read("Bob"),
]);
console.log(aliceCredit, bobCredit);
}).then(() => {
console.log("Committed");
}).catch((error) => {
console.log("Rollback", error);
}).finally(() => {
console.log("End");
});

with 调用去掉就可以体验一下不做封装直接使用全局对象的后果。

第二版封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
startTransaction(async () => {
let [aliceCredit, bobCredit] = await Promise.all([
peopleDao.read("Alice"),
peopleDao.read("Bob"),
]);
console.log(aliceCredit, bobCredit);
await Promise.all([
peopleDao.update("Alice", aliceCredit.credit - 10),
peopleDao.update("Bob", bobCredit.credit + 10),
]);
throw new Error("Rollback");
[aliceCredit, bobCredit] = await Promise.all([
peopleDao.read("Alice"),
peopleDao.read("Bob"),
]);
console.log(aliceCredit, bobCredit);
}).then(() => {
console.log("Committed");
}).catch((error) => {
console.log("Rollback", error);
}).finally(() => {
sql.end();
console.log("End");
});

startTransition 换成 sql.begin 就可以体验不做封装直接使用全局对象的后果。