Google Pub/Sub

Google Pub/Sub #

Google Cloud Pub/Sub adalah layanan messaging terkelola dari Google Cloud Platform yang dirancang untuk skala global dengan latensi rendah. Mirip dengan Amazon SQS, tidak ada infrastruktur yang perlu dikelola — tapi Pub/Sub memiliki model yang lebih dekat ke Kafka dalam satu aspek penting: satu topic bisa punya banyak subscription yang masing-masing menerima salinan semua pesan secara independen, bukan berbagi pesan seperti SQS. Di Rust, crate google-cloud-pubsub dari komunitas memberikan akses async yang ergonomis. Artikel ini membahas autentikasi, manajemen topic dan subscription, publish, consume dengan dua mode (pull dan streaming), dan pola arsitektur yang umum di ekosistem GCP.

Arsitektur Pub/Sub #

flowchart LR
    P1["Publisher A"] --> T["Topic\npesanan-events"]
    P2["Publisher B"] --> T

    T --> S1["Subscription 1\npemrosesan-pesanan\n(pull mode)"]
    T --> S2["Subscription 2\nnotifikasi-email\n(push mode → Cloud Run)"]
    T --> S3["Subscription 3\nanalytics-stream\n(BigQuery subscription)"]

    S1 --> C1["Consumer\n(GKE pod)"]
    S2 --> C2["Cloud Run\nService"]
    S3 --> C3["BigQuery\nTable"]
KonsepPenjelasan
TopicSaluran pesan — publisher mengirim ke sini
SubscriptionCara consumer membaca dari topic
PullConsumer aktif meminta pesan dari Pub/Sub
PushPub/Sub mendorong pesan ke HTTP endpoint
AckKonfirmasi pesan berhasil diproses
Ack DeadlineBatas waktu ack sebelum pesan muncul kembali

Perbedaan kunci dari SQS: setiap subscription menerima salinan semua pesan di topic. Jika ada 3 subscription, setiap pesan dikirim 3 kali ke consumer berbeda. Di SQS, satu pesan hanya dikonsumsi satu consumer.


Instalasi #

[dependencies]
google-cloud-pubsub = "0.22"
google-cloud-gax = "0.18"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
base64 = "0.21"  # pesan Pub/Sub di-encode base64

Autentikasi #

Google Cloud menggunakan Application Default Credentials (ADC). Cara paling umum:

# Development lokal — login dengan akun Google
gcloud auth application-default login

# Di GCE/GKE/Cloud Run — otomatis menggunakan Workload Identity atau service account
# Tidak perlu konfigurasi tambahan

# Atau gunakan service account key file
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
use google_cloud_pubsub::client::{Client, ClientConfig};

async fn buat_client(project_id: &str) -> Result<Client, Box<dyn std::error::Error>> {
    let config = ClientConfig::default()
        .with_project_id(project_id);

    // ADC otomatis dibaca dari environment
    let client = Client::new(config).await?;
    Ok(client)
}

// Untuk development lokal dengan emulator Pub/Sub
async fn buat_client_emulator(project_id: &str) -> Result<Client, Box<dyn std::error::Error>> {
    // Set environment variable sebelum run:
    // export PUBSUB_EMULATOR_HOST=localhost:8085
    std::env::set_var("PUBSUB_EMULATOR_HOST", "localhost:8085");

    let config = ClientConfig::default()
        .with_project_id(project_id);

    let client = Client::new(config).await?;
    Ok(client)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let project_id = std::env::var("GCP_PROJECT_ID")
        .unwrap_or_else(|_| "my-project".to_string());

    let client = buat_client(&project_id).await?;
    println!("Terhubung ke Google Cloud Pub/Sub (project: {})", project_id);

    Ok(())
}

Manajemen Topic dan Subscription #

use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::topic::TopicConfig;
use google_cloud_pubsub::subscription::{SubscriptionConfig, RetryPolicy};
use std::time::Duration;

async fn setup_topic_dan_subscription(
    client: &Client,
    nama_topic: &str,
    nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Buat topic
    let topic = client.create_topic(
        nama_topic,
        None,  // TopicConfig default
        None,  // retry config
    ).await?;
    println!("Topic dibuat: {}", topic.fully_qualified_name());

    // Buat subscription dengan konfigurasi
    let sub_config = SubscriptionConfig {
        // Ack deadline: berapa detik untuk ack sebelum pesan muncul kembali
        ack_deadline_seconds: 60,
        // Retain acknowledged messages untuk berapa lama
        retain_acked_messages: false,
        // Message retention duration
        message_retention_duration: Some(Duration::from_secs(86400)), // 1 hari
        // Retry policy
        retry_policy: Some(RetryPolicy {
            minimum_backoff: Duration::from_secs(10),
            maximum_backoff: Duration::from_secs(600),
        }),
        ..Default::default()
    };

    let subscription = client.create_subscription(
        nama_subscription,
        nama_topic,
        sub_config,
        None,
    ).await?;
    println!("Subscription dibuat: {}", subscription.fully_qualified_name());

    Ok(())
}

