RabbitMQ #
RabbitMQ adalah message broker yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang menyimpan event dalam log append-only, RabbitMQ adalah message broker klasik: pesan dikirim ke exchange, dirutekan ke queue, dikonsumsi oleh consumer, dan dihapus setelah di-acknowledge. Ini menjadikannya pilihan ideal untuk task queue, RPC asinkron, dan routing pesan yang kompleks. Di Rust, crate lapin adalah implementasi AMQP 0-9-1 yang async dan aktif dikembangkan. Artikel ini membahas semua primitif RabbitMQ — exchange, queue, binding, publish, consume — beserta pola-pola produksi seperti Dead Letter Exchange dan RPC.
Konsep Dasar RabbitMQ #
flowchart LR
P["Producer"] --> EX
subgraph RabbitMQ
EX["Exchange\n(Direct/Fanout/Topic/Headers)"]
EX -->|routing key| Q1["Queue A"]
EX -->|routing key| Q2["Queue B"]
EX -->|routing key| Q3["Queue C"]
end
Q1 --> C1["Consumer 1"]
Q2 --> C2["Consumer 2"]
Q3 --> C3["Consumer 3"]| Komponen | Fungsi |
|---|---|
| Exchange | Menerima pesan dari producer dan meneruskan ke queue berdasarkan aturan |
| Queue | Menyimpan pesan sampai dikonsumsi |
| Binding | Aturan yang menghubungkan exchange ke queue |
| Routing Key | Label pada pesan yang dicocokkan dengan binding |
| Ack/Nack | Konfirmasi pemrosesan pesan dari consumer |
Empat Jenis Exchange #
| Exchange | Routing | Kapan digunakan |
|---|---|---|
| Direct | Cocokkan routing key tepat | Task queue, routing spesifik |
| Fanout | Kirim ke semua queue yang terikat | Broadcast, notifikasi |
| Topic | Cocokkan pola wildcard (* dan #) | Log routing, event categories |
| Headers | Cocokkan header pesan | Routing kompleks tanpa routing key |
Instalasi #
[dependencies]
lapin = "2"
tokio = { version = "1", features = ["full"] }
tokio-amqp = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
Koneksi dan Channel #
use lapin::{
options::*, types::FieldTable, Connection, ConnectionProperties,
};
async fn buat_koneksi(uri: &str) -> Result<Connection, lapin::Error> {
Connection::connect(
uri,
ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio),
)
.await
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Format: amqp://user:password@host:port/vhost
let uri = std::env::var("RABBITMQ_URL")
.unwrap_or_else(|_| "amqp://guest:guest@localhost:5672/%2f".to_string());
let conn = buat_koneksi(&uri).await?;
println!("Terhubung ke RabbitMQ");
// Channel: unit kerja dalam satu koneksi
// Satu koneksi bisa punya banyak channel (lebih efisien dari banyak koneksi)
let channel = conn.create_channel().await?;
println!("Channel dibuat");
Ok(())
}
Deklarasi Exchange dan Queue #
Deklarasi bersifat idempoten — aman dipanggil berkali-kali selama parameter sama:
use lapin::{
options::*,
types::{FieldTable, AMQPValue},
Channel, ExchangeKind,
};
async fn setup_infrastruktur(channel: &Channel) -> Result<(), lapin::Error> {
// Deklarasi exchange Direct
channel.exchange_declare(
"pesanan", // nama exchange
ExchangeKind::Direct,
ExchangeDeclareOptions {
durable: true, // bertahan setelah broker restart
..Default::default()
},
FieldTable::default(),
)
.await?;
// Deklarasi exchange Topic untuk routing berdasarkan pola
channel.exchange_declare(
"events",
ExchangeKind::Topic,
ExchangeDeclareOptions { durable: true, ..Default::default() },
FieldTable::default(),
)
.await?;
// Deklarasi exchange Fanout untuk broadcast
channel.exchange_declare(
"notifikasi",
ExchangeKind::Fanout,
ExchangeDeclareOptions { durable: true, ..Default::default() },
FieldTable::default(),
)
.await?;
// Deklarasi Dead Letter Exchange (DLX) — untuk pesan yang gagal
channel.exchange_declare(
"dlx",
ExchangeKind::Direct,
ExchangeDeclareOptions { durable: true, ..Default::default() },
FieldTable::default(),
)
.await?;
// Deklarasi queue dengan DLX
let mut args = FieldTable::default();
args.insert(
"x-dead-letter-exchange".into(),
AMQPValue::LongString("dlx".into()),
);
args.insert(
"x-message-ttl".into(),
AMQPValue::LongUInt(300_000), // 5 menit TTL
);
channel.queue_declare(
"pesanan.baru",
QueueDeclareOptions {
durable: true, // queue bertahan setelah restart
..Default::default()
},
args,
)
.await?;
// Queue DLQ untuk menerima pesan yang mati
channel.queue_declare(
"dlq.pesanan",
QueueDeclareOptions { durable: true, ..Default::default() },
FieldTable::default(),
)
.await?;
// Binding queue ke exchange dengan routing key
channel.queue_bind(
"pesanan.baru", // nama queue
"pesanan", // nama exchange
"baru", // routing key
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
// Binding DLQ ke DLX
channel.queue_bind(
"dlq.pesanan",
"dlx",
"pesanan.baru",
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
println!("Exchange dan queue berhasil dideklarasi");
Ok(())
}
Publisher — Mengirim Pesan #
use lapin::{
options::BasicPublishOptions,
BasicProperties, Channel,
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct PesanPesanan {
pub id: u64,
pub pengguna_id: u64,
pub produk: Vec<String>,
pub total: f64,
pub prioritas: u8,
}
struct Publisher {
channel: Channel,
}
impl Publisher {
fn baru(channel: Channel) -> Self {
Publisher { channel }
}
async fn kirim<T: Serialize>(
&self,
exchange: &str,
routing_key: &str,
pesan: &T,
prioritas: u8,
) -> Result<(), lapin::Error> {
let payload = serde_json::to_vec(pesan)
.map_err(|_| lapin::Error::InvalidChannel(0))?;
let properties = BasicProperties::default()
.with_content_type("application/json".into())
.with_delivery_mode(2) // 2 = persistent (bertahan setelah restart)
.with_priority(prioritas) // prioritas 0-255
.with_message_id(uuid::Uuid::new_v4().to_string().into());
self.channel
.basic_publish(
exchange,
routing_key,
BasicPublishOptions {
mandatory: true, // error jika tidak ada queue yang menerima
..Default::default()
},
&payload,
properties,
)
.await?
.await?; // tunggu konfirmasi dari broker (publisher confirm)
Ok(())
}
}
use uuid;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conn = buat_koneksi("amqp://guest:guest@localhost:5672/%2f").await?;
let channel = conn.create_channel().await?;
// Aktifkan publisher confirm — tunggu broker acknowledge setiap pesan
channel.confirm_select(ConfirmSelectOptions::default()).await?;
setup_infrastruktur(&channel).await?;
let publisher = Publisher::baru(channel);
let pesanan = PesanPesanan {
id: 1001,
pengguna_id: 42,
produk: vec!["Laptop".to_string(), "Mouse".to_string()],
total: 15_250_000.0,
prioritas: 5,
};
// Kirim ke exchange "pesanan" dengan routing key "baru"
publisher.kirim("pesanan", "baru", &pesanan, 5).await?;
println!("Pesan terkirim: pesanan #{}", pesanan.id);
Ok(())
}
async fn buat_koneksi(uri: &str) -> Result<lapin::Connection, lapin::Error> {
lapin::Connection::connect(
uri,
lapin::ConnectionProperties::default(),
)
.await
}
async fn setup_infrastruktur(_: &lapin::Channel) -> Result<(), lapin::Error> {
Ok(()) // implementasi di atas
}
Consumer — Menerima Pesan #
Consumer dengan Manual Ack #
use futures::StreamExt;
use lapin::{
message::DeliveryResult,
options::*,
Channel,
};
async fn konsumsi_pesanan(channel: Channel) -> Result<(), lapin::Error> {
// QoS: maks 10 pesan dalam proses bersamaan (prefetch)
channel.basic_qos(10, BasicQosOptions::default()).await?;
let mut consumer = channel
.basic_consume(
"pesanan.baru", // nama queue
"consumer-pesanan-1", // consumer tag (unik per consumer)
BasicConsumeOptions {
no_ack: false, // manual ack (lebih aman)
..Default::default()
},
FieldTable::default(),
)
.await?;
println!("Consumer aktif, menunggu pesan...");
while let Some(delivery_result) = consumer.next().await {
match delivery_result {
Ok(delivery) => {
let payload = std::str::from_utf8(&delivery.data)
.unwrap_or("");
println!(
"Pesan diterima [tag: {}]: {}",
delivery.delivery_tag,
&payload[..payload.len().min(100)]
);
match serde_json::from_str::<PesanPesanan>(payload) {
Ok(pesanan) => {
match proses_pesanan(&pesanan).await {
Ok(_) => {
// Acknowledge — pesan dihapus dari queue
delivery
.ack(BasicAckOptions::default())
.await?;
println!("Pesanan #{} selesai diproses", pesanan.id);
}
Err(e) => {
eprintln!("Gagal proses pesanan #{}: {}", pesanan.id, e);
// Nack dengan requeue=false → pesan ke DLX
delivery
.nack(BasicNackOptions {
requeue: false,
..Default::default()
})
.await?;
}
}
}
Err(e) => {
eprintln!("Pesan tidak bisa diparse: {}", e);
// Pesan rusak → buang (jangan requeue)
delivery.reject(BasicRejectOptions { requeue: false }).await?;
}
}
}
Err(e) => {
eprintln!("Error consumer: {}", e);
break;
}
}
}
Ok(())
}
async fn proses_pesanan(pesanan: &PesanPesanan) -> Result<(), String> {
println!("Memproses pesanan #{} total Rp{:.0}", pesanan.id, pesanan.total);
// Logika bisnis...
Ok(())
}
Consumer Paralel dengan Banyak Worker #
use std::sync::Arc;
use tokio::sync::Semaphore;
async fn konsumsi_paralel(
channel: Channel,
maks_paralel: usize,
) -> Result<(), lapin::Error> {
// Semaphore membatasi pemrosesan paralel
let semaphore = Arc::new(Semaphore::new(maks_paralel));
channel.basic_qos(
maks_paralel as u16,
BasicQosOptions::default(),
).await?;
let mut consumer = channel
.basic_consume(
"pesanan.baru",
"consumer-paralel",
BasicConsumeOptions { no_ack: false, ..Default::default() },
FieldTable::default(),
)
.await?;
while let Some(Ok(delivery)) = consumer.next().await {
let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
tokio::spawn(async move {
let _permit = permit; // permit dilepas saat task selesai
let payload = std::str::from_utf8(&delivery.data).unwrap_or("");
if let Ok(pesanan) = serde_json::from_str::<PesanPesanan>(payload) {
match proses_pesanan(&pesanan).await {
Ok(_) => { let _ = delivery.ack(BasicAckOptions::default()).await; }
Err(_) => {
let _ = delivery.nack(BasicNackOptions {
requeue: false, ..Default::default()
}).await;
}
}
} else {
let _ = delivery.reject(BasicRejectOptions { requeue: false }).await;
}
});
}
Ok(())
}
use lapin::Channel;
async fn proses_pesanan(_: &PesanPesanan) -> Result<(), String> { Ok(()) }
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
struct PesanPesanan {
id: u64,
pengguna_id: u64,
produk: Vec<String>,
total: f64,
prioritas: u8,
}
Topic Exchange — Routing Berbasis Pola #
Topic exchange menggunakan wildcard untuk routing yang fleksibel:
async fn setup_topic_routing(channel: &Channel) -> Result<(), lapin::Error> {
// Exchange topic sudah dideklarasi sebelumnya
// Queue untuk semua event pesanan
channel.queue_declare("log.pesanan", QueueDeclareOptions {
durable: true, ..Default::default()
}, FieldTable::default()).await?;
// Queue khusus event error saja
channel.queue_declare("alert.error", QueueDeclareOptions {
durable: true, ..Default::default()
}, FieldTable::default()).await?;
// Queue untuk semua event dari layanan pembayaran
channel.queue_declare("log.pembayaran", QueueDeclareOptions {
durable: true, ..Default::default()
}, FieldTable::default()).await?;
// Binding dengan pola topic:
// * = satu kata, # = nol atau lebih kata
// pesanan.* → semua event pesanan (pesanan.baru, pesanan.selesai, dll.)
channel.queue_bind("log.pesanan", "events", "pesanan.*",
QueueBindOptions::default(), FieldTable::default()).await?;
// *.error → semua error dari semua layanan
channel.queue_bind("alert.error", "events", "*.error",
QueueBindOptions::default(), FieldTable::default()).await?;
// pembayaran.# → semua event dari layanan pembayaran
channel.queue_bind("log.pembayaran", "events", "pembayaran.#",
QueueBindOptions::default(), FieldTable::default()).await?;
Ok(())
}
// Mengirim event dengan routing key bertingkat
async fn kirim_event_topic(channel: &Channel) -> Result<(), lapin::Error> {
let events = vec![
("pesanan.baru", r#"{"id": 1, "status": "baru"}"#),
("pesanan.selesai", r#"{"id": 1, "status": "selesai"}"#),
("pembayaran.berhasil", r#"{"id": 1, "jumlah": 150000}"#),
("pembayaran.gagal.timeout", r#"{"id": 2, "alasan": "timeout"}"#),
("inventori.error", r#"{"item": "A", "error": "stok habis"}"#),
];
for (routing_key, payload) in events {
channel.basic_publish(
"events",
routing_key,
BasicPublishOptions::default(),
payload.as_bytes(),
BasicProperties::default().with_delivery_mode(2),
)
.await?
.await?;
println!("Event '{}' dikirim", routing_key);
}
Ok(())
}
RPC Pattern — Request-Reply via RabbitMQ #
RabbitMQ bisa digunakan untuk RPC asinkron — producer mengirim request dan menunggu reply di queue sementara:
use lapin::{options::*, BasicProperties, Channel};
use futures::StreamExt;
use tokio::sync::oneshot;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// Client RPC
async fn rpc_call(
channel: &Channel,
payload: &str,
) -> Result<String, Box<dyn std::error::Error>> {
// Buat queue reply sementara yang akan dihapus otomatis
let reply_queue = channel.queue_declare(
"", // nama kosong = nama unik dihasilkan broker
QueueDeclareOptions {
exclusive: true, // hanya bisa diakses oleh koneksi ini
auto_delete: true, // dihapus saat consumer terakhir pergi
..Default::default()
},
FieldTable::default(),
)
.await?;
let reply_queue_name = reply_queue.name().as_str().to_string();
let correlation_id = uuid::Uuid::new_v4().to_string();
// Kirim request dengan correlation_id dan reply_to
channel.basic_publish(
"", // default exchange
"rpc.hitung", // routing ke queue server RPC
BasicPublishOptions::default(),
payload.as_bytes(),
BasicProperties::default()
.with_reply_to(reply_queue_name.clone().into())
.with_correlation_id(correlation_id.clone().into()),
)
.await?
.await?;
// Tunggu reply di queue sementara
let mut consumer = channel.basic_consume(
&reply_queue_name,
"",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
// Timeout 5 detik
let reply = tokio::time::timeout(
std::time::Duration::from_secs(5),
async {
while let Some(Ok(delivery)) = consumer.next().await {
let corr = delivery.properties.correlation_id()
.as_ref()
.map(|s| s.as_str().to_string())
.unwrap_or_default();
if corr == correlation_id {
return Ok(String::from_utf8_lossy(&delivery.data).to_string());
}
}
Err("Tidak ada reply")
},
)
.await
.map_err(|_| "Timeout menunggu reply")??;
Ok(reply)
}
Perbandingan RabbitMQ vs Kafka #
Aspek RabbitMQ Kafka
Model penyimpanan
Hapus setelah ack Ya (default) Tidak — simpan di log
Replay event Tidak (kecuali konfigurasi) Ya — konsumen baca ulang offset mana saja
Routing
Kompleksitas Sangat kaya (4 jenis exch.) Sederhana (topic + partisi)
Wildcard Ya (Topic exchange) Tidak
Throughput
Per node ~50k-100k msg/detik ~1juta+ msg/detik
Cocok untuk Task queue, RPC Event streaming, log
Ordering
Per queue Ya (1 consumer) Per partisi
Acknowledgment
Model Ack/Nack/Reject per pesan Commit offset (batch)
Consumer
Konkurensi Banyak consumer per queue 1 consumer per partisi per group
Setup
Kompleksitas Sederhana (single broker) Lebih kompleks (ZK/KRaft + broker)
Kapan memilih RabbitMQ:
✓ Task queue — proses job satu per satu dengan ack
✓ RPC asinkron — request-reply pattern
✓ Routing kompleks berdasarkan pola atau header
✓ Dead Letter Queue yang mudah dikonfigurasi
✓ Tim sudah familiar dengan AMQP
Kapan memilih Kafka:
✓ Event streaming volume sangat tinggi (jutaan/detik)
✓ Perlu replay event historis
✓ Event log sebagai source of truth
✓ Banyak consumer group independen membaca event yang sama
Ringkasan #
- Channel bukan koneksi — satu koneksi AMQP bisa punya banyak channel. Buat satu channel per thread/task, bukan satu koneksi per thread.
- Deklarasi idempoten —
queue_declaredanexchange_declareaman dipanggil berkali-kali; broker tidak error jika sudah ada dengan parameter sama.delivery_mode: 2untuk persistent message — pesan di-flush ke disk sebelum di-ack. Tanpanya, pesan hilang jika broker restart.- Manual ack selalu lebih aman dari auto-ack — dengan
no_ack: false, pesan baru dianggap selesai setelahdelivery.ack()dipanggil. Jika consumer crash sebelum ack, pesan akan di-requeue.- Nack dengan
requeue: false→ Dead Letter Exchange — jika queue dikonfigurasi dengan DLX, pesan yang di-nack tanpa requeue akan diteruskan ke DLX untuk investigasi.- Prefetch (
basic_qos) untuk rate limiting — batasi berapa pesan yang dikirim broker ke consumer sebelum ada ack. Mencegah consumer kewalahan dengan beban yang tak terbatas.- Topic exchange untuk routing fleksibel —
pesanan.*cocokkan satu level,pembayaran.#cocokkan nol atau lebih level. Lebih powerful dari Direct tapi tetap efisien.- Publisher confirm untuk jaminan pengiriman — aktifkan
confirm_selectdan await hasilbasic_publishdua kali untuk memastikan broker sudah menerima pesan.- RabbitMQ lebih mudah, Kafka lebih scalable — pilih RabbitMQ untuk task queue dan RPC, Kafka untuk event streaming volume tinggi yang perlu replay.