Kafka

Kafka #

Apache Kafka adalah distributed event streaming platform yang dirancang untuk menangani jutaan event per detik dengan latensi rendah dan jaminan durabilitas tinggi. Berbeda dari message queue tradisional seperti RabbitMQ yang menghapus pesan setelah dikonsumsi, Kafka menyimpan event dalam log yang bisa dibaca ulang — ini memungkinkan replay event, audit trail, dan multiple consumer yang masing-masing memproses event secara independen. Di Rust, crate rdkafka adalah binding terhadap librdkafka — library C yang battle-tested dan digunakan di lingkungan produksi terbesar di dunia. Artikel ini membahas producer, consumer, serialisasi, transaksi, dan pola arsitektur yang umum digunakan dengan Kafka.

Konsep Dasar Kafka #

flowchart LR
    subgraph Producer
        P["Producer\n(Rust App)"]
    end

    subgraph Kafka Cluster
        T["Topic: pesanan\nPartisi 0 | Partisi 1 | Partisi 2"]
        T --> P0["Partition 0\n[offset 0][offset 1][offset 2]"]
        T --> P1["Partition 1\n[offset 0][offset 1]"]
    end

    subgraph Consumer Group A
        C1["Consumer 1\n(proses Partisi 0)"]
        C2["Consumer 2\n(proses Partisi 1)"]
    end

    subgraph Consumer Group B
        C3["Consumer 3\n(proses semua partisi)"]
    end

    P --> T
    P0 --> C1
    P1 --> C2
    T --> C3
KonsepPenjelasan
TopicKategori event — seperti nama antrian
PartitionPembagian paralel dalam satu topic
OffsetPosisi unik setiap event dalam partisi
ProducerMengirim event ke topic
ConsumerMembaca event dari topic
Consumer GroupBeberapa consumer yang berbagi beban partisi
BrokerServer Kafka — biasanya 3+ untuk produksi

Instalasi #

rdkafka membutuhkan librdkafka sebagai dependensi sistem. Opsi termudah adalah menggunakan CMake bundled:

[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"

Fitur cmake-build mengkompilasi librdkafka dari source saat cargo build — tidak perlu instalasi sistem tambahan tapi waktu build lebih lama.


Producer — Mengirim Event #

Producer Dasar #

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

fn buat_producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        // Jaminan pengiriman: "all" = tunggu semua replica acknowledge
        .set("acks", "all")
        // Retry otomatis jika gagal
        .set("retries", "3")
        .set("retry.backoff.ms", "100")
        // Batch untuk throughput tinggi
        .set("linger.ms", "5")         // tunggu 5ms untuk batch lebih besar
        .set("batch.size", "65536")    // batch maks 64KB
        // Kompresi — hemat bandwidth
        .set("compression.type", "snappy")
        .create()
        .expect("Gagal membuat producer")
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = buat_producer("localhost:9092");

    // Kirim event dengan key dan value
    let payload = serde_json::to_string(&serde_json::json!({
        "id": 1001,
        "aksi": "buat_pesanan",
        "total": 150_000
    }))?;

    let record = FutureRecord::to("pesanan")
        .key("user-42")          // key menentukan partisi (konsisten per user)
        .payload(&payload);

    match producer.send(record, Duration::from_secs(5)).await {
        Ok((partisi, offset)) => {
            println!("Event terkirim ke partisi {} offset {}", partisi, offset);
        }
        Err((e, _)) => {
            eprintln!("Gagal kirim: {}", e);
        }
    }

    Ok(())
}

Producer Terstruktur dengan Serialisasi JSON #

use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventPesanan {
    pub id: u64,
    pub pengguna_id: u64,
    pub produk: Vec<ItemPesanan>,
    pub total: f64,
    pub status: String,
    pub dibuat_pada: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct ItemPesanan {
    pub produk_id: u64,
    pub nama: String,
    pub jumlah: u32,
    pub harga: f64,
}

struct KafkaProducer {
    producer: FutureProducer,
    topic: String,
}

impl KafkaProducer {
    fn baru(brokers: &str, topic: &str) -> Self {
        KafkaProducer {
            producer: buat_producer(brokers),
            topic: topic.to_string(),
        }
    }

    async fn kirim<T: Serialize>(
        &self,
        key: &str,
        event: &T,
    ) -> Result<(i32, i64), rdkafka::error::KafkaError> {
        let payload = serde_json::to_string(event)
            .map_err(|e| rdkafka::error::KafkaError::MessageProduction(
                rdkafka::types::RDKafkaErrorCode::MessageSizeTooLarge
            ))?;

        let record = FutureRecord::to(&self.topic)
            .key(key)
            .payload(&payload);

        self.producer
            .send(record, Duration::from_secs(5))
            .await
            .map_err(|(e, _)| e)
    }

    async fn kirim_batch<T: Serialize>(
        &self,
        events: &[(String, T)],
    ) -> Vec<Result<(i32, i64), rdkafka::error::KafkaError>> {
        let mut hasil = Vec::new();
        for (key, event) in events {
            hasil.push(self.kirim(key, event).await);
        }
        hasil
    }
}

fn buat_producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("acks", "all")
        .set("retries", "3")
        .create()
        .expect("Gagal membuat producer")
}

