MongoDB

MongoDB #

MongoDB adalah database dokumen yang menyimpan data dalam format BSON (Binary JSON) — setiap record adalah dokumen yang bisa punya struktur berbeda-beda tanpa schema yang kaku. Di Rust, driver resmi mongodb dari MongoDB Inc. menyediakan akses async yang lengkap. Keunggulan menggunakan Rust dengan MongoDB adalah integrasi serde yang mulus: struct Rust yang #[derive(Serialize, Deserialize)] bisa langsung disimpan dan dibaca dari collection tanpa mapping manual. Artikel ini membahas dari koneksi dasar hingga aggregation pipeline, Change Streams untuk real-time, dan pola-pola yang umum digunakan di produksi.

Instalasi #

[dependencies]
mongodb = { version = "2", features = ["tokio-runtime"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bson = { version = "2", features = ["chrono-0_4"] }
chrono = { version = "0.4", features = ["serde"] }

Konsep Dasar MongoDB #

flowchart LR
    subgraph MongoDB
        DB["Database\n(contoh)"]
        DB --> C1["Collection\npengguna"]
        DB --> C2["Collection\nartikel"]
        C1 --> D1["Document\n{_id: ObjectId, nama: ...}"]
        C1 --> D2["Document\n{_id: ObjectId, nama: ...}"]
        C2 --> D3["Document\n{_id: ObjectId, judul: ...}"]
    end
Konsep SQLKonsep MongoDB
DatabaseDatabase
TabelCollection
BarisDocument
KolomField
Primary key_id (ObjectId default)
JOIN$lookup dalam aggregation
IndexIndex
TransactionMulti-document transaction (replica set)

Koneksi dan Client #

use mongodb::{Client, options::ClientOptions};

async fn buat_client(uri: &str) -> Result<Client, mongodb::error::Error> {
    let mut options = ClientOptions::parse(uri).await?;

    // Konfigurasi pool
    options.max_pool_size = Some(20);
    options.min_pool_size = Some(2);
    options.connect_timeout = Some(std::time::Duration::from_secs(5));
    options.server_selection_timeout = Some(std::time::Duration::from_secs(5));

    // Nama aplikasi — muncul di MongoDB logs
    options.app_name = Some("aplikasi-rust".to_string());

    Client::with_options(options)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Format: mongodb://user:password@host:port/database
    // atau MongoDB Atlas: mongodb+srv://user:[email protected]/database
    let uri = std::env::var("MONGODB_URI")
        .unwrap_or_else(|_| "mongodb://localhost:27017".to_string());

    let client = buat_client(&uri).await?;

    // Verifikasi koneksi
    client
        .database("admin")
        .run_command(bson::doc! {"ping": 1}, None)
        .await?;
    println!("Terhubung ke MongoDB!");

    // Akses database dan collection
    let db = client.database("contoh");
    let koleksi = db.collection::<bson::Document>("pengguna");

    println!("Collection: {:?}", koleksi.name());
    Ok(())
}

Struct dan Serde #

Struct Rust yang #[derive(Serialize, Deserialize)] langsung bisa disimpan dan dibaca dari MongoDB:

use bson::{oid::ObjectId, DateTime as BsonDateTime};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Pengguna {
    // _id menggunakan ObjectId — field khusus MongoDB
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    pub nama: String,
    pub email: String,
    #[serde(skip_serializing)]  // jangan sertakan di JSON response
    pub password: String,
    pub peran: String,
    pub aktif: bool,
    pub tag: Vec<String>,
    pub dibuat_pada: DateTime<Utc>,
}

impl Pengguna {
    pub fn baru(nama: &str, email: &str, password_hash: &str) -> Self {
        Pengguna {
            id: None,  // MongoDB mengisi secara otomatis
            nama: nama.to_string(),
            email: email.to_string(),
            password: password_hash.to_string(),
            peran: "user".to_string(),
            aktif: true,
            tag: Vec::new(),
            dibuat_pada: Utc::now(),
        }
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Artikel {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    pub pengguna_id: ObjectId,  // referensi ke pengguna
    pub judul: String,
    pub slug: String,
    pub konten: String,
    pub tag: Vec<String>,
    pub diterbitkan: bool,
    pub views: u64,
    pub dibuat_pada: DateTime<Utc>,
}

Typed Collection #

Collection<T> yang di-parameterisasi dengan tipe struct memberikan operasi yang type-safe:

use mongodb::{Client, Collection};

fn koleksi_pengguna(client: &Client) -> Collection<Pengguna> {
    client.database("contoh").collection("pengguna")
}

fn koleksi_artikel(client: &Client) -> Collection<Artikel> {
    client.database("contoh").collection("artikel")
}

Create — Menyimpan Dokumen #

use bson::oid::ObjectId;
use mongodb::Collection;

async fn buat_pengguna(
    col: &Collection<Pengguna>,
    pengguna: Pengguna,
) -> Result<ObjectId, mongodb::error::Error> {
    let hasil = col.insert_one(pengguna, None).await?;

    // inserted_id adalah bson::Bson — konversi ke ObjectId
    let id = hasil.inserted_id
        .as_object_id()
        .ok_or_else(|| mongodb::error::Error::custom("ID bukan ObjectId"))?;

    Ok(id)
}

// Insert banyak sekaligus
async fn buat_banyak_pengguna(
    col: &Collection<Pengguna>,
    pengguna_list: Vec<Pengguna>,
) -> Result<Vec<ObjectId>, mongodb::error::Error> {
    let hasil = col.insert_many(pengguna_list, None).await?;

    let ids: Vec<ObjectId> = hasil.inserted_ids
        .values()
        .filter_map(|id| id.as_object_id())
        .collect();

    Ok(ids)
}

Read — Mencari Dokumen #

use bson::{doc, oid::ObjectId};
use mongodb::{Collection, options::FindOptions};
use futures::TryStreamExt;

async fn cari_by_id(
    col: &Collection<Pengguna>,
    id: ObjectId,
) -> Result<Option<Pengguna>, mongodb::error::Error> {
    col.find_one(doc! {"_id": id}, None).await
}

async fn cari_by_email(
    col: &Collection<Pengguna>,
    email: &str,
) -> Result<Option<Pengguna>, mongodb::error::Error> {
    col.find_one(doc! {"email": email}, None).await
}

async fn cari_pengguna_aktif(
    col: &Collection<Pengguna>,
) -> Result<Vec<Pengguna>, mongodb::error::Error> {
    let opsi = FindOptions::builder()
        .sort(doc! {"dibuat_pada": -1})  // -1 = descending
        .limit(50)
        .build();

    let mut cursor = col.find(doc! {"aktif": true}, opsi).await?;
    let mut hasil = Vec::new();

    // Cursor adalah async stream
    while let Some(pengguna) = cursor.try_next().await? {
        hasil.push(pengguna);
    }

    Ok(hasil)
}

// Dengan pagination
async fn daftar_pengguna(
    col: &Collection<Pengguna>,
    halaman: u64,
    per_halaman: u64,
) -> Result<Vec<Pengguna>, mongodb::error::Error> {
    let opsi = FindOptions::builder()
        .sort(doc! {"_id": -1})
        .skip(Some((halaman.saturating_sub(1)) * per_halaman))
        .limit(Some(per_halaman as i64))
        .build();

    col.find(doc! {}, opsi)
        .await?
        .try_collect()
        .await
}

// Filter dengan banyak kondisi
async fn cari_dengan_filter(
    col: &Collection<Pengguna>,
    aktif: bool,
    peran: &str,
    tag: &str,
) -> Result<Vec<Pengguna>, mongodb::error::Error> {
    let filter = doc! {
        "aktif": aktif,
        "peran": peran,
        "tag": tag,          // field array: cek apakah tag ada dalam array
    };

    col.find(filter, None)
        .await?
        .try_collect()
        .await
}

Update — Memperbarui Dokumen #

use bson::{doc, oid::ObjectId};
use mongodb::{Collection, options::UpdateOptions};

async fn perbarui_nama(
    col: &Collection<Pengguna>,
    id: ObjectId,
    nama_baru: &str,
) -> Result<bool, mongodb::error::Error> {
    let hasil = col.update_one(
        doc! {"_id": id},
        doc! {
            "$set": {
                "nama": nama_baru,
                "diperbarui": chrono::Utc::now()
            }
        },
        None,
    )
    .await?;

    Ok(hasil.modified_count > 0)
}

// Push elemen ke array field
async fn tambah_tag(
    col: &Collection<Pengguna>,
    id: ObjectId,
    tag: &str,
) -> Result<bool, mongodb::error::Error> {
    let hasil = col.update_one(
        doc! {"_id": id},
        doc! {
            "$addToSet": {"tag": tag}  // addToSet: tambah jika belum ada
        },
        None,
    )
    .await?;

    Ok(hasil.modified_count > 0)
}

// Increment field numerik
async fn increment_views(
    col: &Collection<Artikel>,
    id: ObjectId,
) -> Result<(), mongodb::error::Error> {
    col.update_one(
        doc! {"_id": id},
        doc! {"$inc": {"views": 1}},
        None,
    )
    .await?;
    Ok(())
}

// Upsert — update jika ada, insert jika tidak
async fn upsert_konfigurasi(
    col: &Collection<bson::Document>,
    kunci: &str,
    nilai: &str,
) -> Result<(), mongodb::error::Error> {
    let opsi = UpdateOptions::builder().upsert(true).build();
    col.update_one(
        doc! {"kunci": kunci},
        doc! {
            "$set": {"nilai": nilai},
            "$setOnInsert": {"dibuat_pada": chrono::Utc::now()}
        },
        opsi,
    )
    .await?;
    Ok(())
}

Delete — Menghapus Dokumen #

use bson::{doc, oid::ObjectId};
use mongodb::Collection;

async fn hapus_pengguna(
    col: &Collection<Pengguna>,
    id: ObjectId,
) -> Result<bool, mongodb::error::Error> {
    let hasil = col.delete_one(doc! {"_id": id}, None).await?;
    Ok(hasil.deleted_count > 0)
}

// Hapus banyak dokumen
async fn hapus_pengguna_tidak_aktif(
    col: &Collection<Pengguna>,
) -> Result<u64, mongodb::error::Error> {
    let hasil = col.delete_many(doc! {"aktif": false}, None).await?;
    Ok(hasil.deleted_count)
}

Aggregation Pipeline #

Aggregation pipeline adalah cara MongoDB yang paling powerful untuk query kompleks — setara dengan JOIN, GROUP BY, dan subquery di SQL:

use bson::doc;
use futures::TryStreamExt;
use mongodb::{Collection, options::AggregateOptions};

async fn statistik_pengguna(
    col: &Collection<Pengguna>,
) -> Result<Vec<bson::Document>, mongodb::error::Error> {
    let pipeline = vec![
        // $match — filter dokumen
        doc! {"$match": {"aktif": true}},

        // $group — kelompokkan berdasarkan peran, hitung jumlah
        doc! {
            "$group": {
                "_id": "$peran",
                "jumlah": {"$sum": 1},
                "rata_tag": {"$avg": {"$size": "$tag"}}
            }
        },

        // $sort — urutkan berdasarkan jumlah
        doc! {"$sort": {"jumlah": -1}},

        // $project — pilih field yang ditampilkan
        doc! {
            "$project": {
                "peran": "$_id",
                "jumlah": 1,
                "rata_tag": {"$round": ["$rata_tag", 1]},
                "_id": 0
            }
        }
    ];

    col.aggregate(pipeline, None)
        .await?
        .try_collect()
        .await
}

// Aggregation dengan $lookup (JOIN antar collection)
async fn artikel_dengan_penulis(
    col_artikel: &Collection<Artikel>,
) -> Result<Vec<bson::Document>, mongodb::error::Error> {
    let pipeline = vec![
        doc! {"$match": {"diterbitkan": true}},
        doc! {
            "$lookup": {
                "from": "pengguna",          // nama collection target
                "localField": "pengguna_id", // field di artikel
                "foreignField": "_id",       // field di pengguna
                "as": "penulis"              // nama field hasil JOIN
            }
        },
        doc! {
            "$unwind": "$penulis"  // ubah array penulis menjadi object
        },
        doc! {
            "$project": {
                "judul": 1,
                "tag": 1,
                "views": 1,
                "dibuat_pada": 1,
                "nama_penulis": "$penulis.nama",
                "email_penulis": "$penulis.email",
            }
        },
        doc! {"$sort": {"dibuat_pada": -1}},
        doc! {"$limit": 20}
    ];

    col_artikel.aggregate(pipeline, None)
        .await?
        .try_collect()
        .await
}

// Aggregation dengan $facet — beberapa pipeline sekaligus
async fn dashboard_artikel(
    col: &Collection<Artikel>,
) -> Result<bson::Document, mongodb::error::Error> {
    let pipeline = vec![
        doc! {
            "$facet": {
                "total": [
                    {"$count": "jumlah"}
                ],
                "per_status": [
                    {"$group": {"_id": "$diterbitkan", "jumlah": {"$sum": 1}}}
                ],
                "top_viewed": [
                    {"$sort": {"views": -1}},
                    {"$limit": 5},
                    {"$project": {"judul": 1, "views": 1}}
                ]
            }
        }
    ];

    let mut cursor = col.aggregate(pipeline, None).await?;
    cursor.try_next().await?.ok_or_else(|| {
        mongodb::error::Error::custom("Aggregation tidak menghasilkan dokumen")
    })
}

Index #

use mongodb::{Collection, IndexModel, options::IndexOptions};
use bson::doc;

async fn buat_index(
    col: &Collection<Pengguna>,
) -> Result<(), mongodb::error::Error> {
    // Index unik pada email
    let opsi_email = IndexOptions::builder().unique(true).build();
    col.create_index(
        IndexModel::builder()
            .keys(doc! {"email": 1})
            .options(opsi_email)
            .build(),
        None,
    )
    .await?;

    // Compound index untuk query yang sering dilakukan
    col.create_index(
        IndexModel::builder()
            .keys(doc! {"aktif": 1, "peran": 1, "dibuat_pada": -1})
            .build(),
        None,
    )
    .await?;

    // Text index untuk full-text search
    let col_artikel: Collection<Artikel> = col.clone_with_type();
    col_artikel.create_index(
        IndexModel::builder()
            .keys(doc! {"judul": "text", "konten": "text"})
            .build(),
        None,
    )
    .await?;

    println!("Index berhasil dibuat");
    Ok(())
}

// Full-text search menggunakan $text
async fn cari_artikel_fts(
    col: &Collection<Artikel>,
    query: &str,
) -> Result<Vec<Artikel>, mongodb::error::Error> {
    col.find(
        doc! {
            "$text": {"$search": query},
            "diterbitkan": true
        },
        mongodb::options::FindOptions::builder()
            .sort(doc! {"score": {"$meta": "textScore"}})
            .projection(doc! {"score": {"$meta": "textScore"}})
            .build(),
    )
    .await?
    .try_collect()
    .await
}

Transaksi #

Transaksi MongoDB membutuhkan replica set atau sharded cluster (tidak tersedia di standalone):

use mongodb::{Client, ClientSession};

async fn transfer_dengan_transaksi(
    client: &Client,
    dari_id: bson::oid::ObjectId,
    ke_id: bson::oid::ObjectId,
    jumlah: f64,
) -> Result<(), mongodb::error::Error> {
    let mut session = client.start_session(None).await?;

    // Jalankan transaksi
    session.start_transaction(None).await?;

    let db = client.database("contoh");
    let akun = db.collection::<bson::Document>("akun");

    // Kurangi saldo pengirim
    let hasil = akun.update_one_with_session(
        doc! {"_id": dari_id, "saldo": {"$gte": jumlah}},
        doc! {"$inc": {"saldo": -jumlah}},
        None,
        &mut session,
    )
    .await?;

    if hasil.modified_count == 0 {
        session.abort_transaction().await?;
        return Err(mongodb::error::Error::custom("Saldo tidak cukup atau akun tidak ditemukan"));
    }

    // Tambah saldo penerima
    akun.update_one_with_session(
        doc! {"_id": ke_id},
        doc! {"$inc": {"saldo": jumlah}},
        None,
        &mut session,
    )
    .await?;

    session.commit_transaction().await?;
    println!("Transfer berhasil");
    Ok(())
}

Change Streams — Real-time Data #

Change Streams memungkinkan kamu mendengarkan perubahan pada collection secara real-time — seperti LISTEN/NOTIFY di PostgreSQL:

use bson::doc;
use futures::TryStreamExt;
use mongodb::{Collection, options::ChangeStreamOptions};

async fn pantau_pengguna_baru(
    col: &Collection<Pengguna>,
) -> Result<(), mongodb::error::Error> {
    // Filter hanya event insert
    let pipeline = vec![
        doc! {"$match": {"operationType": "insert"}}
    ];

    let opsi = ChangeStreamOptions::builder()
        .full_document(Some(mongodb::options::FullDocumentType::UpdateLookup))
        .build();

    let mut change_stream = col.watch(pipeline, opsi).await?;

    println!("Memantau pengguna baru...");

    while let Some(event) = change_stream.try_next().await? {
        println!("Event: {:?}", event.operation_type);
        if let Some(dokumen) = event.full_document {
            println!("Pengguna baru: {} ({})", dokumen.nama, dokumen.email);
        }
    }

    Ok(())
}

Ringkasan #

  • Collection<T> untuk operasi type-safe — parameterisasi collection dengan struct Rust yang #[derive(Serialize, Deserialize)] untuk CRUD tanpa mapping manual.
  • _id menggunakan Option<ObjectId> — gunakan skip_serializing_if = "Option::is_none" agar saat insert tidak mengirim _id: null ke MongoDB.
  • doc! {} macro — cara idiomatis membuat filter, update, dan pipeline BSON. Mendukung semua operator MongoDB ($match, $set, $inc, $addToSet, dll.).
  • Cursor adalah async stream — gunakan try_next().await? dalam loop atau .try_collect().await? untuk mengumpulkan semua dokumen.
  • Aggregation pipeline menggantikan JOIN$lookup untuk relasi antar collection, $group untuk agregasi, $facet untuk multiple pipeline sekaligus.
  • $addToSet vs $push$addToSet hanya menambahkan jika belum ada di array (set semantics), $push selalu menambahkan (bisa duplikat).
  • Index wajib untuk query yang sering — MongoDB tidak punya query planner yang sebaik PostgreSQL; tanpa index, setiap query akan full collection scan.
  • Transaksi butuh replica set — MongoDB standalone tidak mendukung multi-document transaction. Gunakan Docker dengan replica set atau MongoDB Atlas untuk testing transaksi.
  • Change Streams untuk real-time — pantau perubahan collection tanpa polling, mirip dengan LISTEN/NOTIFY PostgreSQL. Butuh replica set atau Atlas.

← Sebelumnya: PostgreSQL   Berikutnya: Elasticsearch →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact