Kafka #
Apache Kafka adalah distributed event streaming platform yang dirancang untuk menangani jutaan event per detik dengan latensi rendah dan jaminan durabilitas tinggi. Berbeda dari message queue tradisional seperti RabbitMQ yang menghapus pesan setelah dikonsumsi, Kafka menyimpan event dalam log yang bisa dibaca ulang — ini memungkinkan replay event, audit trail, dan multiple consumer yang masing-masing memproses event secara independen. Di Rust, crate rdkafka adalah binding terhadap librdkafka — library C yang battle-tested dan digunakan di lingkungan produksi terbesar di dunia. Artikel ini membahas producer, consumer, serialisasi, transaksi, dan pola arsitektur yang umum digunakan dengan Kafka.
Konsep Dasar Kafka #
flowchart LR
subgraph Producer
P["Producer\n(Rust App)"]
end
subgraph Kafka Cluster
T["Topic: pesanan\nPartisi 0 | Partisi 1 | Partisi 2"]
T --> P0["Partition 0\n[offset 0][offset 1][offset 2]"]
T --> P1["Partition 1\n[offset 0][offset 1]"]
end
subgraph Consumer Group A
C1["Consumer 1\n(proses Partisi 0)"]
C2["Consumer 2\n(proses Partisi 1)"]
end
subgraph Consumer Group B
C3["Consumer 3\n(proses semua partisi)"]
end
P --> T
P0 --> C1
P1 --> C2
T --> C3| Konsep | Penjelasan |
|---|---|
| Topic | Kategori event — seperti nama antrian |
| Partition | Pembagian paralel dalam satu topic |
| Offset | Posisi unik setiap event dalam partisi |
| Producer | Mengirim event ke topic |
| Consumer | Membaca event dari topic |
| Consumer Group | Beberapa consumer yang berbagi beban partisi |
| Broker | Server Kafka — biasanya 3+ untuk produksi |
Instalasi #
rdkafka membutuhkan librdkafka sebagai dependensi sistem. Opsi termudah adalah menggunakan CMake bundled:
[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"
Fitur cmake-build mengkompilasi librdkafka dari source saat cargo build — tidak perlu instalasi sistem tambahan tapi waktu build lebih lama.
Producer — Mengirim Event #
Producer Dasar #
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
fn buat_producer(brokers: &str) -> FutureProducer {
ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
// Jaminan pengiriman: "all" = tunggu semua replica acknowledge
.set("acks", "all")
// Retry otomatis jika gagal
.set("retries", "3")
.set("retry.backoff.ms", "100")
// Batch untuk throughput tinggi
.set("linger.ms", "5") // tunggu 5ms untuk batch lebih besar
.set("batch.size", "65536") // batch maks 64KB
// Kompresi — hemat bandwidth
.set("compression.type", "snappy")
.create()
.expect("Gagal membuat producer")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let producer = buat_producer("localhost:9092");
// Kirim event dengan key dan value
let payload = serde_json::to_string(&serde_json::json!({
"id": 1001,
"aksi": "buat_pesanan",
"total": 150_000
}))?;
let record = FutureRecord::to("pesanan")
.key("user-42") // key menentukan partisi (konsisten per user)
.payload(&payload);
match producer.send(record, Duration::from_secs(5)).await {
Ok((partisi, offset)) => {
println!("Event terkirim ke partisi {} offset {}", partisi, offset);
}
Err((e, _)) => {
eprintln!("Gagal kirim: {}", e);
}
}
Ok(())
}
Producer Terstruktur dengan Serialisasi JSON #
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventPesanan {
pub id: u64,
pub pengguna_id: u64,
pub produk: Vec<ItemPesanan>,
pub total: f64,
pub status: String,
pub dibuat_pada: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ItemPesanan {
pub produk_id: u64,
pub nama: String,
pub jumlah: u32,
pub harga: f64,
}
struct KafkaProducer {
producer: FutureProducer,
topic: String,
}
impl KafkaProducer {
fn baru(brokers: &str, topic: &str) -> Self {
KafkaProducer {
producer: buat_producer(brokers),
topic: topic.to_string(),
}
}
async fn kirim<T: Serialize>(
&self,
key: &str,
event: &T,
) -> Result<(i32, i64), rdkafka::error::KafkaError> {
let payload = serde_json::to_string(event)
.map_err(|e| rdkafka::error::KafkaError::MessageProduction(
rdkafka::types::RDKafkaErrorCode::MessageSizeTooLarge
))?;
let record = FutureRecord::to(&self.topic)
.key(key)
.payload(&payload);
self.producer
.send(record, Duration::from_secs(5))
.await
.map_err(|(e, _)| e)
}
async fn kirim_batch<T: Serialize>(
&self,
events: &[(String, T)],
) -> Vec<Result<(i32, i64), rdkafka::error::KafkaError>> {
let mut hasil = Vec::new();
for (key, event) in events {
hasil.push(self.kirim(key, event).await);
}
hasil
}
}
fn buat_producer(brokers: &str) -> FutureProducer {
ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("acks", "all")
.set("retries", "3")
.create()
.expect("Gagal membuat producer")
}
Consumer — Membaca Event #
Consumer Dasar dengan Consumer Group #
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use futures::StreamExt;
fn buat_consumer(brokers: &str, group_id: &str) -> StreamConsumer {
ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", group_id)
// Mulai dari awal jika belum ada offset tersimpan
.set("auto.offset.reset", "earliest")
// Commit offset secara manual (lebih aman)
.set("enable.auto.commit", "false")
// Interval heartbeat ke broker
.set("heartbeat.interval.ms", "3000")
// Timeout jika consumer diam terlalu lama
.set("session.timeout.ms", "30000")
.set("max.poll.interval.ms", "300000")
.create()
.expect("Gagal membuat consumer")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let consumer: StreamConsumer = buat_consumer("localhost:9092", "grup-pemroses-pesanan");
// Subscribe ke satu atau banyak topic
consumer.subscribe(&["pesanan", "pembayaran"])?;
println!("Consumer aktif, menunggu event...");
// Stream event — async iterator
let mut stream = consumer.stream();
while let Some(result) = stream.next().await {
match result {
Ok(message) => {
let payload = message
.payload_view::<str>()
.unwrap_or(Ok(""))
.unwrap_or("");
let key = message
.key_view::<str>()
.unwrap_or(Ok(""))
.unwrap_or("");
println!(
"Topic: {}, Partisi: {}, Offset: {}, Key: {}, Payload: {}",
message.topic(),
message.partition(),
message.offset(),
key,
&payload[..payload.len().min(100)]
);
// Proses event
if let Ok(event) = serde_json::from_str::<serde_json::Value>(payload) {
proses_event(&event).await;
}
// Commit offset setelah berhasil diproses
consumer.commit_message(&message, rdkafka::consumer::CommitMode::Async)?;
}
Err(e) => {
eprintln!("Error Kafka: {}", e);
}
}
}
Ok(())
}
async fn proses_event(event: &serde_json::Value) {
println!("Memproses: {:?}", event["aksi"]);
// ... logika bisnis
}
Consumer dengan Typed Deserialization #
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use futures::StreamExt;
struct EventConsumer {
consumer: StreamConsumer,
}
impl EventConsumer {
fn baru(brokers: &str, group_id: &str, topics: &[&str]) -> Self {
let consumer: StreamConsumer = buat_consumer(brokers, group_id);
consumer.subscribe(topics).expect("Gagal subscribe");
EventConsumer { consumer }
}
async fn proses_loop<T, F, Fut>(&self, handler: F)
where
T: for<'de> serde::Deserialize<'de>,
F: Fn(T, i32, i64) -> Fut,
Fut: std::future::Future<Output = Result<(), String>>,
{
let mut stream = self.consumer.stream();
while let Some(result) = stream.next().await {
match result {
Ok(message) => {
let partisi = message.partition();
let offset = message.offset();
let payload = match message.payload_view::<str>() {
Some(Ok(s)) => s,
_ => {
eprintln!("Payload bukan UTF-8 di offset {}", offset);
continue;
}
};
match serde_json::from_str::<T>(payload) {
Ok(event) => {
match handler(event, partisi, offset).await {
Ok(_) => {
// Commit setelah berhasil
let _ = self.consumer.commit_message(
&message,
rdkafka::consumer::CommitMode::Async,
);
}
Err(e) => {
eprintln!("Error handler di offset {}: {}", offset, e);
// Bisa implementasi dead letter queue di sini
}
}
}
Err(e) => {
eprintln!("Gagal deserialize di offset {}: {}", offset, e);
}
}
}
Err(e) => eprintln!("Kafka error: {}", e),
}
}
}
}
fn buat_consumer(brokers: &str, group_id: &str) -> StreamConsumer {
ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", group_id)
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
.create()
.expect("Gagal membuat consumer")
}
Topic Management dengan Admin API #
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
fn buat_admin(brokers: &str) -> AdminClient<DefaultClientContext> {
ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()
.expect("Gagal membuat admin client")
}
async fn buat_topic(
admin: &AdminClient<DefaultClientContext>,
nama: &str,
partisi: i32,
replikasi: i32,
) -> Result<(), Box<dyn std::error::Error>> {
let topic = NewTopic::new(
nama,
partisi,
TopicReplication::Fixed(replikasi),
)
// Retensi 7 hari
.set("retention.ms", "604800000")
// Kompresi di sisi broker
.set("compression.type", "snappy");
let opsi = AdminOptions::new()
.operation_timeout(Some(std::time::Duration::from_secs(30)));
let hasil = admin.create_topics(&[topic], &opsi).await?;
for r in hasil {
match r {
Ok(nama) => println!("Topic '{}' berhasil dibuat", nama),
Err((nama, e)) => {
// Topic sudah ada bukan error di produksi
if e == rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists {
println!("Topic '{}' sudah ada", nama);
} else {
eprintln!("Gagal buat topic '{}': {:?}", nama, e);
}
}
}
}
Ok(())
}
async fn hapus_topic(
admin: &AdminClient<DefaultClientContext>,
nama: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let opsi = AdminOptions::new();
let hasil = admin.delete_topics(&[nama], &opsi).await?;
for r in hasil {
match r {
Ok(nama) => println!("Topic '{}' dihapus", nama),
Err((nama, e)) => eprintln!("Gagal hapus '{}': {:?}", nama, e),
}
}
Ok(())
}
Transaksi Kafka #
Transaksi Kafka memastikan sekelompok event dikirim secara atomik — semua berhasil atau semua dibatalkan. Penting untuk pola “exactly-once semantics”:
use rdkafka::producer::{BaseRecord, ThreadedProducer};
fn buat_producer_transaksional(brokers: &str, transactional_id: &str) -> ThreadedProducer<rdkafka::producer::DefaultProducerContext> {
ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("transactional.id", transactional_id)
.set("acks", "all")
.set("enable.idempotence", "true")
.create()
.expect("Gagal membuat producer transaksional")
}
fn kirim_dengan_transaksi(
producer: &ThreadedProducer<rdkafka::producer::DefaultProducerContext>,
events: &[(&str, &str, &str)], // (topic, key, payload)
) -> Result<(), rdkafka::error::KafkaError> {
// Inisialisasi transaksi (sekali saja saat startup)
producer.init_transactions(std::time::Duration::from_secs(10))?;
// Mulai transaksi
producer.begin_transaction()?;
for (topic, key, payload) in events {
producer.send(
BaseRecord::to(topic)
.key(*key)
.payload(*payload),
).map_err(|(e, _)| e)?;
}
// Commit semua event sekaligus
producer.commit_transaction(std::time::Duration::from_secs(10))?;
println!("Transaksi berhasil: {} event dikirim", events.len());
Ok(())
}
Pola Event-Driven Architecture #
Pattern: Outbox untuk Konsistensi #
Pola yang memastikan event terkirim ke Kafka hanya jika perubahan database berhasil disimpan:
sequenceDiagram
participant API
participant DB as Database
participant Outbox as Outbox Table
participant Relay as Outbox Relay
participant Kafka
API->>DB: BEGIN TRANSACTION
API->>DB: INSERT pesanan
API->>Outbox: INSERT event (dalam transaksi sama)
DB->>API: COMMIT
Relay->>Outbox: Polling event belum terkirim
Relay->>Kafka: Produce event
Kafka->>Relay: Acknowledged
Relay->>Outbox: Mark as sentuse serde::{Deserialize, Serialize};
// Tabel outbox di database
#[derive(Debug, Serialize, Deserialize)]
struct OutboxEvent {
pub id: i64,
pub topic: String,
pub key: String,
pub payload: String,
pub terkirim: bool,
pub dibuat_pada: chrono::DateTime<chrono::Utc>,
}
// Relay: baca dari outbox, kirim ke Kafka, tandai terkirim
async fn outbox_relay(
pool: sqlx::PgPool,
producer: FutureProducer,
) {
loop {
// Ambil event yang belum terkirim
let events = sqlx::query_as!(
OutboxEvent,
"SELECT id, topic, key, payload, terkirim, dibuat_pada
FROM outbox
WHERE terkirim = FALSE
ORDER BY id ASC
LIMIT 100"
)
.fetch_all(&pool)
.await
.unwrap_or_default();
for event in &events {
let record = FutureRecord::to(&event.topic)
.key(&event.key)
.payload(&event.payload);
match producer.send(record, std::time::Duration::from_secs(5)).await {
Ok(_) => {
// Tandai sebagai terkirim
let _ = sqlx::query!(
"UPDATE outbox SET terkirim = TRUE WHERE id = $1",
event.id
)
.execute(&pool)
.await;
}
Err((e, _)) => {
eprintln!("Gagal kirim event {}: {}", event.id, e);
}
}
}
// Tunggu sebelum polling berikutnya
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
use rdkafka::producer::FutureProducer;
Consumer dengan Dead Letter Queue #
Jika pemrosesan event gagal berulang kali, kirim ke DLQ (Dead Letter Queue) untuk investigasi:
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::StreamExt;
struct ConsumerDenganDLQ {
consumer: StreamConsumer,
producer: FutureProducer,
dlq_topic: String,
maks_retry: u32,
}
impl ConsumerDenganDLQ {
async fn jalankan<F, Fut>(&self, handler: F)
where
F: Fn(String) -> Fut,
Fut: std::future::Future<Output = Result<(), String>>,
{
let mut stream = self.consumer.stream();
while let Some(Ok(message)) = stream.next().await {
let payload = message
.payload_view::<str>()
.unwrap_or(Ok(""))
.unwrap_or("")
.to_string();
let mut berhasil = false;
let mut percobaan = 0;
// Retry sampai maks_retry
while percobaan < self.maks_retry {
match handler(payload.clone()).await {
Ok(_) => {
berhasil = true;
break;
}
Err(e) => {
percobaan += 1;
eprintln!("Percobaan {}/{}: {}", percobaan, self.maks_retry, e);
tokio::time::sleep(
std::time::Duration::from_millis(100 * percobaan as u64)
).await;
}
}
}
if !berhasil {
// Kirim ke DLQ
let dlq_payload = serde_json::json!({
"payload_asli": payload,
"error": "Melebihi maks retry",
"topic_asal": message.topic(),
"partisi": message.partition(),
"offset": message.offset(),
"gagal_pada": chrono::Utc::now().to_rfc3339()
}).to_string();
let _ = self.producer.send(
FutureRecord::to(&self.dlq_topic)
.payload(&dlq_payload),
std::time::Duration::from_secs(5),
).await;
eprintln!("Event dikirim ke DLQ: {}", &self.dlq_topic);
}
// Selalu commit — bahkan yang masuk DLQ
let _ = self.consumer.commit_message(
&message,
rdkafka::consumer::CommitMode::Async,
);
}
}
}
Ringkasan #
acks = "all"untuk jaminan pengiriman — producer menunggu semua replica broker mengakui sebelum dianggap berhasil. Lebih lambat tapi tidak ada kehilangan data.- Key menentukan partisi — event dengan key yang sama selalu masuk ke partisi yang sama, menjamin urutan per key. Gunakan ID pengguna/entitas sebagai key.
enable.auto.commit = false— selalu commit manual setelah event berhasil diproses. Auto-commit berisiko: event bisa di-commit sebelum diproses jika aplikasi crash.- Consumer group untuk horizontal scaling — tambah instance consumer baru dalam group yang sama untuk berbagi beban. Jumlah consumer aktif maksimal sama dengan jumlah partisi.
auto.offset.reset = "earliest"— mulai dari event paling lama jika belum ada offset tersimpan. Gunakan"latest"jika hanya butuh event baru.- Outbox pattern untuk konsistensi — simpan event ke tabel outbox dalam transaksi database yang sama dengan perubahan data, lalu relay ke Kafka. Menjamin tidak ada event yang hilang saat aplikasi crash.
- Dead Letter Queue untuk event gagal — daripada memblokir consumer saat pemrosesan gagal, kirim ke DLQ untuk investigasi dan retry manual.
- Transaksi Kafka untuk exactly-once — gunakan
transactional.iddanenable.idempotenceuntuk menjamin event tidak terduplikasi meski ada retry jaringan.- Topik dengan partisi cukup — lebih banyak partisi = lebih banyak parallelism, tapi juga lebih banyak overhead. Mulai dengan 3–6 partisi per topic untuk kebanyakan kasus.