// Buat subscription dengan Dead Letter Topic
async fn setup_dengan_dead_letter(
    client: &Client,
    nama_topic: &str,
    nama_sub: &str,
    nama_dlq_topic: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    use google_cloud_pubsub::subscription::DeadLetterPolicy;

    // Buat DLQ topic terlebih dahulu
    client.create_topic(nama_dlq_topic, None, None).await?;

    // Buat subscription dengan dead letter policy
    let project_id = std::env::var("GCP_PROJECT_ID").unwrap_or_default();
    let dlq_topic_name = format!(
        "projects/{}/topics/{}",
        project_id, nama_dlq_topic
    );

    let sub_config = SubscriptionConfig {
        ack_deadline_seconds: 30,
        dead_letter_policy: Some(DeadLetterPolicy {
            dead_letter_topic: dlq_topic_name,
            max_delivery_attempts: 5, // pindah ke DLQ setelah 5 kali gagal
        }),
        ..Default::default()
    };

    client.create_subscription(nama_sub, nama_topic, sub_config, None).await?;
    println!("Subscription dengan DLQ dibuat: {}", nama_sub);

    Ok(())
}

Publish Pesan #

use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::publisher::PublisherConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventPesanan {
    pub id: u64,
    pub pengguna_id: u64,
    pub total: f64,
    pub status: String,
    pub dibuat_pada: String,
}

async fn publish_pesan(
    client: &Client,
    nama_topic: &str,
    event: &EventPesanan,
) -> Result<String, Box<dyn std::error::Error>> {
    let topic = client.topic(nama_topic);

    // Konfigurasi publisher
    let publisher = topic.new_publisher(Some(PublisherConfig {
        // Batch: tunggu hingga 100 pesan atau 10ms sebelum kirim
        flush_interval: std::time::Duration::from_millis(10),
        bundle_size: 100,
        ..Default::default()
    }));

    // Encode payload ke bytes
    let payload = serde_json::to_vec(event)?;

    // Message attributes — metadata yang bisa difilter di subscription
    let mut attributes = HashMap::new();
    attributes.insert("event_type".to_string(), event.status.clone());
    attributes.insert("source".to_string(), "api-server".to_string());
    attributes.insert("version".to_string(), "v1".to_string());

    // Ordering key: pesan dengan key sama dijamin terurut di subscription
    let ordering_key = format!("pengguna-{}", event.pengguna_id);

    let pesan = google_cloud_pubsub::publisher::PubsubMessage {
        data: payload,
        attributes,
        ordering_key,  // kosongkan jika tidak butuh ordering
        ..Default::default()
    };

    // Kirim — mengembalikan Future yang resolve ke message ID
    let awaiter = publisher.publish(pesan).await;

    // Tunggu konfirmasi dari server
    let msg_id = awaiter.get().await?;
    println!("Pesan terkirim dengan ID: {}", msg_id);

    // Flush semua pesan dalam buffer sebelum shutdown
    publisher.shutdown().await;

    Ok(msg_id)
}

// Publish banyak pesan secara paralel
async fn publish_batch(
    client: &Client,
    nama_topic: &str,
    events: Vec<EventPesanan>,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
    let topic = client.topic(nama_topic);
    let publisher = topic.new_publisher(None);

    // Kirim semua pesan tanpa menunggu (concurrent)
    let awaiters: Vec<_> = events.iter().map(|event| {
        let payload = serde_json::to_vec(event).unwrap();
        let pesan = google_cloud_pubsub::publisher::PubsubMessage {
            data: payload,
            ..Default::default()
        };
        publisher.publish(pesan)
    }).collect();

    // Collect future publisher
    let awaiters_resolved: Vec<_> = futures::future::join_all(awaiters).await;

    // Tunggu semua konfirmasi
    let mut msg_ids = Vec::new();
    for awaiter in awaiters_resolved {
        let id = awaiter.get().await?;
        msg_ids.push(id);
    }

    publisher.shutdown().await;
    println!("Batch publish: {} pesan berhasil", msg_ids.len());

    Ok(msg_ids)
}

Pull — Menarik Pesan Secara Manual #

