Database Service
Database access patterns in Rust code - connecting to PostgreSQL, using repositories, and accessing the pool in different contexts
On this page
systemprompt.io uses PostgreSQL with SQLx for compile-time checked queries. This guide covers how to access the database in different contexts.
Core Types
systemprompt.io provides two main database wrapper types:
| Type | Import | Use Case |
|---|---|---|
DbPool |
systemprompt::database::DbPool |
MCP servers, background jobs |
Database |
systemprompt::database::Database |
Page data providers, extensions |
Both types wrap Arc<PgPool> and provide a .pool() method to access the underlying connection pool.
Accessing the Pool
In MCP Handlers
MCP tool handlers receive &DbPool as a parameter. Extract the Arc<PgPool> with .pool():
use systemprompt::database::DbPool;
use rmcp::ErrorData as McpError;
pub async fn handle(
db_pool: &DbPool,
request: CallToolRequestParams,
// ... other params
) -> Result<CallToolResult, McpError> {
// Extract the pool - returns Option<Arc<PgPool>>
let pg_pool = db_pool.pool().ok_or_else(|| {
McpError::internal_error("Database pool not available", None)
})?;
// Create a repository
let content_repo = ContentRepository::new(pg_pool);
// Use the repository
let content = content_repo.create(¶ms).await.map_err(|e| {
McpError::internal_error(format!("Database error: {e}"), None)
})?;
Ok(result)
}
In Background Jobs
Jobs access the pool via JobContext:
use systemprompt::database::DbPool;
use systemprompt::traits::{Job, JobContext, JobResult};
#[async_trait::async_trait]
impl Job for MyJob {
async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
// Get DbPool from context
let db_pool = ctx
.db_pool::<DbPool>()
.ok_or_else(|| anyhow::anyhow!("Database not available"))?;
// Get the underlying pool
let pool = db_pool.pool()
.ok_or_else(|| anyhow::anyhow!("Pool not initialized"))?;
// Use directly with sqlx
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
.fetch_one(&*pool)
.await?;
Ok(JobResult::success())
}
}
In Page Data Providers
Page providers receive Arc<Database> from PageContext:
use std::sync::Arc;
use systemprompt::database::Database;
use systemprompt::template_provider::{PageContext, PageDataProvider};
#[async_trait]
impl PageDataProvider for MyProvider {
async fn provide_page_data(&self, ctx: &PageContext<'_>) -> Result<Value> {
// Get Database wrapper
let Some(db) = ctx.db_pool::<Arc<Database>>() else {
return Ok(json!({ "data": [] }));
};
// Extract PgPool
let Some(pool) = db.pool() else {
return Ok(json!({ "data": [] }));
};
// Use with sqlx query macros
let rows = sqlx::query_as!(
MyRow,
"SELECT id, name FROM my_table WHERE active = true"
)
.fetch_all(&*pool)
.await?;
Ok(json!({ "data": rows }))
}
}
Repository Pattern
Repositories provide typed, reusable database access. They encapsulate queries and mutations for a specific domain.
Creating a Repository
use sqlx::PgPool;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct MyRepository {
pool: Arc<PgPool>,
}
impl MyRepository {
/// Create a new repository with the given pool
#[must_use]
pub fn new(pool: Arc<PgPool>) -> Self {
Self { pool }
}
/// Create a new record
pub async fn create(&self, params: &CreateParams) -> Result<MyModel, sqlx::Error> {
sqlx::query_as!(
MyModel,
r#"
INSERT INTO my_table (id, name, created_at)
VALUES ($1, $2, NOW())
RETURNING id, name, created_at
"#,
params.id,
params.name
)
.fetch_one(&*self.pool)
.await
}
/// Get a record by ID
pub async fn get_by_id(&self, id: &str) -> Result<Option<MyModel>, sqlx::Error> {
sqlx::query_as!(
MyModel,
"SELECT id, name, created_at FROM my_table WHERE id = $1",
id
)
.fetch_optional(&*self.pool)
.await
}
/// List records with pagination
pub async fn list(&self, limit: i64, offset: i64) -> Result<Vec<MyModel>, sqlx::Error> {
sqlx::query_as!(
MyModel,
"SELECT id, name, created_at FROM my_table ORDER BY created_at DESC LIMIT $1 OFFSET $2",
limit,
offset
)
.fetch_all(&*self.pool)
.await
}
}
Query and Mutation Separation
For larger repositories, separate queries and mutations:
// repository/mod.rs
mod queries;
mod mutations;
pub use mutations::UpdateParams;
pub struct ContentRepository {
queries: ContentQueryRepository,
mutations: ContentMutationRepository,
}
impl ContentRepository {
pub fn new(pool: Arc<PgPool>) -> Self {
Self {
queries: ContentQueryRepository::new(Arc::clone(&pool)),
mutations: ContentMutationRepository::new(pool),
}
}
pub async fn get_by_id(&self, id: &ContentId) -> Result<Option<Content>, sqlx::Error> {
self.queries.get_by_id(id).await
}
pub async fn create(&self, params: &CreateParams) -> Result<Content, sqlx::Error> {
self.mutations.create(params).await
}
}
Using Extension Repositories
Extensions export their repositories. Add dependencies in Cargo.toml:
[dependencies]
systemprompt-web-extension = { path = "../../web" }
systemprompt-soul-extension = { path = "../../soul" }
Then use the repositories:
use systemprompt_web_extension::{ContentRepository, CreateContentParams, ContentKind};
use systemprompt_soul_extension::{MemoryService, CreateMemoryParams};
// Get the pool
let pg_pool = db_pool.pool().ok_or_else(|| {
McpError::internal_error("Database pool not available", None)
})?;
// Create content using web extension's repository
let content_repo = ContentRepository::new(pg_pool.clone());
let content = content_repo.create(&CreateContentParams::new(
"my-slug".to_string(),
"My Title".to_string(),
"Description".to_string(),
body_content,
"Edward".to_string(),
Utc::now(),
SourceId::new("blog".to_string()),
)).await?;
// Store memory using soul extension's service
let memory_service = MemoryService::new(pg_pool);
memory_service.store(&CreateMemoryParams::new(
MemoryType::LongTerm,
MemoryCategory::Fact,
"blog: my-slug",
"Created blog post about...",
)).await?;
SQLx Query Macros
SQLx provides compile-time checked query macros. Set DATABASE_URL for compile-time verification:
export DATABASE_URL="postgres://user:pass@localhost/dbname"
cargo build
Query Patterns
// Fetch one row (errors if not found)
let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
.fetch_one(&*pool)
.await?;
// Fetch optional row
let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", id)
.fetch_optional(&*pool)
.await?;
// Fetch multiple rows
let users = sqlx::query_as!(User, "SELECT * FROM users WHERE active = true")
.fetch_all(&*pool)
.await?;
// Execute without returning (INSERT, UPDATE, DELETE)
sqlx::query!("DELETE FROM users WHERE id = $1", id)
.execute(&*pool)
.await?;
Error Handling
Convert sqlx errors appropriately for your context:
// In MCP handlers
content_repo.create(¶ms).await.map_err(|e| {
tracing::error!(error = %e, "Database operation failed");
McpError::internal_error(format!("Database error: {e}"), None)
})?;
// In jobs
content_repo.create(¶ms).await
.context("Failed to create content")?;
// In services with custom error types
content_repo.create(¶ms).await
.map_err(BlogError::from)?;
Best Practices
- Always handle pool unavailability - The pool may not be initialized in all contexts
- Use typed repositories - Avoid raw SQL scattered throughout the codebase
- Clone the Arc, not the pool -
Arc::clone(&pool)is cheap - Log database errors - Include context for debugging
- Use transactions for multi-step operations - Ensure atomicity