Consumer — Membaca Event #

Consumer Dasar dengan Consumer Group #

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use futures::StreamExt;

fn buat_consumer(brokers: &str, group_id: &str) -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", group_id)
        // Mulai dari awal jika belum ada offset tersimpan
        .set("auto.offset.reset", "earliest")
        // Commit offset secara manual (lebih aman)
        .set("enable.auto.commit", "false")
        // Interval heartbeat ke broker
        .set("heartbeat.interval.ms", "3000")
        // Timeout jika consumer diam terlalu lama
        .set("session.timeout.ms", "30000")
        .set("max.poll.interval.ms", "300000")
        .create()
        .expect("Gagal membuat consumer")
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: StreamConsumer = buat_consumer("localhost:9092", "grup-pemroses-pesanan");

    // Subscribe ke satu atau banyak topic
    consumer.subscribe(&["pesanan", "pembayaran"])?;
    println!("Consumer aktif, menunggu event...");

    // Stream event — async iterator
    let mut stream = consumer.stream();

    while let Some(result) = stream.next().await {
        match result {
            Ok(message) => {
                let payload = message
                    .payload_view::<str>()
                    .unwrap_or(Ok(""))
                    .unwrap_or("");

                let key = message
                    .key_view::<str>()
                    .unwrap_or(Ok(""))
                    .unwrap_or("");

                println!(
                    "Topic: {}, Partisi: {}, Offset: {}, Key: {}, Payload: {}",
                    message.topic(),
                    message.partition(),
                    message.offset(),
                    key,
                    &payload[..payload.len().min(100)]
                );

                // Proses event
                if let Ok(event) = serde_json::from_str::<serde_json::Value>(payload) {
                    proses_event(&event).await;
                }

                // Commit offset setelah berhasil diproses
                consumer.commit_message(&message, rdkafka::consumer::CommitMode::Async)?;
            }
            Err(e) => {
                eprintln!("Error Kafka: {}", e);
            }
        }
    }

    Ok(())
}

async fn proses_event(event: &serde_json::Value) {
    println!("Memproses: {:?}", event["aksi"]);
    // ... logika bisnis
}

Consumer dengan Typed Deserialization #

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use futures::StreamExt;

struct EventConsumer {
    consumer: StreamConsumer,
}

impl EventConsumer {
    fn baru(brokers: &str, group_id: &str, topics: &[&str]) -> Self {
        let consumer: StreamConsumer = buat_consumer(brokers, group_id);
        consumer.subscribe(topics).expect("Gagal subscribe");
        EventConsumer { consumer }
    }

    async fn proses_loop<T, F, Fut>(&self, handler: F)
    where
        T: for<'de> serde::Deserialize<'de>,
        F: Fn(T, i32, i64) -> Fut,
        Fut: std::future::Future<Output = Result<(), String>>,
    {
        let mut stream = self.consumer.stream();

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => {
                    let partisi = message.partition();
                    let offset = message.offset();

                    let payload = match message.payload_view::<str>() {
                        Some(Ok(s)) => s,
                        _ => {
                            eprintln!("Payload bukan UTF-8 di offset {}", offset);
                            continue;
                        }
                    };

                    match serde_json::from_str::<T>(payload) {
                        Ok(event) => {
                            match handler(event, partisi, offset).await {
                                Ok(_) => {
                                    // Commit setelah berhasil
                                    let _ = self.consumer.commit_message(
                                        &message,
                                        rdkafka::consumer::CommitMode::Async,
                                    );
                                }
                                Err(e) => {
                                    eprintln!("Error handler di offset {}: {}", offset, e);
                                    // Bisa implementasi dead letter queue di sini
                                }
                            }
                        }
                        Err(e) => {
                            eprintln!("Gagal deserialize di offset {}: {}", offset, e);
                        }
                    }
                }
                Err(e) => eprintln!("Kafka error: {}", e),
            }
        }
    }
}

fn buat_consumer(brokers: &str, group_id: &str) -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", group_id)
        .set("auto.offset.reset", "earliest")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Gagal membuat consumer")
}