use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::subscription::ReceiveConfig;

async fn pull_pesan(
    client: &Client,
    nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let subscription = client.subscription(nama_subscription);

    // Tarik pesan dengan batas maksimum
    let pesan_list = subscription
        .pull(10, None)  // maks 10 pesan, tidak ada timeout
        .await?;

    for pesan in pesan_list {
        let data = std::str::from_utf8(&pesan.message.data).unwrap_or("");
        let msg_id = &pesan.message.message_id;

        println!("Message ID: {}", msg_id);

        // Baca attributes
        for (k, v) in &pesan.message.attributes {
            println!("  {}: {}", k, v);
        }

        // Parse payload
        match serde_json::from_str::<EventPesanan>(data) {
            Ok(event) => {
                match proses_event(&event).await {
                    Ok(_) => {
                        // Acknowledge — pesan dihapus dari subscription
                        pesan.ack().await?;
                        println!("Event #{} berhasil diproses dan di-ack", event.id);
                    }
                    Err(e) => {
                        eprintln!("Gagal proses event #{}: {}", event.id, e);
                        // Nack — pesan akan muncul kembali setelah ack deadline
                        pesan.nack().await?;
                    }
                }
            }
            Err(e) => {
                eprintln!("Gagal parse pesan: {}", e);
                pesan.nack().await?;
            }
        }
    }

    Ok(())
}

async fn proses_event(event: &EventPesanan) -> Result<(), String> {
    println!("Memproses pesanan #{} - {}", event.id, event.status);
    Ok(())
}

Streaming Pull — Consume Berkelanjutan #

Mode paling umum untuk consumer yang berjalan terus-menerus:

use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::subscription::ReceiveConfig;
use futures::StreamExt;

async fn streaming_consumer(
    client: &Client,
    nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let subscription = client.subscription(nama_subscription);

    println!("Streaming consumer aktif: {}", nama_subscription);

    let config = ReceiveConfig {
        // Jumlah worker goroutine untuk memproses pesan paralel
        worker_count: 4,
        ..Default::default()
    };

    // receive: membuka streaming pull yang berkelanjutan
    subscription.receive(
        move |pesan, ack_handler| {
            async move {
                let data = std::str::from_utf8(&pesan.data).unwrap_or("{}");
                let msg_id = &pesan.message_id;

                println!("Menerima [{}]: {}...", msg_id, &data[..data.len().min(80)]);

                match serde_json::from_str::<EventPesanan>(data) {
                    Ok(event) => {
                        match proses_event(&event).await {
                            Ok(_) => {
                                ack_handler.ack().await;
                            }
                            Err(e) => {
                                eprintln!("Error: {}", e);
                                ack_handler.nack().await;
                            }
                        }
                    }
                    Err(_) => {
                        // Pesan tidak bisa diparse — ack agar tidak loop terus
                        eprintln!("Pesan tidak valid, di-ack untuk skip");
                        ack_handler.ack().await;
                    }
                }
            }
        },
        None,    // CancellationToken — untuk graceful shutdown
        Some(config),
    )
    .await?;

    Ok(())
}

Perpanjang Ack Deadline #

Untuk pesan yang butuh waktu lebih lama diproses:

use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::subscription::ReceiveConfig;

async fn consumer_dengan_keepalive(
    client: &Client,
    nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let subscription = client.subscription(nama_subscription);

    subscription.receive(
        move |pesan, ack_handler| {
            async move {
                let data = std::str::from_utf8(&pesan.data).unwrap_or("{}");

                // Spawn background task untuk perpanjang ack deadline
                let handler_clone = ack_handler.clone();
                let keepalive = tokio::spawn(async move {
                    loop {
                        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
                        // Perpanjang ack deadline 60 detik
                        handler_clone.modify_ack_deadline(60).await;
                        println!("Ack deadline diperpanjang");
                    }
                });

                // Simulasi pemrosesan panjang
                tokio::time::sleep(std::time::Duration::from_secs(90)).await;

                keepalive.abort();
                ack_handler.ack().await;
                println!("Pesan panjang selesai diproses");
            }
        },
        None,
        None,
    )
    .await?;

    Ok(())
}

Ordering Key — Urutan Per Entitas #

Pesan dengan ordering key yang sama dijamin terurut dalam satu subscription:

