RabbitMQ

RabbitMQ #

RabbitMQ adalah message broker yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang menyimpan event dalam log append-only, RabbitMQ adalah message broker klasik: pesan dikirim ke exchange, dirutekan ke queue, dikonsumsi oleh consumer, dan dihapus setelah di-acknowledge. Ini menjadikannya pilihan ideal untuk task queue, RPC asinkron, dan routing pesan yang kompleks. Di Rust, crate lapin adalah implementasi AMQP 0-9-1 yang async dan aktif dikembangkan. Artikel ini membahas semua primitif RabbitMQ — exchange, queue, binding, publish, consume — beserta pola-pola produksi seperti Dead Letter Exchange dan RPC.

Konsep Dasar RabbitMQ #

flowchart LR
    P["Producer"] --> EX

    subgraph RabbitMQ
        EX["Exchange\n(Direct/Fanout/Topic/Headers)"]
        EX -->|routing key| Q1["Queue A"]
        EX -->|routing key| Q2["Queue B"]
        EX -->|routing key| Q3["Queue C"]
    end

    Q1 --> C1["Consumer 1"]
    Q2 --> C2["Consumer 2"]
    Q3 --> C3["Consumer 3"]
KomponenFungsi
ExchangeMenerima pesan dari producer dan meneruskan ke queue berdasarkan aturan
QueueMenyimpan pesan sampai dikonsumsi
BindingAturan yang menghubungkan exchange ke queue
Routing KeyLabel pada pesan yang dicocokkan dengan binding
Ack/NackKonfirmasi pemrosesan pesan dari consumer

Empat Jenis Exchange #

ExchangeRoutingKapan digunakan
DirectCocokkan routing key tepatTask queue, routing spesifik
FanoutKirim ke semua queue yang terikatBroadcast, notifikasi
TopicCocokkan pola wildcard (* dan #)Log routing, event categories
HeadersCocokkan header pesanRouting kompleks tanpa routing key

Instalasi #

[dependencies]
lapin = "2"
tokio = { version = "1", features = ["full"] }
tokio-amqp = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"

Koneksi dan Channel #

use lapin::{
    options::*, types::FieldTable, Connection, ConnectionProperties,
};

async fn buat_koneksi(uri: &str) -> Result<Connection, lapin::Error> {
    Connection::connect(
        uri,
        ConnectionProperties::default()
            .with_executor(tokio_executor_trait::Tokio::current())
            .with_reactor(tokio_reactor_trait::Tokio),
    )
    .await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Format: amqp://user:password@host:port/vhost
    let uri = std::env::var("RABBITMQ_URL")
        .unwrap_or_else(|_| "amqp://guest:guest@localhost:5672/%2f".to_string());

    let conn = buat_koneksi(&uri).await?;
    println!("Terhubung ke RabbitMQ");

    // Channel: unit kerja dalam satu koneksi
    // Satu koneksi bisa punya banyak channel (lebih efisien dari banyak koneksi)
    let channel = conn.create_channel().await?;
    println!("Channel dibuat");

    Ok(())
}

Deklarasi Exchange dan Queue #

Deklarasi bersifat idempoten — aman dipanggil berkali-kali selama parameter sama:

use lapin::{
    options::*,
    types::{FieldTable, AMQPValue},
    Channel, ExchangeKind,
};

async fn setup_infrastruktur(channel: &Channel) -> Result<(), lapin::Error> {
    // Deklarasi exchange Direct
    channel.exchange_declare(
        "pesanan",                          // nama exchange
        ExchangeKind::Direct,
        ExchangeDeclareOptions {
            durable: true,                  // bertahan setelah broker restart
            ..Default::default()
        },
        FieldTable::default(),
    )
    .await?;

    // Deklarasi exchange Topic untuk routing berdasarkan pola
    channel.exchange_declare(
        "events",
        ExchangeKind::Topic,
        ExchangeDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    )
    .await?;

    // Deklarasi exchange Fanout untuk broadcast
    channel.exchange_declare(
        "notifikasi",
        ExchangeKind::Fanout,
        ExchangeDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    )
    .await?;

    // Deklarasi Dead Letter Exchange (DLX) — untuk pesan yang gagal
    channel.exchange_declare(
        "dlx",
        ExchangeKind::Direct,
        ExchangeDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    )
    .await?;

    // Deklarasi queue dengan DLX
    let mut args = FieldTable::default();
    args.insert(
        "x-dead-letter-exchange".into(),
        AMQPValue::LongString("dlx".into()),
    );
    args.insert(
        "x-message-ttl".into(),
        AMQPValue::LongUInt(300_000), // 5 menit TTL
    );

    channel.queue_declare(
        "pesanan.baru",
        QueueDeclareOptions {
            durable: true,              // queue bertahan setelah restart
            ..Default::default()
        },
        args,
    )
    .await?;

    // Queue DLQ untuk menerima pesan yang mati
    channel.queue_declare(
        "dlq.pesanan",
        QueueDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    )
    .await?;

    // Binding queue ke exchange dengan routing key
    channel.queue_bind(
        "pesanan.baru",     // nama queue
        "pesanan",          // nama exchange
        "baru",             // routing key
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await?;

    // Binding DLQ ke DLX
    channel.queue_bind(
        "dlq.pesanan",
        "dlx",
        "pesanan.baru",
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await?;

    println!("Exchange dan queue berhasil dideklarasi");
    Ok(())
}

Publisher — Mengirim Pesan #

use lapin::{
    options::BasicPublishOptions,
    BasicProperties, Channel,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct PesanPesanan {
    pub id: u64,
    pub pengguna_id: u64,
    pub produk: Vec<String>,
    pub total: f64,
    pub prioritas: u8,
}

struct Publisher {
    channel: Channel,
}

impl Publisher {
    fn baru(channel: Channel) -> Self {
        Publisher { channel }
    }

    async fn kirim<T: Serialize>(
        &self,
        exchange: &str,
        routing_key: &str,
        pesan: &T,
        prioritas: u8,
    ) -> Result<(), lapin::Error> {
        let payload = serde_json::to_vec(pesan)
            .map_err(|_| lapin::Error::InvalidChannel(0))?;

        let properties = BasicProperties::default()
            .with_content_type("application/json".into())
            .with_delivery_mode(2)      // 2 = persistent (bertahan setelah restart)
            .with_priority(prioritas)   // prioritas 0-255
            .with_message_id(uuid::Uuid::new_v4().to_string().into());

        self.channel
            .basic_publish(
                exchange,
                routing_key,
                BasicPublishOptions {
                    mandatory: true,    // error jika tidak ada queue yang menerima
                    ..Default::default()
                },
                &payload,
                properties,
            )
            .await?
            .await?;  // tunggu konfirmasi dari broker (publisher confirm)

        Ok(())
    }
}

use uuid;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conn = buat_koneksi("amqp://guest:guest@localhost:5672/%2f").await?;
    let channel = conn.create_channel().await?;

    // Aktifkan publisher confirm — tunggu broker acknowledge setiap pesan
    channel.confirm_select(ConfirmSelectOptions::default()).await?;

    setup_infrastruktur(&channel).await?;

    let publisher = Publisher::baru(channel);

    let pesanan = PesanPesanan {
        id: 1001,
        pengguna_id: 42,
        produk: vec!["Laptop".to_string(), "Mouse".to_string()],
        total: 15_250_000.0,
        prioritas: 5,
    };

    // Kirim ke exchange "pesanan" dengan routing key "baru"
    publisher.kirim("pesanan", "baru", &pesanan, 5).await?;
    println!("Pesan terkirim: pesanan #{}", pesanan.id);

    Ok(())
}

async fn buat_koneksi(uri: &str) -> Result<lapin::Connection, lapin::Error> {
    lapin::Connection::connect(
        uri,
        lapin::ConnectionProperties::default(),
    )
    .await
}

async fn setup_infrastruktur(_: &lapin::Channel) -> Result<(), lapin::Error> {
    Ok(()) // implementasi di atas
}

Consumer — Menerima Pesan #

Consumer dengan Manual Ack #

use futures::StreamExt;
use lapin::{
    message::DeliveryResult,
    options::*,
    Channel,
};

async fn konsumsi_pesanan(channel: Channel) -> Result<(), lapin::Error> {
    // QoS: maks 10 pesan dalam proses bersamaan (prefetch)
    channel.basic_qos(10, BasicQosOptions::default()).await?;

    let mut consumer = channel
        .basic_consume(
            "pesanan.baru",             // nama queue
            "consumer-pesanan-1",       // consumer tag (unik per consumer)
            BasicConsumeOptions {
                no_ack: false,          // manual ack (lebih aman)
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await?;

    println!("Consumer aktif, menunggu pesan...");

    while let Some(delivery_result) = consumer.next().await {
        match delivery_result {
            Ok(delivery) => {
                let payload = std::str::from_utf8(&delivery.data)
                    .unwrap_or("");

                println!(
                    "Pesan diterima [tag: {}]: {}",
                    delivery.delivery_tag,
                    &payload[..payload.len().min(100)]
                );

                match serde_json::from_str::<PesanPesanan>(payload) {
                    Ok(pesanan) => {
                        match proses_pesanan(&pesanan).await {
                            Ok(_) => {
                                // Acknowledge — pesan dihapus dari queue
                                delivery
                                    .ack(BasicAckOptions::default())
                                    .await?;
                                println!("Pesanan #{} selesai diproses", pesanan.id);
                            }
                            Err(e) => {
                                eprintln!("Gagal proses pesanan #{}: {}", pesanan.id, e);
                                // Nack dengan requeue=false → pesan ke DLX
                                delivery
                                    .nack(BasicNackOptions {
                                        requeue: false,
                                        ..Default::default()
                                    })
                                    .await?;
                            }
                        }
                    }
                    Err(e) => {
                        eprintln!("Pesan tidak bisa diparse: {}", e);
                        // Pesan rusak → buang (jangan requeue)
                        delivery.reject(BasicRejectOptions { requeue: false }).await?;
                    }
                }
            }
            Err(e) => {
                eprintln!("Error consumer: {}", e);
                break;
            }
        }
    }

    Ok(())
}

async fn proses_pesanan(pesanan: &PesanPesanan) -> Result<(), String> {
    println!("Memproses pesanan #{} total Rp{:.0}", pesanan.id, pesanan.total);
    // Logika bisnis...
    Ok(())
}

Consumer Paralel dengan Banyak Worker #

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn konsumsi_paralel(
    channel: Channel,
    maks_paralel: usize,
) -> Result<(), lapin::Error> {
    // Semaphore membatasi pemrosesan paralel
    let semaphore = Arc::new(Semaphore::new(maks_paralel));

    channel.basic_qos(
        maks_paralel as u16,
        BasicQosOptions::default(),
    ).await?;

    let mut consumer = channel
        .basic_consume(
            "pesanan.baru",
            "consumer-paralel",
            BasicConsumeOptions { no_ack: false, ..Default::default() },
            FieldTable::default(),
        )
        .await?;

    while let Some(Ok(delivery)) = consumer.next().await {
        let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();

        tokio::spawn(async move {
            let _permit = permit; // permit dilepas saat task selesai

            let payload = std::str::from_utf8(&delivery.data).unwrap_or("");
            if let Ok(pesanan) = serde_json::from_str::<PesanPesanan>(payload) {
                match proses_pesanan(&pesanan).await {
                    Ok(_) => { let _ = delivery.ack(BasicAckOptions::default()).await; }
                    Err(_) => {
                        let _ = delivery.nack(BasicNackOptions {
                            requeue: false, ..Default::default()
                        }).await;
                    }
                }
            } else {
                let _ = delivery.reject(BasicRejectOptions { requeue: false }).await;
            }
        });
    }

    Ok(())
}

use lapin::Channel;

async fn proses_pesanan(_: &PesanPesanan) -> Result<(), String> { Ok(()) }

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
struct PesanPesanan {
    id: u64,
    pengguna_id: u64,
    produk: Vec<String>,
    total: f64,
    prioritas: u8,
}

Topic Exchange — Routing Berbasis Pola #

Topic exchange menggunakan wildcard untuk routing yang fleksibel:

async fn setup_topic_routing(channel: &Channel) -> Result<(), lapin::Error> {
    // Exchange topic sudah dideklarasi sebelumnya

    // Queue untuk semua event pesanan
    channel.queue_declare("log.pesanan", QueueDeclareOptions {
        durable: true, ..Default::default()
    }, FieldTable::default()).await?;

    // Queue khusus event error saja
    channel.queue_declare("alert.error", QueueDeclareOptions {
        durable: true, ..Default::default()
    }, FieldTable::default()).await?;

    // Queue untuk semua event dari layanan pembayaran
    channel.queue_declare("log.pembayaran", QueueDeclareOptions {
        durable: true, ..Default::default()
    }, FieldTable::default()).await?;

    // Binding dengan pola topic:
    // * = satu kata, # = nol atau lebih kata

    // pesanan.* → semua event pesanan (pesanan.baru, pesanan.selesai, dll.)
    channel.queue_bind("log.pesanan", "events", "pesanan.*",
        QueueBindOptions::default(), FieldTable::default()).await?;

    // *.error → semua error dari semua layanan
    channel.queue_bind("alert.error", "events", "*.error",
        QueueBindOptions::default(), FieldTable::default()).await?;

    // pembayaran.# → semua event dari layanan pembayaran
    channel.queue_bind("log.pembayaran", "events", "pembayaran.#",
        QueueBindOptions::default(), FieldTable::default()).await?;

    Ok(())
}

// Mengirim event dengan routing key bertingkat
async fn kirim_event_topic(channel: &Channel) -> Result<(), lapin::Error> {
    let events = vec![
        ("pesanan.baru", r#"{"id": 1, "status": "baru"}"#),
        ("pesanan.selesai", r#"{"id": 1, "status": "selesai"}"#),
        ("pembayaran.berhasil", r#"{"id": 1, "jumlah": 150000}"#),
        ("pembayaran.gagal.timeout", r#"{"id": 2, "alasan": "timeout"}"#),
        ("inventori.error", r#"{"item": "A", "error": "stok habis"}"#),
    ];

    for (routing_key, payload) in events {
        channel.basic_publish(
            "events",
            routing_key,
            BasicPublishOptions::default(),
            payload.as_bytes(),
            BasicProperties::default().with_delivery_mode(2),
        )
        .await?
        .await?;

        println!("Event '{}' dikirim", routing_key);
    }

    Ok(())
}

RPC Pattern — Request-Reply via RabbitMQ #

RabbitMQ bisa digunakan untuk RPC asinkron — producer mengirim request dan menunggu reply di queue sementara:

use lapin::{options::*, BasicProperties, Channel};
use futures::StreamExt;
use tokio::sync::oneshot;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Client RPC
async fn rpc_call(
    channel: &Channel,
    payload: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    // Buat queue reply sementara yang akan dihapus otomatis
    let reply_queue = channel.queue_declare(
        "",  // nama kosong = nama unik dihasilkan broker
        QueueDeclareOptions {
            exclusive: true,    // hanya bisa diakses oleh koneksi ini
            auto_delete: true,  // dihapus saat consumer terakhir pergi
            ..Default::default()
        },
        FieldTable::default(),
    )
    .await?;

    let reply_queue_name = reply_queue.name().as_str().to_string();
    let correlation_id = uuid::Uuid::new_v4().to_string();

    // Kirim request dengan correlation_id dan reply_to
    channel.basic_publish(
        "",             // default exchange
        "rpc.hitung",  // routing ke queue server RPC
        BasicPublishOptions::default(),
        payload.as_bytes(),
        BasicProperties::default()
            .with_reply_to(reply_queue_name.clone().into())
            .with_correlation_id(correlation_id.clone().into()),
    )
    .await?
    .await?;

    // Tunggu reply di queue sementara
    let mut consumer = channel.basic_consume(
        &reply_queue_name,
        "",
        BasicConsumeOptions {
            no_ack: true,
            ..Default::default()
        },
        FieldTable::default(),
    )
    .await?;

    // Timeout 5 detik
    let reply = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        async {
            while let Some(Ok(delivery)) = consumer.next().await {
                let corr = delivery.properties.correlation_id()
                    .as_ref()
                    .map(|s| s.as_str().to_string())
                    .unwrap_or_default();

                if corr == correlation_id {
                    return Ok(String::from_utf8_lossy(&delivery.data).to_string());
                }
            }
            Err("Tidak ada reply")
        },
    )
    .await
    .map_err(|_| "Timeout menunggu reply")??;

    Ok(reply)
}

Perbandingan RabbitMQ vs Kafka #

Aspek                   RabbitMQ                    Kafka

Model penyimpanan
  Hapus setelah ack     Ya (default)                Tidak — simpan di log
  Replay event          Tidak (kecuali konfigurasi) Ya — konsumen baca ulang offset mana saja

Routing
  Kompleksitas          Sangat kaya (4 jenis exch.) Sederhana (topic + partisi)
  Wildcard              Ya (Topic exchange)         Tidak

Throughput
  Per node              ~50k-100k msg/detik         ~1juta+ msg/detik
  Cocok untuk           Task queue, RPC             Event streaming, log

Ordering
  Per queue             Ya (1 consumer)             Per partisi

Acknowledgment
  Model                 Ack/Nack/Reject per pesan  Commit offset (batch)

Consumer
  Konkurensi            Banyak consumer per queue   1 consumer per partisi per group

Setup
  Kompleksitas          Sederhana (single broker)  Lebih kompleks (ZK/KRaft + broker)

Kapan memilih RabbitMQ:
  ✓ Task queue — proses job satu per satu dengan ack
  ✓ RPC asinkron — request-reply pattern
  ✓ Routing kompleks berdasarkan pola atau header
  ✓ Dead Letter Queue yang mudah dikonfigurasi
  ✓ Tim sudah familiar dengan AMQP

Kapan memilih Kafka:
  ✓ Event streaming volume sangat tinggi (jutaan/detik)
  ✓ Perlu replay event historis
  ✓ Event log sebagai source of truth
  ✓ Banyak consumer group independen membaca event yang sama

Ringkasan #

  • Channel bukan koneksi — satu koneksi AMQP bisa punya banyak channel. Buat satu channel per thread/task, bukan satu koneksi per thread.
  • Deklarasi idempotenqueue_declare dan exchange_declare aman dipanggil berkali-kali; broker tidak error jika sudah ada dengan parameter sama.
  • delivery_mode: 2 untuk persistent message — pesan di-flush ke disk sebelum di-ack. Tanpanya, pesan hilang jika broker restart.
  • Manual ack selalu lebih aman dari auto-ack — dengan no_ack: false, pesan baru dianggap selesai setelah delivery.ack() dipanggil. Jika consumer crash sebelum ack, pesan akan di-requeue.
  • Nack dengan requeue: false → Dead Letter Exchange — jika queue dikonfigurasi dengan DLX, pesan yang di-nack tanpa requeue akan diteruskan ke DLX untuk investigasi.
  • Prefetch (basic_qos) untuk rate limiting — batasi berapa pesan yang dikirim broker ke consumer sebelum ada ack. Mencegah consumer kewalahan dengan beban yang tak terbatas.
  • Topic exchange untuk routing fleksibelpesanan.* cocokkan satu level, pembayaran.# cocokkan nol atau lebih level. Lebih powerful dari Direct tapi tetap efisien.
  • Publisher confirm untuk jaminan pengiriman — aktifkan confirm_select dan await hasil basic_publish dua kali untuk memastikan broker sudah menerima pesan.
  • RabbitMQ lebih mudah, Kafka lebih scalable — pilih RabbitMQ untuk task queue dan RPC, Kafka untuk event streaming volume tinggi yang perlu replay.

← Sebelumnya: Kafka   Berikutnya: Amazon SQS →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact