MySQL #
Akses database di Rust paling idiomatis menggunakan sqlx — sebuah crate async yang unik karena memverifikasi query SQL langsung saat compile time melalui macro query!. Jika kamu menulis query dengan kolom yang salah atau tipe yang tidak cocok, program tidak akan bisa dikompilasi, bukan crash di runtime. Ini berbeda dari ORM tradisional yang memetakan tabel ke struct dan menyembunyikan SQL di balik abstraksi — sqlx membiarkan kamu menulis SQL asli dengan tetap mendapat keamanan tipe penuh. Artikel ini membahas koneksi, query, transaksi, dan pola repository untuk MySQL menggunakan sqlx.
Pilihan Crate untuk MySQL #
Ada tiga pilihan utama untuk MySQL di Rust:
| Crate | Pendekatan | Async | Query safety |
|---|---|---|---|
sqlx | SQL langsung + type check | Ya (tokio) | Compile-time via macro |
diesel | ORM dengan DSL Rust | Tidak (sync) | Compile-time via DSL |
mysql / mysql_async | Driver tingkat rendah | Keduanya | Runtime |
Artikel ini berfokus pada sqlx karena memberikan keseimbangan terbaik antara keamanan, fleksibilitas, dan performa untuk sebagian besar proyek.
Instalasi #
[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "mysql", "macros", "chrono", "uuid"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
dotenvy = "0.15" # untuk membaca .env file
Install sqlx-cli untuk manajemen migrasi:
cargo install sqlx-cli --no-default-features --features native-tls,mysql
Koneksi dan Connection Pool #
Selalu gunakan connection pool — membuat koneksi baru untuk setiap query sangat mahal. sqlx menggunakan MySqlPool yang mengelola koneksi secara otomatis:
use sqlx::mysql::MySqlPoolOptions;
use sqlx::MySqlPool;
async fn buat_pool(database_url: &str) -> Result<MySqlPool, sqlx::Error> {
MySqlPoolOptions::new()
.max_connections(20) // maks koneksi bersamaan
.min_connections(2) // koneksi siaga minimum
.acquire_timeout(std::time::Duration::from_secs(5)) // batas tunggu koneksi
.idle_timeout(std::time::Duration::from_secs(300)) // tutup koneksi idle setelah 5 menit
.max_lifetime(std::time::Duration::from_secs(1800)) // maksimum umur koneksi
.connect(database_url)
.await
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
// Ambil URL dari environment variable
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "mysql://root:password@localhost/contoh".to_string());
let pool = buat_pool(&database_url).await?;
// Verifikasi koneksi
sqlx::query("SELECT 1")
.execute(&pool)
.await?;
println!("Koneksi ke MySQL berhasil!");
println!("Ukuran pool: {}/{}", pool.size(), pool.options().get_max_connections());
Ok(())
}
Membuat Tabel (Migrasi) #
sqlx-cli mengelola migrasi database dengan cara yang versi-kontrol-friendly:
# Buat database
sqlx database create
# Buat file migrasi baru
sqlx migrate add buat_tabel_pengguna
# Jalankan semua migrasi yang belum dijalankan
sqlx migrate run
# Lihat status migrasi
sqlx migrate info
File migrasi yang dihasilkan (migrations/20240824_buat_tabel_pengguna.sql):
-- migrations/20240824_buat_tabel_pengguna.sql
CREATE TABLE IF NOT EXISTS pengguna (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
nama VARCHAR(100) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
peran ENUM('user', 'admin', 'moderator') NOT NULL DEFAULT 'user',
aktif BOOLEAN NOT NULL DEFAULT TRUE,
dibuat_pada DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
diperbarui DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS artikel (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
pengguna_id BIGINT UNSIGNED NOT NULL,
judul VARCHAR(255) NOT NULL,
konten TEXT NOT NULL,
diterbitkan BOOLEAN NOT NULL DEFAULT FALSE,
dibuat_pada DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (pengguna_id) REFERENCES pengguna(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE INDEX idx_artikel_pengguna ON artikel(pengguna_id);
CREATE INDEX idx_artikel_diterbitkan ON artikel(diterbitkan, dibuat_pada DESC);
Jalankan migrasi dari kode:
use sqlx::MySqlPool;
async fn jalankan_migrasi(pool: &MySqlPool) -> Result<(), sqlx::Error> {
sqlx::migrate!("./migrations")
.run(pool)
.await?;
println!("Migrasi selesai");
Ok(())
}
Struct dan Mapping Baris #
sqlx memetakan baris database ke struct Rust via derive macro FromRow:
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
#[derive(Debug, FromRow, Serialize, Deserialize, Clone)]
struct Pengguna {
pub id: u64,
pub nama: String,
pub email: String,
#[serde(skip_serializing)] // jangan sertakan password di JSON response
pub password: String,
pub peran: String,
pub aktif: bool,
pub dibuat_pada: NaiveDateTime,
pub diperbarui: NaiveDateTime,
}
#[derive(Debug, FromRow, Serialize)]
struct Artikel {
pub id: u64,
pub pengguna_id: u64,
pub judul: String,
pub konten: String,
pub diterbitkan: bool,
pub dibuat_pada: NaiveDateTime,
}
// Struct untuk JOIN query — bukan semua field dari tabel
#[derive(Debug, FromRow, Serialize)]
struct ArtikelDenganPenulis {
pub id: u64,
pub judul: String,
pub nama_penulis: String, // dari tabel pengguna
pub dibuat_pada: NaiveDateTime,
}
Query dan Fetch #
Fetch Semua Baris #
use sqlx::MySqlPool;
async fn ambil_semua_pengguna(pool: &MySqlPool) -> Result<Vec<Pengguna>, sqlx::Error> {
let pengguna = sqlx::query_as!(
Pengguna,
"SELECT id, nama, email, password, peran, aktif, dibuat_pada, diperbarui
FROM pengguna
ORDER BY dibuat_pada DESC"
)
.fetch_all(pool)
.await?;
Ok(pengguna)
}
Fetch Satu Baris #
async fn cari_pengguna_by_id(pool: &MySqlPool, id: u64) -> Result<Option<Pengguna>, sqlx::Error> {
// fetch_optional — mengembalikan None jika tidak ditemukan (tidak error)
let pengguna = sqlx::query_as!(
Pengguna,
"SELECT id, nama, email, password, peran, aktif, dibuat_pada, diperbarui
FROM pengguna
WHERE id = ?",
id // parameter binding — aman dari SQL injection
)
.fetch_optional(pool)
.await?;
Ok(pengguna)
}
async fn cari_pengguna_by_email(pool: &MySqlPool, email: &str) -> Result<Option<Pengguna>, sqlx::Error> {
sqlx::query_as!(
Pengguna,
"SELECT * FROM pengguna WHERE email = ? AND aktif = TRUE",
email
)
.fetch_optional(pool)
.await
}
Query dengan Parameter Banyak dan WHERE Dinamis #
async fn cari_artikel(
pool: &MySqlPool,
pengguna_id: Option<u64>,
hanya_diterbitkan: bool,
batas: u32,
offset: u32,
) -> Result<Vec<Artikel>, sqlx::Error> {
// Untuk WHERE dinamis, bangun query secara kondisional
if let Some(uid) = pengguna_id {
sqlx::query_as!(
Artikel,
"SELECT id, pengguna_id, judul, konten, diterbitkan, dibuat_pada
FROM artikel
WHERE pengguna_id = ? AND (? = FALSE OR diterbitkan = TRUE)
ORDER BY dibuat_pada DESC
LIMIT ? OFFSET ?",
uid, hanya_diterbitkan, batas, offset
)
.fetch_all(pool)
.await
} else {
sqlx::query_as!(
Artikel,
"SELECT id, pengguna_id, judul, konten, diterbitkan, dibuat_pada
FROM artikel
WHERE (? = FALSE OR diterbitkan = TRUE)
ORDER BY dibuat_pada DESC
LIMIT ? OFFSET ?",
hanya_diterbitkan, batas, offset
)
.fetch_all(pool)
.await
}
}
JOIN Query #
async fn ambil_artikel_dengan_penulis(
pool: &MySqlPool,
) -> Result<Vec<ArtikelDenganPenulis>, sqlx::Error> {
sqlx::query_as!(
ArtikelDenganPenulis,
r#"SELECT
a.id,
a.judul,
p.nama AS nama_penulis,
a.dibuat_pada
FROM artikel a
INNER JOIN pengguna p ON a.pengguna_id = p.id
WHERE a.diterbitkan = TRUE
ORDER BY a.dibuat_pada DESC
LIMIT 20"#
)
.fetch_all(pool)
.await
}
INSERT, UPDATE, DELETE #
use sqlx::MySqlPool;
// Struct untuk input CREATE — tanpa field yang di-generate database
#[derive(Debug)]
struct BuatPengguna {
pub nama: String,
pub email: String,
pub password_hash: String,
}
async fn buat_pengguna(
pool: &MySqlPool,
input: &BuatPengguna,
) -> Result<u64, sqlx::Error> {
let hasil = sqlx::query!(
"INSERT INTO pengguna (nama, email, password) VALUES (?, ?, ?)",
input.nama,
input.email,
input.password_hash
)
.execute(pool)
.await?;
// last_insert_id() mengembalikan ID baris yang baru dibuat
Ok(hasil.last_insert_id())
}
async fn perbarui_pengguna(
pool: &MySqlPool,
id: u64,
nama: &str,
email: &str,
) -> Result<bool, sqlx::Error> {
let hasil = sqlx::query!(
"UPDATE pengguna SET nama = ?, email = ? WHERE id = ?",
nama,
email,
id
)
.execute(pool)
.await?;
// rows_affected() > 0 berarti ada baris yang terupdate
Ok(hasil.rows_affected() > 0)
}
async fn nonaktifkan_pengguna(
pool: &MySqlPool,
id: u64,
) -> Result<bool, sqlx::Error> {
let hasil = sqlx::query!(
"UPDATE pengguna SET aktif = FALSE WHERE id = ?",
id
)
.execute(pool)
.await?;
Ok(hasil.rows_affected() > 0)
}
async fn hapus_pengguna(
pool: &MySqlPool,
id: u64,
) -> Result<bool, sqlx::Error> {
let hasil = sqlx::query!(
"DELETE FROM pengguna WHERE id = ?",
id
)
.execute(pool)
.await?;
Ok(hasil.rows_affected() > 0)
}
Transaksi #
Transaksi memastikan sekelompok operasi dieksekusi secara atomik — semua berhasil atau semua dibatalkan:
use sqlx::{MySqlPool, Transaction, MySql};
async fn transfer_saldo(
pool: &MySqlPool,
dari_id: u64,
ke_id: u64,
jumlah: f64,
) -> Result<(), sqlx::Error> {
// Mulai transaksi
let mut tx = pool.begin().await?;
// Cek saldo pengirim
let saldo: Option<f64> = sqlx::query_scalar!(
"SELECT saldo FROM akun WHERE id = ? FOR UPDATE", // lock baris
dari_id
)
.fetch_optional(&mut *tx)
.await?;
let saldo_pengirim = saldo.ok_or_else(|| {
sqlx::Error::RowNotFound
})?;
if saldo_pengirim < jumlah {
// Rollback otomatis saat tx di-drop tanpa commit
return Err(sqlx::Error::Protocol("Saldo tidak cukup".into()));
}
// Kurangi saldo pengirim
sqlx::query!(
"UPDATE akun SET saldo = saldo - ? WHERE id = ?",
jumlah, dari_id
)
.execute(&mut *tx)
.await?;
// Tambah saldo penerima
sqlx::query!(
"UPDATE akun SET saldo = saldo + ? WHERE id = ?",
jumlah, ke_id
)
.execute(&mut *tx)
.await?;
// Catat riwayat transfer
sqlx::query!(
"INSERT INTO riwayat_transfer (dari_id, ke_id, jumlah) VALUES (?, ?, ?)",
dari_id, ke_id, jumlah
)
.execute(&mut *tx)
.await?;
// Commit — jika ini gagal, semua operasi di atas dibatalkan
tx.commit().await?;
println!("Transfer Rp{:.0} dari {} ke {} berhasil", jumlah, dari_id, ke_id);
Ok(())
}
Pola Repository #
Repository pattern memisahkan logika akses data dari logika bisnis — membuat kode lebih testable dan mudah diubah implementasi database-nya:
use async_trait::async_trait;
use sqlx::MySqlPool;
// Trait repository — mendefinisikan kontrak
#[async_trait]
pub trait PenggunaRepository: Send + Sync {
async fn cari_by_id(&self, id: u64) -> Result<Option<Pengguna>, sqlx::Error>;
async fn cari_by_email(&self, email: &str) -> Result<Option<Pengguna>, sqlx::Error>;
async fn buat(&self, input: &BuatPengguna) -> Result<u64, sqlx::Error>;
async fn perbarui(&self, id: u64, nama: &str) -> Result<bool, sqlx::Error>;
async fn hapus(&self, id: u64) -> Result<bool, sqlx::Error>;
async fn daftar(&self, batas: u32, offset: u32) -> Result<Vec<Pengguna>, sqlx::Error>;
}
// Implementasi MySQL
pub struct MySqlPenggunaRepository {
pool: MySqlPool,
}
impl MySqlPenggunaRepository {
pub fn baru(pool: MySqlPool) -> Self {
MySqlPenggunaRepository { pool }
}
}
#[async_trait]
impl PenggunaRepository for MySqlPenggunaRepository {
async fn cari_by_id(&self, id: u64) -> Result<Option<Pengguna>, sqlx::Error> {
sqlx::query_as!(
Pengguna,
"SELECT id, nama, email, password, peran, aktif, dibuat_pada, diperbarui
FROM pengguna WHERE id = ?",
id
)
.fetch_optional(&self.pool)
.await
}
async fn cari_by_email(&self, email: &str) -> Result<Option<Pengguna>, sqlx::Error> {
sqlx::query_as!(
Pengguna,
"SELECT id, nama, email, password, peran, aktif, dibuat_pada, diperbarui
FROM pengguna WHERE email = ?",
email
)
.fetch_optional(&self.pool)
.await
}
async fn buat(&self, input: &BuatPengguna) -> Result<u64, sqlx::Error> {
let hasil = sqlx::query!(
"INSERT INTO pengguna (nama, email, password) VALUES (?, ?, ?)",
input.nama, input.email, input.password_hash
)
.execute(&self.pool)
.await?;
Ok(hasil.last_insert_id())
}
async fn perbarui(&self, id: u64, nama: &str) -> Result<bool, sqlx::Error> {
let hasil = sqlx::query!(
"UPDATE pengguna SET nama = ? WHERE id = ?",
nama, id
)
.execute(&self.pool)
.await?;
Ok(hasil.rows_affected() > 0)
}
async fn hapus(&self, id: u64) -> Result<bool, sqlx::Error> {
let hasil = sqlx::query!("DELETE FROM pengguna WHERE id = ?", id)
.execute(&self.pool)
.await?;
Ok(hasil.rows_affected() > 0)
}
async fn daftar(&self, batas: u32, offset: u32) -> Result<Vec<Pengguna>, sqlx::Error> {
sqlx::query_as!(
Pengguna,
"SELECT id, nama, email, password, peran, aktif, dibuat_pada, diperbarui
FROM pengguna ORDER BY id DESC LIMIT ? OFFSET ?",
batas, offset
)
.fetch_all(&self.pool)
.await
}
}
// Service yang menggunakan repository — bisa di-test dengan mock
pub struct LayananPengguna<R: PenggunaRepository> {
repo: R,
}
impl<R: PenggunaRepository> LayananPengguna<R> {
pub fn baru(repo: R) -> Self {
LayananPengguna { repo }
}
pub async fn profil(&self, id: u64) -> Result<Pengguna, String> {
self.repo
.cari_by_id(id)
.await
.map_err(|e| e.to_string())?
.ok_or_else(|| format!("Pengguna ID {} tidak ditemukan", id))
}
pub async fn daftar_pengguna(
&self,
halaman: u32,
per_halaman: u32,
) -> Result<Vec<Pengguna>, String> {
let offset = (halaman.saturating_sub(1)) * per_halaman;
self.repo
.daftar(per_halaman, offset)
.await
.map_err(|e| e.to_string())
}
}
Integrasi dengan Axum #
Pola lengkap: pool sebagai shared state, repository dalam handler:
use axum::{extract::{Path, State}, http::StatusCode, response::Json, routing::get, Router};
use std::sync::Arc;
// State aplikasi
#[derive(Clone)]
struct AppState {
repo: Arc<dyn PenggunaRepository>,
}
// Handler GET /pengguna/:id
async fn handler_profil(
Path(id): Path<u64>,
State(state): State<AppState>,
) -> Result<Json<Pengguna>, (StatusCode, String)> {
state.repo
.cari_by_id(id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.map(Json)
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("Pengguna {} tidak ditemukan", id)))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = std::env::var("DATABASE_URL")?;
let pool = MySqlPoolOptions::new()
.max_connections(20)
.connect(&database_url)
.await?;
// Jalankan migrasi saat startup
sqlx::migrate!("./migrations").run(&pool).await?;
let state = AppState {
repo: Arc::new(MySqlPenggunaRepository::baru(pool)),
};
let app = Router::new()
.route("/pengguna/:id", get(handler_profil))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
println!("Server di http://localhost:3000");
axum::serve(listener, app).await?;
Ok(())
}
Ringkasan #
- Selalu gunakan connection pool —
MySqlPoolOptionsdenganmax_connections,min_connections, danacquire_timeout. Satu pool per aplikasi, bagikan viaArcatauState.query!danquery_as!memverifikasi SQL saat compile — kesalahan nama kolom atau tipe data terdeteksi sebelum program dijalankan, bukan saat runtime.- Parameter binding dengan
?— selalu gunakan parameter binding, jangan pernah interpolasi string langsung ke SQL. Ini mencegah SQL injection secara struktural.fetch_alluntuk banyak baris,fetch_onejika pasti ada,fetch_optionaljika mungkin tidak ada — pilih yang tepat agar tidak dapat error yang mengejutkan.rows_affected()untuk verifikasi UPDATE/DELETE — operasi yang mengembalikan 0 berarti tidak ada baris yang terpengaruh, mungkin ID tidak ada.last_insert_id()untuk mendapat ID setelah INSERT — kembalikan ke pemanggil agar bisa langsung menggunakan data yang baru dibuat.- Transaksi dengan
pool.begin()dantx.commit()— gunakan untuk operasi yang harus atomik. Transaksi otomatis di-rollback saattxdi-drop tanpacommit().- Repository pattern untuk testability — definisikan trait repository, buat implementasi MySQL, dan gunakan trait sebagai parameter di service. Saat testing, bisa inject mock.
- Jalankan migrasi saat startup —
sqlx::migrate!("./migrations").run(&pool).await?memastikan schema selalu sinkron tanpa intervensi manual.