async fn publish_dengan_ordering(
    client: &Client,
    nama_topic: &str,
    pengguna_id: u64,
    events: Vec<EventPesanan>,
) -> Result<(), Box<dyn std::error::Error>> {
    let topic = client.topic(nama_topic);

    // PENTING: subscription harus dibuat dengan enable_message_ordering = true
    let publisher = topic.new_publisher(None);

    for event in &events {
        let payload = serde_json::to_vec(event)?;
        let pesan = google_cloud_pubsub::publisher::PubsubMessage {
            data: payload,
            // Semua event untuk pengguna yang sama menggunakan key yang sama
            // → dijamin urutan dalam subscription yang mengaktifkan ordering
            ordering_key: format!("user-{}", pengguna_id),
            ..Default::default()
        };

        publisher.publish(pesan).await.get().await?;
    }

    publisher.shutdown().await;
    println!("Semua event untuk pengguna {} dikirim terurut", pengguna_id);
    Ok(())
}

Filter Subscription — Hanya Terima Pesan Tertentu #

async fn buat_subscription_dengan_filter(
    client: &Client,
    nama_subscription: &str,
    nama_topic: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    use google_cloud_pubsub::subscription::SubscriptionConfig;

    // Filter menggunakan Common Expression Language (CEL)
    // Hanya terima pesan dengan attribute event_type = "baru"
    let sub_config = SubscriptionConfig {
        filter: Some("attributes.event_type = \"baru\"".to_string()),
        ack_deadline_seconds: 30,
        ..Default::default()
    };

    client.create_subscription(
        nama_subscription,
        nama_topic,
        sub_config,
        None,
    ).await?;

    println!("Subscription filter dibuat: hanya event_type = baru");
    Ok(())
}

Perbandingan Pub/Sub, SQS, dan Kafka #

AspekGoogle Pub/SubAmazon SQSApache Kafka
ModelTopic-SubscriptionPoint-to-point QueueTopic-Partition
Fan-outNative (banyak sub per topic)Via SNSConsumer group berbeda
OrderingPer ordering keyFIFO queuePer partisi
ReplayTidak (max 7 hari)TidakYa (offset)
ManajemenFully managedFully managedSelf-managed atau Confluent
Throughput10 juta msg/detikTak terbatasSangat tinggi
Push modeYa (ke HTTP endpoint)TidakTidak
FilterYa (CEL expressions)TidakTidak
EkosistemGCPAWSCloud-agnostic
Kapan memilih Google Pub/Sub:
  ✓ Sudah di ekosistem GCP (GKE, Cloud Run, Cloud Functions)
  ✓ Butuh fan-out: satu pesan ke banyak consumer berbeda
  ✓ Push mode ke Cloud Run atau Cloud Functions
  ✓ Filter pesan dengan attribute tanpa kode tambahan
  ✓ Integrasi native dengan BigQuery, Dataflow, GCS

Kapan memilih SQS:
  ✓ Sudah di ekosistem AWS
  ✓ Kebutuhan sederhana: antrian task, satu consumer per pesan

Kapan memilih Kafka:
  ✓ Throughput sangat tinggi dan butuh replay event
  ✓ Tidak terikat ke satu cloud provider

Ringkasan #

  • Setiap subscription mendapat salinan semua pesan — berbeda dari SQS di mana satu pesan hanya dikonsumsi satu consumer. Pub/Sub lebih mirip Kafka dalam aspek ini: banyak subscription = banyak consumer group independen.
  • Dua mode consume: pull dan streaming — pull untuk batch processing sesekali, streaming pull (.receive()) untuk consumer yang selalu aktif.
  • Ack wajib untuk menghapus pesan — pesan yang tidak di-ack akan muncul kembali setelah ack deadline habis. Gunakan nack() untuk sengaja mengembalikan pesan ke antrian.
  • Ordering key untuk urutan per entitas — pesan dengan ordering key yang sama dijamin terurut dalam subscription yang mengaktifkan enable_message_ordering. Gunakan ID entitas sebagai key.
  • Attributes untuk metadata dan filter — kirim attribute bersama pesan, buat subscription dengan filter CEL untuk hanya menerima pesan tertentu tanpa kode tambahan.
  • Dead Letter Topic untuk pesan gagal — setelah max_delivery_attempts kali gagal di-ack, pesan otomatis diteruskan ke DLT untuk investigasi.
  • Publisher batching otomatisPublisherConfig.flush_interval dan bundle_size mengatur kapan batch dikirim. Batching mengurangi biaya API call secara signifikan.
  • Emulator untuk development lokal — set PUBSUB_EMULATOR_HOST=localhost:8085 untuk testing tanpa akun GCP.
  • Perpanjang ack deadline untuk proses panjangmodify_ack_deadline() mencegah pesan muncul kembali di consumer lain saat masih diproses.

← Sebelumnya: Amazon SQS   Berikutnya: Redis →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact