cover_image

sqlx: 一个优秀的rust异步SQL库

晁岳攀(鸟窝) 鸟窝聊技术
2024年05月12日 05:13

上一篇我介绍了Go 生态圈的 sqlx 库sqlx: 功能强大的数据库访问库。Rust 生态圈也有一个知名的sqlx 库[2],今天给大家介绍一下。这两个没有什么关联啊,纯粹属于名称相同而已。

概览

sqlx 是一个为 Rust 语言提供的功能齐全的数据库访问和查询构建器库。它支持多种数据库,包括 PostgreSQL、MySQL、SQLite 等。sqlx 的设计目标是成为 Rust 中最直观、高效且类型安全的数据库客户端。

  • 真正的异步。从头开始使用 async/await 构建,以实现最大的并发性。
  • 编译时检查查询(如果你需要的话)。请注意,sqlx 不是 ORM。
  • 与数据库无关。支持 PostgreSQL、MySQL、MariaDB、SQLite。
  • 纯 Rust。Postgres 和 MySQL/MariaDB 驱动程序是使用零不安全的代码以纯 Rust 编写的。
  • 与运行时无关。在不同的运行时(async-std/tokio/actix)和 TLS 后端(native-tls,rustls)上运行。
  • SQLite 驱动程序使用 libsqlite3 C 库。
  • sqlx 除非启用了 sqlite 功能,否则会使用 #![forbid(unsafe_code)]。SQLite 驱动程序通过 libsqlite3-sys 直接调用 SQLite3 API,这需要使用不安全的代码。

另外,它还有以下特性:

  • 跨平台。作为原生的 Rust 代码,sqlx 可以在任何支持 Rust 的平台上编译。
  • 内建的连接池功能,使用 sqlx::Pool
  • 行流式处理。数据从数据库异步读取并按需解码。
  • 自动的语句准备和缓存。当使用高级查询 API(sqlx::query)时,语句将按连接进行准备和缓存。
  • 简单的(未准备)查询执行,包括将结果获取到与高级 API 使用的相同 Row 类型。支持批量执行并返回所有语句的结果。
  • 传输层安全性(TLS)在支持的平台(MySQL、MariaDB 和 PostgreSQL)上可用。
  • 使用 LISTENNOTIFY 以支持 PostgreSQL 异步通知。
  • 支持保存点的嵌套事务。
  • 任何数据库驱动程序,允许在运行时更改数据库驱动程序。AnyPool 根据 URL 方案连接到指定的驱动程序。

sqlx 支持编译时检查的查询。然而,它并不是通过提供一个 Rust API 或 DSL(特定领域语言)来构建查询来实现这一点的。相反,它提供了宏,这些宏接受常规的 SQL 作为输入,并确保其对于您的数据库是有效的。其工作原理是,sqlx 在编译时连接到您的开发数据库,让数据库本身验证(并返回一些有关)您的 SQL 查询的信息。这有一些可能令人惊讶的含义:

  • 由于 sqlx 不需要解析 SQL 字符串本身,因此可以使用开发数据库接受的任何语法(包括数据库扩展添加的内容)
  • 由于数据库允许您检索的查询信息量不同,从查询宏获得的 SQL 验证的程度取决于数据库。

它不是一个 ORM 库,你如果想使用 ORM 库,可以参考ormx[3]SeaORM[4]

安装

sqlx 支持多种异步运行时,你可以通过选择不同的特性来使用不同的异步运行时。目前支持的异步运行时有async-std, tokioactix(其实是 tokio 的别名)。还支持 TLS 的连接:

# Cargo.toml
[dependencies]
# P挑选下面的一行引入sqlx:

# tokio (no TLS)
sqlx = { version = "0.7", features = [ "runtime-tokio" ] }
# tokio + native-tls
sqlx = { version = "0.7", features = [ "runtime-tokio""tls-native-tls" ] }
# tokio + rustls
sqlx = { version = "0.7", features = [ "runtime-tokio""tls-rustls" ] }

async-std (no TLS)
sqlx = { version = "0.7", features = [ "runtime-async-std" ] }
async-std + native-tls
sqlx = { version = "0.7", features = [ "runtime-async-std""tls-native-tls" ] }
async-std + rustls
sqlx = { version = "0.7", features = [ "runtime-async-std""tls-rustls" ] }

如果你引入了多个异步运行时,默认首选tokio

同时你也需要引入所需的数据库特性:

sqlx = { version = "0.7", features = [ "postgres" ] }
sqlx = { version = "0.7", features = [ "mysql" ] }
sqlx = { version = "0.7", features = [ "sqlite" ] }
sqlx = { version = "0.7", features = [ "any" ] }

以及一些其他的关于数据类型的特性等,比如chronouuidtimebstrbigdecimalrust_decimalipnetwork等。

derive支持 derive 类型的宏,如FromRow, Type, Encode, Decode.

macros 增加了对 query*! 宏的支持,该宏允许进行编译时检查的查询。

一个简单的 sqlx 示例如下:

use sqlx::postgres::PgPoolOptions;

#[tokio::main] // 异步运行时
async fn main() -> Result<(), sqlx::Error> {
    // 使用一个数据库的连接池
    // 不同的数据库选择不同的连接池构建器
    let pool = MySqlPoolOptions::new()
        .max_connections(5)
        .connect("mysql://root:password@localhost/test").await?;

    // 执行一个简单的查询
    let row: (i64,) = sqlx::query_as("SELECT ?")
        .bind(150_i64)
        .fetch_one(&pool).await?;

    assert_eq!(row.0150);

    Ok(())
}

连接数据库

sqlx 支持多种不同的方式来连接数据库。最常见和推荐的是使用连接池。

建立连接池

连接池可以显著提高应用程序的性能和并发能力。通过重用连接,减少了建立新连接的开销。

use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:@localhost")
        .await?;

    Ok(())
}

上面的代码创建了一个最大 5 个连接的 PostgreSQL 连接池。PgPoolOptions提供了各种配置选项。

比如你可以不通过 dsn 字符串,而是通过方法进行用户名和密码设置:

let conn = PgConnectOptions::new()
    .host("secret-host")
    .port(2525)
    .username("secret-user")
    .password("secret-password")
    .ssl_mode(PgSslMode::Require)
    .connect()
    .await?;

甚至可以在解析 dsn 字符串后再修改特定的参数,如下面的 mysql 示例:

use sqlx::{Connection, ConnectOptions};
use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlSslMode};

// dsn string
let conn = MySqlConnection::connect("mysql://root:password@localhost/db").await?;

// 手工构造
let conn = MySqlConnectOptions::new()
    .host("localhost")
    .username("root")
    .password("password")
    .database("db")
    .connect().await?;

// 从dsn字符串解析Options
let mut opts: MySqlConnectOptions = "mysql://root:password@localhost/db".parse()?;

// 修改参数
opts.log_statements(log::LevelFilter::Trace);

// 创建连接池
let pool = MySqlPool::connect_with(&opts).await?;

单个连接

有时您可能只需要一个简单的单连接,而不需要连接池。

use sqlx::postgres::PgConnOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let conn = PgConnOptions::new()
        .connect("postgres://postgres:@localhost")
        .await?;

    Ok(())
}

查询

在 SQL 中,查询可以分为预编译(参数化)或未预编译(简单)的。预编译查询会缓存其查询计划,使用二进制通信模式(降低带宽和更快的解码速度),并利用参数来避免 SQL 注入。未预编译的查询是简单的,并且仅用于无法使用预编译语句的情况,例如各种数据库命令(如 PRAGMASETBEGIN)。

sqlx 支持使用这两种类型的查询进行所有操作。在 sqlx 中,一个 &str 被当作未预编译的查询来处理,而 QueryQueryAs 结构体被当作预编译的查询来处理。

在其他语言中,预编译就是 prepared statement,未预编译就是 unprepared statement。

// 底层执行
conn.execute("BEGIN").await?; // 未预编译,简单查询
conn.execute(sqlx::query("DELETE FROM table")).await?; // 预编译,此连接会缓存查询

我们应该尽可能使用高级查询接口。为了使这更加容易,这些类型上有终结器(finalizers),这样就不需要使用执行器(executor)来包装它们。换句话说,sqlx 提供了高级查询接口,这些接口使得与数据库的交互更加简洁和直观。这些接口被设计为可以独立工作,而不需要显式地创建一个执行器对象来执行查询。终结器(在这里可能指的是一些内部机制或方法)确保了这些高级接口在使用后可以正确地清理和关闭相关资源,从而简化了开发者的工作。

sqlx::query("DELETE FROM table").execute(&mut conn).await?;
sqlx::query("DELETE FROM table").execute(&pool).await?;

在 sqlx 中,执行查询(execute)的终结器会返回受影响的行数(如果有的话),并丢弃所有接收到的结果。此外,还提供了 fetchfetch_onefetch_optionalfetch_all 方法来接收结果。

sqlx::query 返回的 Query 类型, 它会从数据库中返回 Row<'conn>。可以使用 row.get() 方法通过索引或名称访问列值。由于 Row 保持了对连接的不可变借用,因此一次只能存在一个 Row

fetch 查询的终结器返回一个类似流的类型,该类型会遍历结果集中的行。你可以通过迭代这个流来访问每一行数据。这通常用于处理查询结果集中有多行数据的情况。

// 提供 `try_next`
use futures::TryStreamExt;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn);

while let Some(row) = rows.try_next().await? {
    // 将row映射到用户定义的领域类型
    let email: &str = row.try_get("email")?;
}

为了将 row 映射到领域类型,可以使用以下两种模式之一:

  1. 手工映射
let mut stream = sqlx::query("SELECT * FROM users")
    .map(|row: PgRow| {
        // 映射row到用户定义的领域类型
    })
    .fetch(&mut conn);
  1. 使用query_asbind方法
#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }

let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
    .bind(user_email)
    .bind(user_name)
    .fetch(&mut conn);

除了使用类似流的类型来遍历结果集之外(fetch),我们还可以使用 fetch_onefetch_optional 来从数据库中请求一个必需的或可选的结果。

  • fetch_one: 这个方法会尝试从结果集中获取第一行数据。如果结果集为空(即没有数据),那么 fetch_one 通常会返回一个错误。这个方法适用于你期望查询结果只有一行数据的情况。
  • fetch_optional: 这个方法类似于 fetch_one,但它返回一个可选的结果(Option<Row>Option<T>,如果使用了类型映射)。如果结果集为空,它将返回 None 而不是错误。这使得它在处理可能返回零行或多行数据的查询时更加灵活,但你又只关心第一行数据(如果存在)的情况下特别有用。

使用这两个方法可以帮助你更直接地处理那些只返回单个结果(或可能不返回结果)的查询

原生查询和参数化查询

sqlx 支持执行原生 SQL 查询,也支持使用绑定参数进行参数化查询,后者有助于防止 SQL 注入攻击。

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    // 原生查询
    let name: String = sqlx::query_scalar("SELECT name FROM users WHERE id = 1")
        .fetch_one(&pool)
        .await?;

    // 参数化查询
    let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users WHERE email LIKE $1")
        .bind("%@example.com")
        .fetch_one(&pool)
        .await?;

    Ok(())
}

流式查询

sqlx 支持流式查询,这意味着你可以在查询结果返回时立即处理它们,而不需要等待整个结果集加载完毕。这对于处理大量数据或需要实时处理数据的情况非常有用。

use sqlx::postgres::PgRow;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let mut rows = sqlx::query("SELECT id, name FROM users")
        .fetch(&pool);

    while let Some(row) = rows.try_next().await? {
        let id: i32 = row.try_get(0)?;
        let name: &str = row.try_get(1)?;
        println!("{} {}", id, name);
    }

    Ok(())
}

查询结果映射到 Rust 数据结构

最常见的查询方式是将结果映射到一个 Rust 数据结构,比如结构体或元组结构体。sqlx 会自动将数据库列映射到结构体字段。

比如下面这个例子是查询一个用户的信息:

use sqlx::FromRow;

#[derive(FromRow)]
struct User {
    id: i32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let user = sqlx::query_as::<_, User>("SELECT id, name, email FROM users WHERE id = $1")
        .bind(42)
        .fetch_one(&pool)
        .await?;

    println!("{:?}", user);
    Ok(())
}

又比如下面这个例子查询一组书籍的信息:

use sqlx::postgres::PgPool;
use sqlx::FromRow;

#[derive(FromRow)]
struct Book {
    id: i32,
    title: String,
    author: String,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    let books = sqlx::query_as::<_, Book>("SELECT * FROM books")
        .fetch_all(&pool)
        .await?;

    for book in books {
        println!("{} - {} ({})", book.id, book.title, book.author);
    }

    Ok(())
}

执行语句

除了查询,sqlx 还支持执行其他 SQL 语句,如 INSERT、UPDATE 和 DELETE 等。它提供了多种执行这些语句的方法,包括支持事务。

执行语句

最简单的执行语句方式是使用execute函数:

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let inserted_rows = sqlx::query("INSERT INTO users (name, email) VALUES ($1, $2)")
        .bind("User1").bind("user1@example.com")
        .execute(&pool)
        .await?
        .rows_affected();

    println!("Inserted {} rows", inserted_rows);
    Ok(())
}

上面这个例子是插入一行数据到users表中,并打印出插入的行数。

不要被sqlx::query这个名字所误导,它不仅仅用于查询,还可以用于执行其他 SQL 语句。

下面是一个插入/更新/删除数据的示例:

use sqlx::postgres::PgPool;

#[derive(Debug)]
struct User {
    id: i32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    // 插入新用户
    let user = User { id: 0, name: "NewUser".into(), email: "new@example.com".into() };
    let id = sqlx::query("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
        .bind(&user.name).bind(&user.email)
        .fetch_one(&pool)
        .await?
        .get(0);

    println!("Inserted user with id: {}", id);

    // 更新用户
    let updated_rows = sqlx::query("UPDATE users SET email=$1 WHERE id=$2")
        .bind("updated@example.com").bind(id)
        .execute(&pool)
        .await?
        .rows_affected();

    println!("Updated {} rows", updated_rows);

    // 删除用户
    let deleted_rows = sqlx::query("DELETE FROM users WHERE id=$1")
        .bind(id)
        .execute(&pool)
        .await?
        .rows_affected();

    println!("Deleted {} rows", deleted_rows);

    Ok(())
}

事务

sqlx 支持事务,你可以使用transaction方法来执行一个事务。要执行多个语句作为一个原子事务,您可以使用begincommitrollback函数:

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let mut tx = pool.begin().await?;

    sqlx::query("UPDATE users SET email=$1 WHERE id=$2")
        .bind("new@email.com").bind(42)
        .execute(&mut tx)
        .await?;

    sqlx::query("DELETE FROM users WHERE id=$1")
        .bind(43)
        .execute(&mut tx)
        .await?;

    tx.commit().await?;

    Ok(())
}

上面的示例首先开始一个新事务,然后执行两个语句,最后提交事务。如果中间任何一步失败,可以调用rollback回滚整个事务。

面是一个使用 sqlx 中事务和回滚(rollback)的示例:

use sqlx::postgres::PgPool;
use sqlx::Error;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    // 开始一个事务
    let mut transaction = pool.begin().await?;

    // 执行一些操作
    sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
        .bind(100.0// 从账号中扣除100元
        .bind(1)
        .execute(&mut transaction)
        .await?;

    sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
        .bind(100.0// 将100元转账到另一个账号
        .bind(2)
        .execute(&mut transaction)
        .await?;

    // 模拟一个错误情况
    if should_rollback() {
        // 回滚事务
        transaction.rollback().await?;
        println!("Transaction rolled back");
    } else {
        // 提交事务
        transaction.commit().await?;
        println!("Transaction committed");
    }

    Ok(())
}

fn should_rollback() -> bool {
    // 一些条件判断,决定是否需要回滚
    // 这里为了演示,我们随机返回true或false
    rand::thread_rng().gen_bool(0.5)
}

在这个示例中,我们首先使用pool.begin()开始一个新的事务。然后,我们执行两个查询,分别从一个账户扣除 100 元,并将这 100 元转账到另一个账户。接下来,我们调用should_rollback()函数来模拟一个错误情况。如果should_rollback()返回 true,我们就调用transaction.rollback().await?来回滚整个事务。否则,我们调用transaction.commit().await?来提交事务。

在真实情况下,您可能会在遇到某些异常或错误时触发回滚,例如:

  • 违反了某些业务规则或数据完整性约束
  • 发生了意外的异常或错误
  • 用户取消或中断了操作
  • 出于某些原因,整个事务需要被回滚

通过使用事务和回滚,您可以确保数据库中的更改要么全部成功,要么完全回滚,从而保持数据的一致性和完整性。这对于处理敏感操作或需要多个步骤的复杂操作非常重要。

连接池和并发

sqlx 内置了连接池支持,这使得它天生就支持高效的并发查询。通过连接池,可以避免为每个查询创建新连接的开销。

连接池管理

sqlx 中的连接池由PgPool之类的类型表示。您可以直接创建一个连接池实例,也可以使用PgPoolOptions来定制配置:

use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .max_connections(10)
    .connect("postgres://postgres:@localhost")
    .await?;

上面的代码创建了一个最大连接数为 10 的 PostgreSQL 连接池。PgPoolOptions提供了各种配置选项,如最大连接数、最小连接数、连接超时等。

并发查询

由于 sqlx 内置了连接池,因此并发查询变得非常简单。你只需要在多个异步任务中并行执行查询即可:

use sqlx::postgres::PgPool;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;
    let tasks = (0..10)
        .map(|_| {
            let pool = pool.clone();
            tokio::spawn(async move { // 并发
                let now = Instant::now();
                let _ = sqlx::query("SELECT pg_sleep(1)").execute(&pool).await;
                println!("Task completed in {:?}", now.elapsed());
            })
        })
        .collect::<Vec<_>>();

    for task in tasks {
        task.await?;
    }
    Ok(())
}

上面的代码创建了一个包含 10 个任务的并发查询。每个任务都会执行一个简单的查询,然后打印出执行时间。通过并发查询,您可以同时执行多个查询,从而提高查询效率。

下面是一个更实际的示例,模拟了并发处理多个 Web 请求的场景:

use sqlx::postgres::{PgPool, PgRow};
use std::io;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    let requests = vec![
        "SELECT * FROM users WHERE id = $1",
        "SELECT * FROM products WHERE category = $1",
        "SELECT * FROM orders WHERE user_id = $1",
    ];

    for request in requests {
        let rows = sqlx::query(request)
            .bind(42)
            .fetch_all(&pool)
            .await?;

        for row in rows {
            print_row(row);
        }

        println!();
    }

    Ok(())
}

fn print_row(row: PgRow) {
    let cols = row.columns();
    let values: Vec<&str> = row.get_refs(cols).into_iter().map(|v| v.unwrap()).collect();
    println!("{}", values.join(", "));
}

在这个示例中,我们模拟了处理多个 Web 请求的场景。我们定义了一个包含多个查询的请求列表,然后并发执行这些查询。每个查询都会返回一组行数据,我们将这些行数据打印出来。通过并发查询,我们可以同时处理多个请求,从而提高系统的性能和效率。

JSON 支持

现代数据库广泛支持 JSON 数据类型,sqlx 也为此提供了非常好的支持。您可以方便地查询 JSON 类型以及将查询结果映射为 JSON。

查询 JSON 类型

在数据库中,JSON 类型通常被存储为文本。sqlx 允许你直接查询和处理 JSON 数据:

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let json_data: serde_json::Value = sqlx::query(r#"
        SELECT '[{"id": 1, "name": "Product 1"}, {"id": 2, "name": "Product 2"}]'::json
    "#
)
        .fetch_one(&pool)
        .await?
        .get(0);

    println!("{:?}", json_data);
    Ok(())
}

这个例子查询了一个 JSON 数组,并将其直接映射为 serde_json::Value

将查询结果映射为 JSON

您还可以将常规的查询结果映射为 JSON 格式。这对于构建 API 或与前端交互非常有用。

use serde::{Serialize, Deserialize};

#[derive(Deserialize, Serialize)]
struct Product {
    id: i32,
    name: String,
    price: f64,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let products: Vec<Product> = sqlx::query_as(
        "SELECT id, name, price FROM products"
    )
    .fetch_all(&pool)
    .await?;

    let json = serde_json::to_string(&products)?;
    println!("{}", json);

    Ok(())
}

这个例子查询了产品列表,并使用serde_json将其序列化为 JSON 格式。

使用 PostgreSQL 的 JSON 类型

这是一个更全面的示例,展示了如何在 PostgreSQL 中使用 JSON 类型:

use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPool;
use sqlx::types::JsonValue;

#[derive(Serialize, Deserialize)]
struct User {
    id: i32,
    name: String,
    profile: JsonValue,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    // 插入用户及其JSON配置文件
    let profile = serde_json::json!({
        "bio""Software Engineer",
        "interests": ["coding""reading"]
    });

    let user = User {
        id: 0,
        name: "NewUser".into(),
        profile: profile.into(),
    };

    let id = sqlx::query("INSERT INTO users (name, profile) VALUES ($1, $2) RETURNING id")
        .bind(&user.name)
        .bind(&user.profile)
        .fetch_one(&pool)
        .await?
        .get(0);

    // 查询并打印用户及其配置文件
    let user: User = sqlx::query_as("SELECT id, name, profile FROM users WHERE id = $1")
        .bind(id)
        .fetch_one(&pool)
        .await?;

    println!("{:?}", user);

    Ok(())
}

在这个例子中,我们首先使用serde_json创建了一个 JSON 值,作为用户配置文件。然后,我们将这个 JSON 值 插入到数据库中。最后,我们查询用户并将配置文件作为 JsonValue 类型获取。

通知和监听

sqlx 提供了在数据库中监听通知(NOTIFY/LISTEN)的功能,这使得构建基于事件的、实时应用程序成为可能。

数据库通知数据库通知是一种机制,允许应用程序在数据库中发生某些事件时接收通知。这种功能在构建基于事件的系统(如聊天应用程序或实时仪表板)时非常有用。

-- 在数据库中触发通知
NOTIFY channel_name, 'hello';

使用监听器sqlx 通过DatabaseNotification结构体来表示接收到的通知。您可以在应用程序中设置一个监听器,以接收并处理这些通知。

use sqlx::postgres::PgListener;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let listener = PgListener::bind("postgresql://localhost/").await?;
    listener.listen("channel_name").await?;

    loop {
        let notification = listener.recv().await?;
        println!(
            "Received notification: {} ({})",
            notification.payload, notification.payload_pretty(),
        );
    }
}

上面的示例创建了一个PostgreSQL监听器,并开始监听名为channel_name的通道。当接收到通知时,它会打印出通知的有效负载。

这是一个更完整的示例,展示了如何在 PostgreSQL 中设置通知并在应用程序中监听它们:

use sqlx::postgres::{PgPool, PgListener};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;
    let listener = PgListener::bind("postgresql://localhost/").await?;

    // 创建一个通知通道
    sqlx::query("LISTEN channel_name").execute(&pool).await?;

    // 在另一个连接上触发通知
    sqlx::query("NOTIFY channel_name, 'hello'").execute(&pool).await?;

    // 等待并处理通知
    if let Some(notification) = listener.recv().await? {
        println!("Received notification: {}", notification.payload);
    }

    Ok(())
}

在这个例子中,我们首先创建了一个 PostgreSQL 监听器,并在数据库中设置了一个名为channel_name的通知通道。然后,我们在另一个连接上触发了一个通知。最后,监听器接收到通知并打印出了它的有效负载。

测试

编写测试对于任何健壮的软件系统都是必不可少的,sqlx 也不例外。幸运的是,sqlx 提供了多种方式来测试与数据库交互的代码。

测试连接

最基本的测试是确保您的应用程序能够成功连接到数据库。您可以使用 sqlx 提供的try_connect函数进行测试:

use sqlx::PgPool;

#[tokio::test]
async fn test_connection() {
    let pool = PgPool::try_connect("postgres://postgres:@localhost").await.unwrap();
    // 执行一些操作来测试连接...
}

测试查询

您还可以测试查询,以确保它们能够正确地执行并返回预期的结果。您可以使用queryquery_as函数来测试查询:

use sqlx::PgPool;

#[tokio::test]
async fn test_query() {
    let pool = PgPool::connect("postgres://postgres:@localhost").await.unwrap();

    let row: (i64,) = sqlx::query_as("SELECT 1")
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(row.01);
}

使用内存数据库

sqlx 支持使用内存数据库进行测试,例如 SQLite 内存数据库。这种方式快速、轻量,非常适合单元测试。

#[tokio::test]
async fn test_query() {
    let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    // 执行测试...
}

对于更全面的集成测试,您可以在测试用例中创建一个临时的测试数据库,执行所需的操作,然后在测试结束时清理该数据库。这种方式更接近真实的生产环境。

使用 mock 数据库

msql-srvopensrv-clickhouseopensrv-mysql

下面是一个使用集成测试数据库进行测试的例子:

use sqlx::postgres::{PgPool, PgRow};

#[tokio::test]
async fn test_user_operations() {
    let pool = create_test_pool().await;

    // 准备测试数据
    sqlx::query("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)")
        .execute(&pool)
        .await
        .unwrap();

    // 插入新用户
    let name = "Test User".to_owned();
    let email = "test@example.com".to_owned();
    let id = insert_user(&pool, &name, &email).await;

    // 查询并验证用户数据
    let row: PgRow = sqlx::query_as("SELECT id, name, email FROM users WHERE id = $1")
        .bind(id)
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(row.get::<i32, _>(0), id);
    assert_eq!(row.get::<String, _>(1), name);
    assert_eq!(row.get::<String, _>(2), email);
}

async fn insert_user(pool: &PgPool, name: &str, email: &str) -> i32 {
    sqlx::query("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
        .bind(name)
        .bind(email)
        .fetch_one(pool)
        .await
        .unwrap()
        .get(0)
}

async fn create_test_pool() -> PgPool {
    let db_name = "test_database";
    let pool = PgPool::connect(&format!("postgres://postgres:@localhost/{}", db_name))
        .await
        .unwrap();

    // 清理并重新创建测试数据库
    sqlx::query(&format!("DROP DATABASE IF EXISTS {}", db_name))
        .execute(&pool)
        .await
        .unwrap();
    sqlx::query(&format!("CREATE DATABASE {}", db_name))
        .execute(&pool)
        .await
        .unwrap();

    pool
}

在这个示例中,我们首先创建了一个专用的测试数据库。然后我们在这个数据库中创建了一个 users 表,并进行了插入、查询等操作,最后验证了查询结果。

高级主题

除了基础功能外,sqlx 还提供了一些高级功能,如自定义类型映射、编译时检查和性能分析等,可以进一步提高您的生产力和应用程序的性能。

自定义类型映射

sqlx 允许您定义自定义的数据类型映射规则,将数据库中的数据类型映射到 Rust 中的类型。这对于处理一些特殊的数据类型或实现自定义的逻辑非常有用。

use sqlx::types::Type;
use sqlx::postgres::{PgTypeInfo, PgValueRef};

struct MyType(String);

impl Type<PgTypeInfo> for MyType {
    fn type_info() -> PgTypeInfo {
        PgTypeInfo::with_name("mytype")
    }

    fn readable_name() -> String {
        "MyType".into()
    }
}

impl<'r> PgValueRef<'rfor MyType {
    fn from_pg_value(value: Option<&'r [u8]>) -> Option<MyType> {
        value.map(|bytes| MyType(String::from_utf8_lossy(bytes).into_owned()))
    }

    fn to_pg_value(&self) -> Option<Vec<u8>> {
        Some(self.0.as_bytes().to_vec())
    }
}

在这个例子中,我们定义了一个名为MyType的自定义数据类型,并实现了TypePgValueRef trait。这样,我们就可以将数据库中的mytype类型映射到 Rust 中的MyType类型。

编译时检查

sqlx 提供了一些宏和编译时检查功能,可以在编译时捕获一些错误,而不是在运行时才发现。这有助于提高代码质量和安全性。

use sqlx::query;

#[rustfmt::skip]
let query = query!(
    "
    SELECT id, name, email
    FROM users
    WHERE id = ?
    "
,
    42
);

上面的query!宏可以在编译时检查 SQL 语句的语法错误,并验证绑定参数的数量和类型。这样可以避免在运行时才发现这些问题。

类似的宏还有query_as!query_scalar!query_file!query_file!query_file_scalar!以及它们的变种query_xxx_unchecked!

执行时间

你可以通过计时来分析查询的性能,并根据结果进行优化。sqlx 提供了一些工具来帮助您分析查询的性能,如log_statementslog_slow_statements等。

use sqlx::query;
use sqlx::postgres::PgQueryAs;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 连接池初始化 */

    let query = query_as!(
        User,
        r#"
        SELECT id, name, email
        FROM users
        WHERE id = $1
        "#

    );

    for attempt in 0..5 {
        let time = std::time::Instant::now();
        let _users: Vec<User> = query.bind(42).fetch_all(&pool).await?;

        let elapsed = time.elapsed();
        println!("Query attempt {attempt} took: {elapsed:?}");
    }

    Ok(())
}

打印日志

ConnectOptions提供了两个设置日志的方法:

  • log_statements: 使用指定的级别打印执行语句
  • log_slow_statements: 使用指定的级别打印执行时间超过指定阈值的 SQL 语句。
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .log_statements(log::LevelFilter::Debug// 记录所有SQL语句
        .log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_millis(100)) // 记录执行时间超过100ms的慢查询
        .connect("postgres://postgres:@localhost")
        .await?;

    // 执行一些查询
    let row: (i64,) = sqlx::query_as("SELECT 42")
        .fetch_one(&pool)
        .await?;

    println!("Result: {}", row.0);

    Ok(())
}

最佳实践和故障排除

再啰嗦几句。

在使用 sqlx 时,遵循一些最佳实践可以帮助您编写更加安全、高效和可维护的代码。此外,掌握一些常见错误和故障排除技巧也很有帮助。

sqlx 最佳实践

  • 使用参数化查询: 始终使用带参数的查询,而不是字符串插值。这可以防止 SQL 注入攻击。
  • 监控连接池指标: 监控连接池的指标,如活跃连接数、获取连接等待时间等,以确保连接池配置正确。
  • 避免 ORM: sqlx 是一个查询构建器,而不是完整的对象关系映射(ORM)库。尽量避免在 sqlx 中复制 ORM 功能。
  • 使用流式查询: 对于大型查询结果集,使用流式查询可以避免一次性加载所有数据到内存中。
  • 利用编译时检查: 使用 sqlx 提供的 query!和 query_as!宏,可以在编译时捕获 SQL 语法错误和类型不匹配等问题。
  • 测试覆盖: 为您的数据库交互代码编写单元测试和集成测试,以确保正确性和稳定性。

常见错误和故障排除

  • 连接池耗尽: 如果出现"连接池耗尽"错误,可能是因为并发请求过多或连接池配置不当导致的。检查连接池指标并适当调整max_connections
  • 死锁: 在事务中执行多个查询时,可能会遇到死锁情况。确保正确使用事务,并实现重试逻辑。
  • 类型不匹配: 如果遇到"无法将 PostgreSQL 类型映射到 Rust 类型"之类的错误,检查您的结构体字段类型是否与数据库列类型匹配。
  • SQL 语法错误: 如果出现 SQL 语法错误,首先检查您是否使用了参数化查询。如果使用了 query!宏,也可能是宏解析出现了问题。
  • 查询性能差: 如果查询性能较差,可以使用 sqlx 提供的查询追踪功能分析查询执行情况,并优化慢查询。如果频繁创建连接,检查连接池配置是否合理,比如min_connections是否过小

生产就绪建议

  • 启用日志记录: 在生产环境中合理启用 sqlx 的日志记录,以便更好地调试和监控应用程序。
  • 监控指标: 监控数据库和连接池指标,如查询执行时间、错误率、连接池利用率等。
  • 进行负载测试: 在部署之前,对您的应用程序进行全面的负载测试,以确保其能够在生产环境中良好运行。
  • 实施安全最佳实践: 遵循安全最佳实践,如使用参数化查询、限制数据库权限、加密敏感数据等。
  • 准备故障转移计划: 制定数据库故障转移计划,以确保应用程序在数据库出现故障时能够正常运行。
  • 持续集成和交付: 将 sqlx 集成测试纳入您的持续集成和交付流程,以确保代码质量。

sqlx 生态

有一些其他数据库的扩展和支持,比如sqlx-rxqlitesqlx-clickhouse-ext

sqlx-crud提供常见的数据库操作的 CRUD 操作的 derive 宏:

use sqlx::FromRow;
use sqlx_crud::SqlxCrud;

#[derive(Debug, FromRow, SqlxCrud)]
struct User {
    user_id: i32,
    name: String,
}

if let Some(user) = User::by_id(&pool, 42) {
    println!("Found user user_id=42: {:?}", user);
}

sqlx-error提供了对sqlx::Error的包装。

当然还有一些其他的库,不过当前关注度还不是很高。

参考资料
[1]

Go 生态圈的 sqlx 库: https://colobu.com/2024/05/10/sqlx-a-brief-introduction/

[2]

sqlx 库: https://github.com/launchbadge/sqlx

[3]

ormx: https://crates.io/crates/ormx

[4]

SeaORM: https://github.com/SeaQL/sea-orm


rust · 目录
上一篇测试 Rust 的 IO性能下一篇通过100个练习题学习Rust
继续滑动看下一个
鸟窝聊技术
向上滑动看下一个