MongoDB #
MongoDB adalah database dokumen yang menyimpan data dalam format BSON (Binary JSON) — setiap record adalah dokumen yang bisa punya struktur berbeda-beda tanpa schema yang kaku. Di Rust, driver resmi mongodb dari MongoDB Inc. menyediakan akses async yang lengkap. Keunggulan menggunakan Rust dengan MongoDB adalah integrasi serde yang mulus: struct Rust yang #[derive(Serialize, Deserialize)] bisa langsung disimpan dan dibaca dari collection tanpa mapping manual. Artikel ini membahas dari koneksi dasar hingga aggregation pipeline, Change Streams untuk real-time, dan pola-pola yang umum digunakan di produksi.
Instalasi #
[dependencies]
mongodb = { version = "2", features = ["tokio-runtime"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bson = { version = "2", features = ["chrono-0_4"] }
chrono = { version = "0.4", features = ["serde"] }
Konsep Dasar MongoDB #
flowchart LR
subgraph MongoDB
DB["Database\n(contoh)"]
DB --> C1["Collection\npengguna"]
DB --> C2["Collection\nartikel"]
C1 --> D1["Document\n{_id: ObjectId, nama: ...}"]
C1 --> D2["Document\n{_id: ObjectId, nama: ...}"]
C2 --> D3["Document\n{_id: ObjectId, judul: ...}"]
end| Konsep SQL | Konsep MongoDB |
|---|---|
| Database | Database |
| Tabel | Collection |
| Baris | Document |
| Kolom | Field |
| Primary key | _id (ObjectId default) |
| JOIN | $lookup dalam aggregation |
| Index | Index |
| Transaction | Multi-document transaction (replica set) |
Koneksi dan Client #
use mongodb::{Client, options::ClientOptions};
async fn buat_client(uri: &str) -> Result<Client, mongodb::error::Error> {
let mut options = ClientOptions::parse(uri).await?;
// Konfigurasi pool
options.max_pool_size = Some(20);
options.min_pool_size = Some(2);
options.connect_timeout = Some(std::time::Duration::from_secs(5));
options.server_selection_timeout = Some(std::time::Duration::from_secs(5));
// Nama aplikasi — muncul di MongoDB logs
options.app_name = Some("aplikasi-rust".to_string());
Client::with_options(options)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Format: mongodb://user:password@host:port/database
// atau MongoDB Atlas: mongodb+srv://user:[email protected]/database
let uri = std::env::var("MONGODB_URI")
.unwrap_or_else(|_| "mongodb://localhost:27017".to_string());
let client = buat_client(&uri).await?;
// Verifikasi koneksi
client
.database("admin")
.run_command(bson::doc! {"ping": 1}, None)
.await?;
println!("Terhubung ke MongoDB!");
// Akses database dan collection
let db = client.database("contoh");
let koleksi = db.collection::<bson::Document>("pengguna");
println!("Collection: {:?}", koleksi.name());
Ok(())
}
Struct dan Serde #
Struct Rust yang #[derive(Serialize, Deserialize)] langsung bisa disimpan dan dibaca dari MongoDB:
use bson::{oid::ObjectId, DateTime as BsonDateTime};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Pengguna {
// _id menggunakan ObjectId — field khusus MongoDB
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub nama: String,
pub email: String,
#[serde(skip_serializing)] // jangan sertakan di JSON response
pub password: String,
pub peran: String,
pub aktif: bool,
pub tag: Vec<String>,
pub dibuat_pada: DateTime<Utc>,
}
impl Pengguna {
pub fn baru(nama: &str, email: &str, password_hash: &str) -> Self {
Pengguna {
id: None, // MongoDB mengisi secara otomatis
nama: nama.to_string(),
email: email.to_string(),
password: password_hash.to_string(),
peran: "user".to_string(),
aktif: true,
tag: Vec::new(),
dibuat_pada: Utc::now(),
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Artikel {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub pengguna_id: ObjectId, // referensi ke pengguna
pub judul: String,
pub slug: String,
pub konten: String,
pub tag: Vec<String>,
pub diterbitkan: bool,
pub views: u64,
pub dibuat_pada: DateTime<Utc>,
}
Typed Collection #
Collection<T> yang di-parameterisasi dengan tipe struct memberikan operasi yang type-safe:
use mongodb::{Client, Collection};
fn koleksi_pengguna(client: &Client) -> Collection<Pengguna> {
client.database("contoh").collection("pengguna")
}
fn koleksi_artikel(client: &Client) -> Collection<Artikel> {
client.database("contoh").collection("artikel")
}
Create — Menyimpan Dokumen #
use bson::oid::ObjectId;
use mongodb::Collection;
async fn buat_pengguna(
col: &Collection<Pengguna>,
pengguna: Pengguna,
) -> Result<ObjectId, mongodb::error::Error> {
let hasil = col.insert_one(pengguna, None).await?;
// inserted_id adalah bson::Bson — konversi ke ObjectId
let id = hasil.inserted_id
.as_object_id()
.ok_or_else(|| mongodb::error::Error::custom("ID bukan ObjectId"))?;
Ok(id)
}
// Insert banyak sekaligus
async fn buat_banyak_pengguna(
col: &Collection<Pengguna>,
pengguna_list: Vec<Pengguna>,
) -> Result<Vec<ObjectId>, mongodb::error::Error> {
let hasil = col.insert_many(pengguna_list, None).await?;
let ids: Vec<ObjectId> = hasil.inserted_ids
.values()
.filter_map(|id| id.as_object_id())
.collect();
Ok(ids)
}
Read — Mencari Dokumen #
use bson::{doc, oid::ObjectId};
use mongodb::{Collection, options::FindOptions};
use futures::TryStreamExt;
async fn cari_by_id(
col: &Collection<Pengguna>,
id: ObjectId,
) -> Result<Option<Pengguna>, mongodb::error::Error> {
col.find_one(doc! {"_id": id}, None).await
}
async fn cari_by_email(
col: &Collection<Pengguna>,
email: &str,
) -> Result<Option<Pengguna>, mongodb::error::Error> {
col.find_one(doc! {"email": email}, None).await
}
async fn cari_pengguna_aktif(
col: &Collection<Pengguna>,
) -> Result<Vec<Pengguna>, mongodb::error::Error> {
let opsi = FindOptions::builder()
.sort(doc! {"dibuat_pada": -1}) // -1 = descending
.limit(50)
.build();
let mut cursor = col.find(doc! {"aktif": true}, opsi).await?;
let mut hasil = Vec::new();
// Cursor adalah async stream
while let Some(pengguna) = cursor.try_next().await? {
hasil.push(pengguna);
}
Ok(hasil)
}
// Dengan pagination
async fn daftar_pengguna(
col: &Collection<Pengguna>,
halaman: u64,
per_halaman: u64,
) -> Result<Vec<Pengguna>, mongodb::error::Error> {
let opsi = FindOptions::builder()
.sort(doc! {"_id": -1})
.skip(Some((halaman.saturating_sub(1)) * per_halaman))
.limit(Some(per_halaman as i64))
.build();
col.find(doc! {}, opsi)
.await?
.try_collect()
.await
}
// Filter dengan banyak kondisi
async fn cari_dengan_filter(
col: &Collection<Pengguna>,
aktif: bool,
peran: &str,
tag: &str,
) -> Result<Vec<Pengguna>, mongodb::error::Error> {
let filter = doc! {
"aktif": aktif,
"peran": peran,
"tag": tag, // field array: cek apakah tag ada dalam array
};
col.find(filter, None)
.await?
.try_collect()
.await
}
Update — Memperbarui Dokumen #
use bson::{doc, oid::ObjectId};
use mongodb::{Collection, options::UpdateOptions};
async fn perbarui_nama(
col: &Collection<Pengguna>,
id: ObjectId,
nama_baru: &str,
) -> Result<bool, mongodb::error::Error> {
let hasil = col.update_one(
doc! {"_id": id},
doc! {
"$set": {
"nama": nama_baru,
"diperbarui": chrono::Utc::now()
}
},
None,
)
.await?;
Ok(hasil.modified_count > 0)
}
// Push elemen ke array field
async fn tambah_tag(
col: &Collection<Pengguna>,
id: ObjectId,
tag: &str,
) -> Result<bool, mongodb::error::Error> {
let hasil = col.update_one(
doc! {"_id": id},
doc! {
"$addToSet": {"tag": tag} // addToSet: tambah jika belum ada
},
None,
)
.await?;
Ok(hasil.modified_count > 0)
}
// Increment field numerik
async fn increment_views(
col: &Collection<Artikel>,
id: ObjectId,
) -> Result<(), mongodb::error::Error> {
col.update_one(
doc! {"_id": id},
doc! {"$inc": {"views": 1}},
None,
)
.await?;
Ok(())
}
// Upsert — update jika ada, insert jika tidak
async fn upsert_konfigurasi(
col: &Collection<bson::Document>,
kunci: &str,
nilai: &str,
) -> Result<(), mongodb::error::Error> {
let opsi = UpdateOptions::builder().upsert(true).build();
col.update_one(
doc! {"kunci": kunci},
doc! {
"$set": {"nilai": nilai},
"$setOnInsert": {"dibuat_pada": chrono::Utc::now()}
},
opsi,
)
.await?;
Ok(())
}
Delete — Menghapus Dokumen #
use bson::{doc, oid::ObjectId};
use mongodb::Collection;
async fn hapus_pengguna(
col: &Collection<Pengguna>,
id: ObjectId,
) -> Result<bool, mongodb::error::Error> {
let hasil = col.delete_one(doc! {"_id": id}, None).await?;
Ok(hasil.deleted_count > 0)
}
// Hapus banyak dokumen
async fn hapus_pengguna_tidak_aktif(
col: &Collection<Pengguna>,
) -> Result<u64, mongodb::error::Error> {
let hasil = col.delete_many(doc! {"aktif": false}, None).await?;
Ok(hasil.deleted_count)
}
Aggregation Pipeline #
Aggregation pipeline adalah cara MongoDB yang paling powerful untuk query kompleks — setara dengan JOIN, GROUP BY, dan subquery di SQL:
use bson::doc;
use futures::TryStreamExt;
use mongodb::{Collection, options::AggregateOptions};
async fn statistik_pengguna(
col: &Collection<Pengguna>,
) -> Result<Vec<bson::Document>, mongodb::error::Error> {
let pipeline = vec![
// $match — filter dokumen
doc! {"$match": {"aktif": true}},
// $group — kelompokkan berdasarkan peran, hitung jumlah
doc! {
"$group": {
"_id": "$peran",
"jumlah": {"$sum": 1},
"rata_tag": {"$avg": {"$size": "$tag"}}
}
},
// $sort — urutkan berdasarkan jumlah
doc! {"$sort": {"jumlah": -1}},
// $project — pilih field yang ditampilkan
doc! {
"$project": {
"peran": "$_id",
"jumlah": 1,
"rata_tag": {"$round": ["$rata_tag", 1]},
"_id": 0
}
}
];
col.aggregate(pipeline, None)
.await?
.try_collect()
.await
}
// Aggregation dengan $lookup (JOIN antar collection)
async fn artikel_dengan_penulis(
col_artikel: &Collection<Artikel>,
) -> Result<Vec<bson::Document>, mongodb::error::Error> {
let pipeline = vec![
doc! {"$match": {"diterbitkan": true}},
doc! {
"$lookup": {
"from": "pengguna", // nama collection target
"localField": "pengguna_id", // field di artikel
"foreignField": "_id", // field di pengguna
"as": "penulis" // nama field hasil JOIN
}
},
doc! {
"$unwind": "$penulis" // ubah array penulis menjadi object
},
doc! {
"$project": {
"judul": 1,
"tag": 1,
"views": 1,
"dibuat_pada": 1,
"nama_penulis": "$penulis.nama",
"email_penulis": "$penulis.email",
}
},
doc! {"$sort": {"dibuat_pada": -1}},
doc! {"$limit": 20}
];
col_artikel.aggregate(pipeline, None)
.await?
.try_collect()
.await
}
// Aggregation dengan $facet — beberapa pipeline sekaligus
async fn dashboard_artikel(
col: &Collection<Artikel>,
) -> Result<bson::Document, mongodb::error::Error> {
let pipeline = vec![
doc! {
"$facet": {
"total": [
{"$count": "jumlah"}
],
"per_status": [
{"$group": {"_id": "$diterbitkan", "jumlah": {"$sum": 1}}}
],
"top_viewed": [
{"$sort": {"views": -1}},
{"$limit": 5},
{"$project": {"judul": 1, "views": 1}}
]
}
}
];
let mut cursor = col.aggregate(pipeline, None).await?;
cursor.try_next().await?.ok_or_else(|| {
mongodb::error::Error::custom("Aggregation tidak menghasilkan dokumen")
})
}
Index #
use mongodb::{Collection, IndexModel, options::IndexOptions};
use bson::doc;
async fn buat_index(
col: &Collection<Pengguna>,
) -> Result<(), mongodb::error::Error> {
// Index unik pada email
let opsi_email = IndexOptions::builder().unique(true).build();
col.create_index(
IndexModel::builder()
.keys(doc! {"email": 1})
.options(opsi_email)
.build(),
None,
)
.await?;
// Compound index untuk query yang sering dilakukan
col.create_index(
IndexModel::builder()
.keys(doc! {"aktif": 1, "peran": 1, "dibuat_pada": -1})
.build(),
None,
)
.await?;
// Text index untuk full-text search
let col_artikel: Collection<Artikel> = col.clone_with_type();
col_artikel.create_index(
IndexModel::builder()
.keys(doc! {"judul": "text", "konten": "text"})
.build(),
None,
)
.await?;
println!("Index berhasil dibuat");
Ok(())
}
// Full-text search menggunakan $text
async fn cari_artikel_fts(
col: &Collection<Artikel>,
query: &str,
) -> Result<Vec<Artikel>, mongodb::error::Error> {
col.find(
doc! {
"$text": {"$search": query},
"diterbitkan": true
},
mongodb::options::FindOptions::builder()
.sort(doc! {"score": {"$meta": "textScore"}})
.projection(doc! {"score": {"$meta": "textScore"}})
.build(),
)
.await?
.try_collect()
.await
}
Transaksi #
Transaksi MongoDB membutuhkan replica set atau sharded cluster (tidak tersedia di standalone):
use mongodb::{Client, ClientSession};
async fn transfer_dengan_transaksi(
client: &Client,
dari_id: bson::oid::ObjectId,
ke_id: bson::oid::ObjectId,
jumlah: f64,
) -> Result<(), mongodb::error::Error> {
let mut session = client.start_session(None).await?;
// Jalankan transaksi
session.start_transaction(None).await?;
let db = client.database("contoh");
let akun = db.collection::<bson::Document>("akun");
// Kurangi saldo pengirim
let hasil = akun.update_one_with_session(
doc! {"_id": dari_id, "saldo": {"$gte": jumlah}},
doc! {"$inc": {"saldo": -jumlah}},
None,
&mut session,
)
.await?;
if hasil.modified_count == 0 {
session.abort_transaction().await?;
return Err(mongodb::error::Error::custom("Saldo tidak cukup atau akun tidak ditemukan"));
}
// Tambah saldo penerima
akun.update_one_with_session(
doc! {"_id": ke_id},
doc! {"$inc": {"saldo": jumlah}},
None,
&mut session,
)
.await?;
session.commit_transaction().await?;
println!("Transfer berhasil");
Ok(())
}
Change Streams — Real-time Data #
Change Streams memungkinkan kamu mendengarkan perubahan pada collection secara real-time — seperti LISTEN/NOTIFY di PostgreSQL:
use bson::doc;
use futures::TryStreamExt;
use mongodb::{Collection, options::ChangeStreamOptions};
async fn pantau_pengguna_baru(
col: &Collection<Pengguna>,
) -> Result<(), mongodb::error::Error> {
// Filter hanya event insert
let pipeline = vec![
doc! {"$match": {"operationType": "insert"}}
];
let opsi = ChangeStreamOptions::builder()
.full_document(Some(mongodb::options::FullDocumentType::UpdateLookup))
.build();
let mut change_stream = col.watch(pipeline, opsi).await?;
println!("Memantau pengguna baru...");
while let Some(event) = change_stream.try_next().await? {
println!("Event: {:?}", event.operation_type);
if let Some(dokumen) = event.full_document {
println!("Pengguna baru: {} ({})", dokumen.nama, dokumen.email);
}
}
Ok(())
}
Ringkasan #
Collection<T>untuk operasi type-safe — parameterisasi collection dengan struct Rust yang#[derive(Serialize, Deserialize)]untuk CRUD tanpa mapping manual._idmenggunakanOption<ObjectId>— gunakanskip_serializing_if = "Option::is_none"agar saat insert tidak mengirim_id: nullke MongoDB.doc! {}macro — cara idiomatis membuat filter, update, dan pipeline BSON. Mendukung semua operator MongoDB ($match,$set,$inc,$addToSet, dll.).- Cursor adalah async stream — gunakan
try_next().await?dalam loop atau.try_collect().await?untuk mengumpulkan semua dokumen.- Aggregation pipeline menggantikan JOIN —
$lookupuntuk relasi antar collection,$groupuntuk agregasi,$facetuntuk multiple pipeline sekaligus.$addToSetvs$push—$addToSethanya menambahkan jika belum ada di array (set semantics),$pushselalu menambahkan (bisa duplikat).- Index wajib untuk query yang sering — MongoDB tidak punya query planner yang sebaik PostgreSQL; tanpa index, setiap query akan full collection scan.
- Transaksi butuh replica set — MongoDB standalone tidak mendukung multi-document transaction. Gunakan Docker dengan replica set atau MongoDB Atlas untuk testing transaksi.
- Change Streams untuk real-time — pantau perubahan collection tanpa polling, mirip dengan
LISTEN/NOTIFYPostgreSQL. Butuh replica set atau Atlas.