Topic Management dengan Admin API #

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

fn buat_admin(brokers: &str) -> AdminClient<DefaultClientContext> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Gagal membuat admin client")
}

async fn buat_topic(
    admin: &AdminClient<DefaultClientContext>,
    nama: &str,
    partisi: i32,
    replikasi: i32,
) -> Result<(), Box<dyn std::error::Error>> {
    let topic = NewTopic::new(
        nama,
        partisi,
        TopicReplication::Fixed(replikasi),
    )
    // Retensi 7 hari
    .set("retention.ms", "604800000")
    // Kompresi di sisi broker
    .set("compression.type", "snappy");

    let opsi = AdminOptions::new()
        .operation_timeout(Some(std::time::Duration::from_secs(30)));

    let hasil = admin.create_topics(&[topic], &opsi).await?;

    for r in hasil {
        match r {
            Ok(nama) => println!("Topic '{}' berhasil dibuat", nama),
            Err((nama, e)) => {
                // Topic sudah ada bukan error di produksi
                if e == rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists {
                    println!("Topic '{}' sudah ada", nama);
                } else {
                    eprintln!("Gagal buat topic '{}': {:?}", nama, e);
                }
            }
        }
    }

    Ok(())
}

async fn hapus_topic(
    admin: &AdminClient<DefaultClientContext>,
    nama: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let opsi = AdminOptions::new();
    let hasil = admin.delete_topics(&[nama], &opsi).await?;
    for r in hasil {
        match r {
            Ok(nama) => println!("Topic '{}' dihapus", nama),
            Err((nama, e)) => eprintln!("Gagal hapus '{}': {:?}", nama, e),
        }
    }
    Ok(())
}

Transaksi Kafka #

Transaksi Kafka memastikan sekelompok event dikirim secara atomik — semua berhasil atau semua dibatalkan. Penting untuk pola “exactly-once semantics”:

use rdkafka::producer::{BaseRecord, ThreadedProducer};

fn buat_producer_transaksional(brokers: &str, transactional_id: &str) -> ThreadedProducer<rdkafka::producer::DefaultProducerContext> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .set("acks", "all")
        .set("enable.idempotence", "true")
        .create()
        .expect("Gagal membuat producer transaksional")
}

fn kirim_dengan_transaksi(
    producer: &ThreadedProducer<rdkafka::producer::DefaultProducerContext>,
    events: &[(&str, &str, &str)],  // (topic, key, payload)
) -> Result<(), rdkafka::error::KafkaError> {
    // Inisialisasi transaksi (sekali saja saat startup)
    producer.init_transactions(std::time::Duration::from_secs(10))?;

    // Mulai transaksi
    producer.begin_transaction()?;

    for (topic, key, payload) in events {
        producer.send(
            BaseRecord::to(topic)
                .key(*key)
                .payload(*payload),
        ).map_err(|(e, _)| e)?;
    }

    // Commit semua event sekaligus
    producer.commit_transaction(std::time::Duration::from_secs(10))?;
    println!("Transaksi berhasil: {} event dikirim", events.len());
    Ok(())
}

Pola Event-Driven Architecture #

Pattern: Outbox untuk Konsistensi #

Pola yang memastikan event terkirim ke Kafka hanya jika perubahan database berhasil disimpan:

sequenceDiagram
    participant API
    participant DB as Database
    participant Outbox as Outbox Table
    participant Relay as Outbox Relay
    participant Kafka

    API->>DB: BEGIN TRANSACTION
    API->>DB: INSERT pesanan
    API->>Outbox: INSERT event (dalam transaksi sama)
    DB->>API: COMMIT

    Relay->>Outbox: Polling event belum terkirim
    Relay->>Kafka: Produce event
    Kafka->>Relay: Acknowledged
    Relay->>Outbox: Mark as sent
use serde::{Deserialize, Serialize};

// Tabel outbox di database
#[derive(Debug, Serialize, Deserialize)]
struct OutboxEvent {
    pub id: i64,
    pub topic: String,
    pub key: String,
    pub payload: String,
    pub terkirim: bool,
    pub dibuat_pada: chrono::DateTime<chrono::Utc>,
}

// Relay: baca dari outbox, kirim ke Kafka, tandai terkirim
async fn outbox_relay(
    pool: sqlx::PgPool,
    producer: FutureProducer,
) {
    loop {
        // Ambil event yang belum terkirim
        let events = sqlx::query_as!(
            OutboxEvent,
            "SELECT id, topic, key, payload, terkirim, dibuat_pada
             FROM outbox
             WHERE terkirim = FALSE
             ORDER BY id ASC
             LIMIT 100"
        )
        .fetch_all(&pool)
        .await
        .unwrap_or_default();

        for event in &events {
            let record = FutureRecord::to(&event.topic)
                .key(&event.key)
                .payload(&event.payload);

            match producer.send(record, std::time::Duration::from_secs(5)).await {
                Ok(_) => {
                    // Tandai sebagai terkirim
                    let _ = sqlx::query!(
                        "UPDATE outbox SET terkirim = TRUE WHERE id = $1",
                        event.id
                    )
                    .execute(&pool)
                    .await;
                }
                Err((e, _)) => {
                    eprintln!("Gagal kirim event {}: {}", event.id, e);
                }
            }
        }

        // Tunggu sebelum polling berikutnya
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}

use rdkafka::producer::FutureProducer;


Consumer dengan Dead Letter Queue #

Jika pemrosesan event gagal berulang kali, kirim ke DLQ (Dead Letter Queue) untuk investigasi:

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use futures::StreamExt;

struct ConsumerDenganDLQ {
    consumer: StreamConsumer,
    producer: FutureProducer,
    dlq_topic: String,
    maks_retry: u32,
}

impl ConsumerDenganDLQ {
    async fn jalankan<F, Fut>(&self, handler: F)
    where
        F: Fn(String) -> Fut,
        Fut: std::future::Future<Output = Result<(), String>>,
    {
        let mut stream = self.consumer.stream();

        while let Some(Ok(message)) = stream.next().await {
            let payload = message
                .payload_view::<str>()
                .unwrap_or(Ok(""))
                .unwrap_or("")
                .to_string();

            let mut berhasil = false;
            let mut percobaan = 0;

            // Retry sampai maks_retry
            while percobaan < self.maks_retry {
                match handler(payload.clone()).await {
                    Ok(_) => {
                        berhasil = true;
                        break;
                    }
                    Err(e) => {
                        percobaan += 1;
                        eprintln!("Percobaan {}/{}: {}", percobaan, self.maks_retry, e);
                        tokio::time::sleep(
                            std::time::Duration::from_millis(100 * percobaan as u64)
                        ).await;
                    }
                }
            }

            if !berhasil {
                // Kirim ke DLQ
                let dlq_payload = serde_json::json!({
                    "payload_asli": payload,
                    "error": "Melebihi maks retry",
                    "topic_asal": message.topic(),
                    "partisi": message.partition(),
                    "offset": message.offset(),
                    "gagal_pada": chrono::Utc::now().to_rfc3339()
                }).to_string();

                let _ = self.producer.send(
                    FutureRecord::to(&self.dlq_topic)
                        .payload(&dlq_payload),
                    std::time::Duration::from_secs(5),
                ).await;

                eprintln!("Event dikirim ke DLQ: {}", &self.dlq_topic);
            }

            // Selalu commit — bahkan yang masuk DLQ
            let _ = self.consumer.commit_message(
                &message,
                rdkafka::consumer::CommitMode::Async,
            );
        }
    }
}

Ringkasan #

  • acks = "all" untuk jaminan pengiriman — producer menunggu semua replica broker mengakui sebelum dianggap berhasil. Lebih lambat tapi tidak ada kehilangan data.
  • Key menentukan partisi — event dengan key yang sama selalu masuk ke partisi yang sama, menjamin urutan per key. Gunakan ID pengguna/entitas sebagai key.
  • enable.auto.commit = false — selalu commit manual setelah event berhasil diproses. Auto-commit berisiko: event bisa di-commit sebelum diproses jika aplikasi crash.
  • Consumer group untuk horizontal scaling — tambah instance consumer baru dalam group yang sama untuk berbagi beban. Jumlah consumer aktif maksimal sama dengan jumlah partisi.
  • auto.offset.reset = "earliest" — mulai dari event paling lama jika belum ada offset tersimpan. Gunakan "latest" jika hanya butuh event baru.
  • Outbox pattern untuk konsistensi — simpan event ke tabel outbox dalam transaksi database yang sama dengan perubahan data, lalu relay ke Kafka. Menjamin tidak ada event yang hilang saat aplikasi crash.
  • Dead Letter Queue untuk event gagal — daripada memblokir consumer saat pemrosesan gagal, kirim ke DLQ untuk investigasi dan retry manual.
  • Transaksi Kafka untuk exactly-once — gunakan transactional.id dan enable.idempotence untuk menjamin event tidak terduplikasi meski ada retry jaringan.
  • Topik dengan partisi cukup — lebih banyak partisi = lebih banyak parallelism, tapi juga lebih banyak overhead. Mulai dengan 3–6 partisi per topic untuk kebanyakan kasus.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact