Skip to content

libpqxx — PostgreSQL 客户端

libpqxx 是 PostgreSQL 的官方 C++ 客户端,提供类型安全的 API、事务管理、连接池,是 C++ 连接 PostgreSQL 的标准选择。

安装

bash
# Ubuntu
sudo apt install libpqxx-dev

# vcpkg
vcpkg install libpqxx

# CMake
find_package(libpqxx REQUIRED)
target_link_libraries(myapp PRIVATE libpqxx::pqxx)

连接与基本查询

cpp
#include <pqxx/pqxx>

int main() {
    // 连接字符串
    pqxx::connection conn(
        "host=localhost port=5432 dbname=mydb user=postgres password=secret");

    // 或者用 URI
    pqxx::connection conn2("postgresql://postgres:secret@localhost:5432/mydb");

    std::cout << "连接到: " << conn.dbname() << "\n";
    std::cout << "服务器版本: " << conn.server_version() << "\n";

    // 只读事务(SELECT)
    pqxx::work txn(conn);

    auto result = txn.exec("SELECT id, name, age FROM users ORDER BY id");

    for (const auto& row : result) {
        int id = row["id"].as<int>();
        std::string name = row["name"].as<std::string>();
        int age = row["age"].as<int>();
        std::cout << id << " " << name << " " << age << "\n";
    }

    txn.commit();
}

参数化查询(防 SQL 注入)

cpp
pqxx::work txn(conn);

// 参数化查询:$1, $2 是占位符
auto result = txn.exec_params(
    "SELECT * FROM users WHERE age > $1 AND name LIKE $2",
    18,          // $1
    "%Alice%"    // $2
);

// 单行查询
auto row = txn.exec_params1(
    "SELECT id, name FROM users WHERE id = $1", 1);
int id = row["id"].as<int>();
std::string name = row["name"].as<std::string>();

// 插入并返回 ID
auto inserted = txn.exec_params1(
    "INSERT INTO users (name, email, age) VALUES ($1, $2, $3) RETURNING id",
    "Alice", "alice@example.com", 30);
int new_id = inserted["id"].as<int>();

txn.commit();

事务管理

cpp
// 普通事务(读写)
{
    pqxx::work txn(conn);
    try {
        txn.exec_params("INSERT INTO accounts (user_id, balance) VALUES ($1, $2)",
                        1, 1000.0);
        txn.exec_params("UPDATE accounts SET balance = balance - $1 WHERE user_id = $2",
                        100.0, 1);
        txn.exec_params("UPDATE accounts SET balance = balance + $1 WHERE user_id = $2",
                        100.0, 2);
        txn.commit();  // 提交
    } catch (const std::exception& e) {
        // 自动回滚(txn 析构时)
        std::cerr << "事务失败: " << e.what() << "\n";
        throw;
    }
}

// 只读事务(性能更好)
{
    pqxx::read_transaction txn(conn);
    auto result = txn.exec("SELECT * FROM users");
    // 不需要 commit
}

// 不可序列化事务(最高隔离级别)
{
    pqxx::work txn(conn);
    txn.exec("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
    // ...
    txn.commit();
}

批量操作

cpp
// 批量插入(使用 COPY 命令,最快)
pqxx::work txn(conn);

// 方式 1:多值 INSERT
std::string sql = "INSERT INTO users (name, age) VALUES ";
std::vector<std::string> values;
for (int i = 0; i < 1000; ++i) {
    values.push_back("('" + txn.esc("User" + std::to_string(i)) + "', " +
                     std::to_string(20 + i % 50) + ")");
}
sql += pqxx::separated_list(",", values.begin(), values.end());
txn.exec(sql);

// 方式 2:COPY(最快,适合大批量)
pqxx::stream_to stream(txn, "users", std::vector<std::string>{"name", "age"});
for (int i = 0; i < 100000; ++i) {
    stream << std::make_tuple("User" + std::to_string(i), 20 + i % 50);
}
stream.complete();

txn.commit();

连接池

cpp
// libpqxx 本身不提供连接池,需要自己实现或使用第三方
// 简单连接池实现
class PgPool {
    std::vector<std::unique_ptr<pqxx::connection>> pool_;
    std::queue<pqxx::connection*> available_;
    std::mutex mtx_;
    std::condition_variable cv_;
    std::string conn_str_;

public:
    PgPool(const std::string& conn_str, size_t size) : conn_str_(conn_str) {
        for (size_t i = 0; i < size; ++i) {
            pool_.push_back(std::make_unique<pqxx::connection>(conn_str));
            available_.push(pool_.back().get());
        }
    }

    // RAII 连接借用
    class Guard {
        PgPool& pool_;
        pqxx::connection* conn_;
    public:
        Guard(PgPool& p, pqxx::connection* c) : pool_(p), conn_(c) {}
        ~Guard() { pool_.release(conn_); }
        pqxx::connection* get() { return conn_; }
        pqxx::connection* operator->() { return conn_; }
    };

    Guard acquire() {
        std::unique_lock lock(mtx_);
        cv_.wait(lock, [this] { return !available_.empty(); });
        auto* conn = available_.front();
        available_.pop();
        return Guard(*this, conn);
    }

    void release(pqxx::connection* conn) {
        std::lock_guard lock(mtx_);
        available_.push(conn);
        cv_.notify_one();
    }
};

// 使用
PgPool pool("postgresql://localhost/mydb", 10);
{
    auto conn = pool.acquire();
    pqxx::work txn(*conn.get());
    auto result = txn.exec("SELECT count(*) FROM users");
    txn.commit();
}

处理 NULL 值

cpp
pqxx::work txn(conn);
auto result = txn.exec("SELECT id, name, phone FROM users");

for (const auto& row : result) {
    int id = row["id"].as<int>();
    std::string name = row["name"].as<std::string>();

    // 检查 NULL
    if (row["phone"].is_null()) {
        std::cout << id << " " << name << " (无电话)\n";
    } else {
        std::string phone = row["phone"].as<std::string>();
        std::cout << id << " " << name << " " << phone << "\n";
    }

    // 使用 optional
    auto phone = row["phone"].as<std::optional<std::string>>();
    if (phone) std::cout << *phone << "\n";
}

关键认知

libpqxx 的事务对象析构时自动回滚(未 commit),这是 RAII 的完美应用。参数化查询exec_params)是防 SQL 注入的唯一正确方式,永远不要拼接 SQL 字符串。大批量插入用 pqxx::stream_to(COPY 协议),比普通 INSERT 快 10-100x。

系统学习 C++ 生态,深入底层架构