Elasticsearch

Elasticsearch #

Elasticsearch adalah search engine terdistribusi berbasis Apache Lucene yang dioptimalkan untuk pencarian teks penuh, analitik log, dan observability. Berbeda dari database biasa yang menyimpan data untuk diambil kembali, Elasticsearch mengindeks setiap kata dalam dokumen untuk pencarian super cepat bahkan di dataset berukuran miliaran dokumen. Di Rust, crate elasticsearch dari Elastic menyediakan client async yang lengkap. Karena Elasticsearch berkomunikasi via REST API dengan JSON, semua request dan response menggunakan serde_json::Value — ini membuat API-nya sangat fleksibel meski sedikit kurang type-safe dibanding sqlx. Artikel ini membahas dari setup dasar hingga query lanjutan dan pola sinkronisasi dengan database utama.

Instalasi #

[dependencies]
elasticsearch = "8"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", features = ["serde"] }

Konsep Dasar Elasticsearch #

flowchart LR
    subgraph Elasticsearch
        IDX["Index\n(seperti tabel)"]
        IDX --> D1["Document\n{_id, _source: {...}}"]
        IDX --> D2["Document"]
        IDX --> S["Shard 1..N\n(partisi data)"]
    end

    subgraph Cluster
        N1["Node 1\n(master + data)"]
        N2["Node 2\n(data)"]
        N3["Node 3\n(data)"]
    end
Konsep RDBMSKonsep Elasticsearch
DatabaseCluster
TabelIndex
BarisDocument
KolomField
SchemaMapping
SQL QueryQuery DSL (JSON)
Full-text searchInverted index (built-in)

Koneksi dan Client #

use elasticsearch::{
    auth::Credentials,
    cert::CertificateValidation,
    http::transport::{SingleNodeConnectionPool, TransportBuilder},
    Elasticsearch,
};
use url::Url;

fn buat_client(url: &str) -> Result<Elasticsearch, Box<dyn std::error::Error>> {
    let url = Url::parse(url)?;
    let pool = SingleNodeConnectionPool::new(url);
    let transport = TransportBuilder::new(pool)
        .disable_proxy()
        .build()?;
    Ok(Elasticsearch::new(transport))
}

// Dengan autentikasi (Elastic Cloud atau self-hosted dengan security)
fn buat_client_auth(
    url: &str,
    username: &str,
    password: &str,
) -> Result<Elasticsearch, Box<dyn std::error::Error>> {
    let url = Url::parse(url)?;
    let pool = SingleNodeConnectionPool::new(url);
    let creds = Credentials::Basic(username.to_string(), password.to_string());
    let transport = TransportBuilder::new(pool)
        .auth(creds)
        .cert_validation(CertificateValidation::None)  // untuk dev/testing
        .build()?;
    Ok(Elasticsearch::new(transport))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = buat_client("http://localhost:9200")?;

    // Verifikasi koneksi
    let info = client.info().send().await?;
    let body: serde_json::Value = info.json().await?;
    println!("Elasticsearch: {}", body["version"]["number"].as_str().unwrap_or("?"));

    Ok(())
}

Mapping — Mendefinisikan Struktur Index #

Mapping di Elasticsearch mirip dengan schema di database — mendefinisikan tipe setiap field:

use elasticsearch::{indices::IndicesCreateParts, Elasticsearch};
use serde_json::{json, Value};

async fn buat_index_artikel(
    client: &Elasticsearch,
) -> Result<(), Box<dyn std::error::Error>> {
    let mapping = json!({
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,  // 0 untuk single-node dev
            "analysis": {
                "analyzer": {
                    "analyzer_indonesia": {
                        "type": "standard",
                        "stopwords": "_indonesian_"
                    }
                }
            }
        },
        "mappings": {
            "properties": {
                "judul": {
                    "type": "text",
                    "analyzer": "analyzer_indonesia",
                    "fields": {
                        "keyword": {  // sub-field untuk exact match dan sort
                            "type": "keyword"
                        }
                    }
                },
                "konten": {
                    "type": "text",
                    "analyzer": "analyzer_indonesia"
                },
                "tag": {
                    "type": "keyword"  // keyword: exact match, tidak dianalisis
                },
                "pengguna_id": {
                    "type": "long"
                },
                "diterbitkan": {
                    "type": "boolean"
                },
                "views": {
                    "type": "long"
                },
                "dibuat_pada": {
                    "type": "date",
                    "format": "strict_date_optional_time||epoch_millis"
                }
            }
        }
    });

    // Buat index — gunakan ignore_unavailable untuk idempoten
    let respons = client
        .indices()
        .create(IndicesCreateParts::Index("artikel"))
        .body(mapping)
        .send()
        .await?;

    if respons.status_code().is_success() {
        println!("Index 'artikel' berhasil dibuat");
    } else {
        let error: Value = respons.json().await?;
        println!("Index mungkin sudah ada: {}", error["error"]["type"].as_str().unwrap_or("?"));
    }

    Ok(())
}

// Hapus dan buat ulang index (untuk development)
async fn reset_index(
    client: &Elasticsearch,
    nama_index: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    use elasticsearch::indices::{IndicesDeleteParts, IndicesExistsParts};

    // Hapus jika ada
    let ada = client
        .indices()
        .exists(IndicesExistsParts::Index(&[nama_index]))
        .send()
        .await?
        .status_code()
        .is_success();

    if ada {
        client
            .indices()
            .delete(IndicesDeleteParts::Index(&[nama_index]))
            .send()
            .await?;
        println!("Index '{}' dihapus", nama_index);
    }

    Ok(())
}

Indexing Dokumen #

use elasticsearch::{IndexParts, BulkParts, Elasticsearch};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct DokumenArtikel {
    pub id: i64,
    pub judul: String,
    pub konten: String,
    pub tag: Vec<String>,
    pub pengguna_id: i64,
    pub diterbitkan: bool,
    pub views: u64,
    pub dibuat_pada: String,  // ISO 8601 string
}

// Index satu dokumen
async fn index_artikel(
    client: &Elasticsearch,
    artikel: &DokumenArtikel,
) -> Result<String, Box<dyn std::error::Error>> {
    let respons = client
        .index(IndexParts::IndexId("artikel", &artikel.id.to_string()))
        .body(artikel)
        .send()
        .await?;

    let body: Value = respons.json().await?;
    let result = body["result"].as_str().unwrap_or("unknown").to_string();
    println!("Dokumen {}: {}", artikel.id, result); // "created" atau "updated"
    Ok(result)
}

// Bulk indexing — jauh lebih efisien untuk banyak dokumen
async fn bulk_index_artikel(
    client: &Elasticsearch,
    artikel_list: &[DokumenArtikel],
) -> Result<u64, Box<dyn std::error::Error>> {
    let mut body: Vec<Value> = Vec::new();

    for artikel in artikel_list {
        // Setiap operasi bulk terdiri dari dua baris: action + document
        body.push(json!({
            "index": {
                "_index": "artikel",
                "_id": artikel.id.to_string()
            }
        }));
        body.push(serde_json::to_value(artikel)?);
    }

    let respons = client
        .bulk(BulkParts::None)
        .body(body)
        .send()
        .await?;

    let result: Value = respons.json().await?;

    // Hitung dokumen yang berhasil
    let berhasil = result["items"]
        .as_array()
        .map(|items| items.iter().filter(|item| item["index"]["error"].is_null()).count())
        .unwrap_or(0);

    println!("Bulk index: {}/{} berhasil", berhasil, artikel_list.len());
    Ok(berhasil as u64)
}

// Hapus dokumen
async fn hapus_dokumen(
    client: &Elasticsearch,
    id: i64,
) -> Result<bool, Box<dyn std::error::Error>> {
    use elasticsearch::DeleteParts;

    let respons = client
        .delete(DeleteParts::IndexId("artikel", &id.to_string()))
        .send()
        .await?;

    Ok(respons.status_code().is_success())
}

Search — Pencarian Dokumen #

Full-Text Search Dasar #

use elasticsearch::{SearchParts, Elasticsearch};
use serde_json::{json, Value};

async fn cari_artikel(
    client: &Elasticsearch,
    query: &str,
) -> Result<Vec<Value>, Box<dyn std::error::Error>> {
    let respons = client
        .search(SearchParts::Index(&["artikel"]))
        .body(json!({
            "query": {
                // multi_match: cari di beberapa field sekaligus
                "multi_match": {
                    "query": query,
                    "fields": [
                        "judul^3",   // ^3 = boost: judul 3x lebih penting dari konten
                        "konten",
                        "tag"
                    ],
                    "type": "best_fields",
                    "fuzziness": "AUTO"  // toleransi typo otomatis
                }
            },
            "size": 20,
            "_source": ["id", "judul", "tag", "views", "dibuat_pada"]  // pilih field
        }))
        .send()
        .await?;

    let body: Value = respons.json().await?;
    let hits = body["hits"]["hits"]
        .as_array()
        .cloned()
        .unwrap_or_default();

    println!("Total ditemukan: {}", body["hits"]["total"]["value"]);
    Ok(hits)
}

Bool Query — Kombinasi Kondisi #

async fn cari_artikel_lanjutan(
    client: &Elasticsearch,
    query: &str,
    tag: Option<&str>,
    hanya_diterbitkan: bool,
    min_views: Option<u64>,
) -> Result<Vec<Value>, Box<dyn std::error::Error>> {
    let mut must: Vec<Value> = vec![];
    let mut filter: Vec<Value> = vec![];

    // must: kondisi yang mempengaruhi relevance score
    if !query.is_empty() {
        must.push(json!({
            "multi_match": {
                "query": query,
                "fields": ["judul^3", "konten"]
            }
        }));
    }

    // filter: kondisi yang tidak mempengaruhi score (lebih cepat, di-cache)
    if hanya_diterbitkan {
        filter.push(json!({"term": {"diterbitkan": true}}));
    }
    if let Some(t) = tag {
        filter.push(json!({"term": {"tag": t}}));
    }
    if let Some(views) = min_views {
        filter.push(json!({"range": {"views": {"gte": views}}}));
    }

    // Jika tidak ada query teks, tampilkan semua (match_all)
    if must.is_empty() {
        must.push(json!({"match_all": {}}));
    }

    let respons = client
        .search(SearchParts::Index(&["artikel"]))
        .body(json!({
            "query": {
                "bool": {
                    "must": must,
                    "filter": filter
                }
            },
            "sort": [
                {"_score": "desc"},
                {"dibuat_pada": "desc"}
            ],
            "size": 20
        }))
        .send()
        .await?;

    let body: Value = respons.json().await?;
    Ok(body["hits"]["hits"].as_array().cloned().unwrap_or_default())
}

Highlighting — Tandai Kata yang Cocok #

async fn cari_dengan_highlight(
    client: &Elasticsearch,
    query: &str,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
    let respons = client
        .search(SearchParts::Index(&["artikel"]))
        .body(json!({
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["judul", "konten"]
                }
            },
            "highlight": {
                "fields": {
                    "judul": {
                        "pre_tags": ["<strong>"],
                        "post_tags": ["</strong>"]
                    },
                    "konten": {
                        "pre_tags": ["<em>"],
                        "post_tags": ["</em>"],
                        "number_of_fragments": 3,
                        "fragment_size": 150
                    }
                }
            },
            "size": 10
        }))
        .send()
        .await?;

    let body: Value = respons.json().await?;
    let hasil: Vec<(String, String)> = body["hits"]["hits"]
        .as_array()
        .unwrap_or(&vec![])
        .iter()
        .map(|hit| {
            let judul = hit["highlight"]["judul"][0]
                .as_str()
                .or_else(|| hit["_source"]["judul"].as_str())
                .unwrap_or("")
                .to_string();
            let snippet = hit["highlight"]["konten"][0]
                .as_str()
                .unwrap_or("")
                .to_string();
            (judul, snippet)
        })
        .collect();

    Ok(hasil)
}

Aggregation — Analitik #

async fn statistik_artikel(
    client: &Elasticsearch,
) -> Result<Value, Box<dyn std::error::Error>> {
    let respons = client
        .search(SearchParts::Index(&["artikel"]))
        .body(json!({
            "size": 0,  // tidak perlu dokumen individual — hanya agregasi
            "aggs": {
                "per_tag": {
                    "terms": {
                        "field": "tag",
                        "size": 10  // top 10 tag
                    },
                    "aggs": {
                        "total_views": {
                            "sum": {"field": "views"}
                        },
                        "rata_views": {
                            "avg": {"field": "views"}
                        }
                    }
                },
                "per_bulan": {
                    "date_histogram": {
                        "field": "dibuat_pada",
                        "calendar_interval": "month",
                        "format": "yyyy-MM"
                    },
                    "aggs": {
                        "jumlah_diterbitkan": {
                            "filter": {"term": {"diterbitkan": true}}
                        }
                    }
                },
                "views_stats": {
                    "stats": {"field": "views"}  // min, max, avg, sum, count
                }
            }
        }))
        .send()
        .await?;

    let body: Value = respons.json().await?;

    // Tampilkan agregasi
    if let Some(per_tag) = body["aggregations"]["per_tag"]["buckets"].as_array() {
        println!("\n=== Top Tag ===");
        for bucket in per_tag {
            println!(
                "{}: {} artikel, {} total views",
                bucket["key"].as_str().unwrap_or("?"),
                bucket["doc_count"],
                bucket["total_views"]["value"]
            );
        }
    }

    Ok(body["aggregations"].clone())
}

Pagination dengan search_after #

Untuk dataset besar, from/size tidak efisien setelah halaman 100+. Gunakan search_after untuk kedalaman pagination yang efisien:

async fn cari_berhalaman(
    client: &Elasticsearch,
    query: &str,
    per_halaman: u64,
    search_after: Option<Vec<Value>>,  // cursor dari halaman sebelumnya
) -> Result<(Vec<Value>, Option<Vec<Value>>), Box<dyn std::error::Error>> {
    let mut body = json!({
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["judul", "konten"]
            }
        },
        "sort": [
            {"_score": "desc"},
            {"_id": "asc"}  // tie-breaker wajib untuk search_after
        ],
        "size": per_halaman,
        // pit: point in time untuk konsistensi halaman
    });

    // Tambahkan search_after jika ada (halaman berikutnya)
    if let Some(cursor) = search_after {
        body["search_after"] = json!(cursor);
    }

    let respons = client
        .search(SearchParts::Index(&["artikel"]))
        .body(body)
        .send()
        .await?;

    let result: Value = respons.json().await?;
    let hits = result["hits"]["hits"]
        .as_array()
        .cloned()
        .unwrap_or_default();

    // Ambil sort values dari dokumen terakhir sebagai cursor berikutnya
    let cursor_berikutnya = hits.last()
        .and_then(|hit| hit["sort"].as_array().cloned());

    Ok((hits, cursor_berikutnya))
}

Sinkronisasi Database → Elasticsearch #

Pola umum dalam arsitektur produksi: database utama (PostgreSQL/MySQL) sebagai source of truth, Elasticsearch untuk pencarian:

sequenceDiagram
    participant API as API Server
    participant DB as PostgreSQL
    participant ES as Elasticsearch

    API->>DB: INSERT/UPDATE artikel
    DB->>API: OK + data artikel
    API->>ES: Index dokumen (async)
    ES->>API: Acknowledged

    API->>ES: Cari artikel
    ES->>API: Hasil pencarian (cepat)

    API->>DB: Ambil detail artikel by ID
    DB->>API: Data lengkap
use tokio::sync::mpsc;

#[derive(Debug, Clone)]
enum EventSinkronisasi {
    Index { id: i64, dokumen: DokumenArtikel },
    Hapus { id: i64 },
}

// Worker yang mendengarkan event dan mengindeks ke ES
async fn worker_sinkronisasi(
    client: elasticsearch::Elasticsearch,
    mut rx: mpsc::Receiver<EventSinkronisasi>,
) {
    while let Some(event) = rx.recv().await {
        match event {
            EventSinkronisasi::Index { id, dokumen } => {
                if let Err(e) = index_artikel(&client, &dokumen).await {
                    eprintln!("Gagal index artikel {}: {}", id, e);
                }
            }
            EventSinkronisasi::Hapus { id } => {
                if let Err(e) = hapus_dokumen(&client, id).await {
                    eprintln!("Gagal hapus dokumen {}: {}", id, e);
                }
            }
        }
    }
}

// Di handler API — kirim event setelah operasi database
async fn handler_buat_artikel(
    tx: mpsc::Sender<EventSinkronisasi>,
    // ... data artikel dari request
) {
    // 1. Simpan ke database (source of truth)
    // let artikel = db.insert_artikel(...).await?;

    // 2. Kirim event untuk diindeks ke ES (fire and forget)
    let dokumen = DokumenArtikel {
        id: 1,
        judul: "Judul Artikel".to_string(),
        konten: "Konten lengkap...".to_string(),
        tag: vec!["rust".to_string()],
        pengguna_id: 1,
        diterbitkan: true,
        views: 0,
        dibuat_pada: chrono::Utc::now().to_rfc3339(),
    };

    let _ = tx.send(EventSinkronisasi::Index { id: 1, dokumen }).await;
}

Re-indexing — Memperbarui Semua Dokumen #

async fn reindex_semua(
    client: &Elasticsearch,
    artikel_list: Vec<DokumenArtikel>,
) -> Result<(), Box<dyn std::error::Error>> {
    const BATCH_SIZE: usize = 500;

    // Hapus index lama dan buat ulang
    reset_index(client, "artikel").await?;
    buat_index_artikel(client).await?;

    // Proses dalam batch
    for (i, batch) in artikel_list.chunks(BATCH_SIZE).enumerate() {
        bulk_index_artikel(client, batch).await?;
        println!("Batch {}: {} dokumen diindeks", i + 1, batch.len());
    }

    println!("Reindex selesai: {} dokumen total", artikel_list.len());
    Ok(())
}

Ringkasan #

  • Elasticsearch bukan pengganti database — gunakan sebagai lapisan pencarian di atas database utama (PostgreSQL/MySQL). Database adalah source of truth, Elasticsearch untuk query search yang cepat.
  • Mapping penting untuk performa — definisikan tipe field secara eksplisit. text untuk full-text search, keyword untuk exact match dan aggregation, date untuk range query waktu.
  • Boost field penting"fields": ["judul^3", "konten"] membuat hasil di field judul 3x lebih relevan dari konten. Sesuaikan bobot berdasarkan domain aplikasi.
  • filter vs must dalam bool queryfilter tidak mempengaruhi relevance score dan di-cache oleh Elasticsearch (lebih cepat). Gunakan filter untuk kondisi binary (aktif/tidak, range tanggal), must untuk kondisi yang butuh scoring.
  • Bulk indexing untuk banyak dokumen — jauh lebih efisien dari indexing satu per satu. Gunakan batch size 500–1000 dokumen per request.
  • search_after untuk deep pagination — lebih efisien dari from/size yang memuat semua dokumen dari halaman 1. Wajib ada field sort yang unik sebagai tie-breaker (misalnya _id).
  • Aggregation untuk analitikterms, date_histogram, stats, range. Set size: 0 jika hanya butuh agregasi tanpa dokumen.
  • Sinkronisasi async dengan channel — kirim event ke worker yang mengindeks ke ES setelah operasi database sukses. Hindari sinkronisasi blocking di hot path request.
  • Re-indexing dengan zero downtime — buat index baru, isi dari database, lalu pindahkan alias. Jangan drop index yang aktif sebelum index baru siap.

← Sebelumnya: MongoDB   Berikutnya: Kafka →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact