Google Pub/Sub #
Google Cloud Pub/Sub adalah layanan messaging terkelola dari Google Cloud Platform yang dirancang untuk skala global dengan latensi rendah. Mirip dengan Amazon SQS, tidak ada infrastruktur yang perlu dikelola — tapi Pub/Sub memiliki model yang lebih dekat ke Kafka dalam satu aspek penting: satu topic bisa punya banyak subscription yang masing-masing menerima salinan semua pesan secara independen, bukan berbagi pesan seperti SQS. Di Rust, crate google-cloud-pubsub dari komunitas memberikan akses async yang ergonomis. Artikel ini membahas autentikasi, manajemen topic dan subscription, publish, consume dengan dua mode (pull dan streaming), dan pola arsitektur yang umum di ekosistem GCP.
Arsitektur Pub/Sub #
flowchart LR
P1["Publisher A"] --> T["Topic\npesanan-events"]
P2["Publisher B"] --> T
T --> S1["Subscription 1\npemrosesan-pesanan\n(pull mode)"]
T --> S2["Subscription 2\nnotifikasi-email\n(push mode → Cloud Run)"]
T --> S3["Subscription 3\nanalytics-stream\n(BigQuery subscription)"]
S1 --> C1["Consumer\n(GKE pod)"]
S2 --> C2["Cloud Run\nService"]
S3 --> C3["BigQuery\nTable"]| Konsep | Penjelasan |
|---|---|
| Topic | Saluran pesan — publisher mengirim ke sini |
| Subscription | Cara consumer membaca dari topic |
| Pull | Consumer aktif meminta pesan dari Pub/Sub |
| Push | Pub/Sub mendorong pesan ke HTTP endpoint |
| Ack | Konfirmasi pesan berhasil diproses |
| Ack Deadline | Batas waktu ack sebelum pesan muncul kembali |
Perbedaan kunci dari SQS: setiap subscription menerima salinan semua pesan di topic. Jika ada 3 subscription, setiap pesan dikirim 3 kali ke consumer berbeda. Di SQS, satu pesan hanya dikonsumsi satu consumer.
Instalasi #
[dependencies]
google-cloud-pubsub = "0.22"
google-cloud-gax = "0.18"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
base64 = "0.21" # pesan Pub/Sub di-encode base64
Autentikasi #
Google Cloud menggunakan Application Default Credentials (ADC). Cara paling umum:
# Development lokal — login dengan akun Google
gcloud auth application-default login
# Di GCE/GKE/Cloud Run — otomatis menggunakan Workload Identity atau service account
# Tidak perlu konfigurasi tambahan
# Atau gunakan service account key file
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
use google_cloud_pubsub::client::{Client, ClientConfig};
async fn buat_client(project_id: &str) -> Result<Client, Box<dyn std::error::Error>> {
let config = ClientConfig::default()
.with_project_id(project_id);
// ADC otomatis dibaca dari environment
let client = Client::new(config).await?;
Ok(client)
}
// Untuk development lokal dengan emulator Pub/Sub
async fn buat_client_emulator(project_id: &str) -> Result<Client, Box<dyn std::error::Error>> {
// Set environment variable sebelum run:
// export PUBSUB_EMULATOR_HOST=localhost:8085
std::env::set_var("PUBSUB_EMULATOR_HOST", "localhost:8085");
let config = ClientConfig::default()
.with_project_id(project_id);
let client = Client::new(config).await?;
Ok(client)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let project_id = std::env::var("GCP_PROJECT_ID")
.unwrap_or_else(|_| "my-project".to_string());
let client = buat_client(&project_id).await?;
println!("Terhubung ke Google Cloud Pub/Sub (project: {})", project_id);
Ok(())
}
Manajemen Topic dan Subscription #
use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::topic::TopicConfig;
use google_cloud_pubsub::subscription::{SubscriptionConfig, RetryPolicy};
use std::time::Duration;
async fn setup_topic_dan_subscription(
client: &Client,
nama_topic: &str,
nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
// Buat topic
let topic = client.create_topic(
nama_topic,
None, // TopicConfig default
None, // retry config
).await?;
println!("Topic dibuat: {}", topic.fully_qualified_name());
// Buat subscription dengan konfigurasi
let sub_config = SubscriptionConfig {
// Ack deadline: berapa detik untuk ack sebelum pesan muncul kembali
ack_deadline_seconds: 60,
// Retain acknowledged messages untuk berapa lama
retain_acked_messages: false,
// Message retention duration
message_retention_duration: Some(Duration::from_secs(86400)), // 1 hari
// Retry policy
retry_policy: Some(RetryPolicy {
minimum_backoff: Duration::from_secs(10),
maximum_backoff: Duration::from_secs(600),
}),
..Default::default()
};
let subscription = client.create_subscription(
nama_subscription,
nama_topic,
sub_config,
None,
).await?;
println!("Subscription dibuat: {}", subscription.fully_qualified_name());
Ok(())
}
// Buat subscription dengan Dead Letter Topic
async fn setup_dengan_dead_letter(
client: &Client,
nama_topic: &str,
nama_sub: &str,
nama_dlq_topic: &str,
) -> Result<(), Box<dyn std::error::Error>> {
use google_cloud_pubsub::subscription::DeadLetterPolicy;
// Buat DLQ topic terlebih dahulu
client.create_topic(nama_dlq_topic, None, None).await?;
// Buat subscription dengan dead letter policy
let project_id = std::env::var("GCP_PROJECT_ID").unwrap_or_default();
let dlq_topic_name = format!(
"projects/{}/topics/{}",
project_id, nama_dlq_topic
);
let sub_config = SubscriptionConfig {
ack_deadline_seconds: 30,
dead_letter_policy: Some(DeadLetterPolicy {
dead_letter_topic: dlq_topic_name,
max_delivery_attempts: 5, // pindah ke DLQ setelah 5 kali gagal
}),
..Default::default()
};
client.create_subscription(nama_sub, nama_topic, sub_config, None).await?;
println!("Subscription dengan DLQ dibuat: {}", nama_sub);
Ok(())
}
Publish Pesan #
use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::publisher::PublisherConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventPesanan {
pub id: u64,
pub pengguna_id: u64,
pub total: f64,
pub status: String,
pub dibuat_pada: String,
}
async fn publish_pesan(
client: &Client,
nama_topic: &str,
event: &EventPesanan,
) -> Result<String, Box<dyn std::error::Error>> {
let topic = client.topic(nama_topic);
// Konfigurasi publisher
let publisher = topic.new_publisher(Some(PublisherConfig {
// Batch: tunggu hingga 100 pesan atau 10ms sebelum kirim
flush_interval: std::time::Duration::from_millis(10),
bundle_size: 100,
..Default::default()
}));
// Encode payload ke bytes
let payload = serde_json::to_vec(event)?;
// Message attributes — metadata yang bisa difilter di subscription
let mut attributes = HashMap::new();
attributes.insert("event_type".to_string(), event.status.clone());
attributes.insert("source".to_string(), "api-server".to_string());
attributes.insert("version".to_string(), "v1".to_string());
// Ordering key: pesan dengan key sama dijamin terurut di subscription
let ordering_key = format!("pengguna-{}", event.pengguna_id);
let pesan = google_cloud_pubsub::publisher::PubsubMessage {
data: payload,
attributes,
ordering_key, // kosongkan jika tidak butuh ordering
..Default::default()
};
// Kirim — mengembalikan Future yang resolve ke message ID
let awaiter = publisher.publish(pesan).await;
// Tunggu konfirmasi dari server
let msg_id = awaiter.get().await?;
println!("Pesan terkirim dengan ID: {}", msg_id);
// Flush semua pesan dalam buffer sebelum shutdown
publisher.shutdown().await;
Ok(msg_id)
}
// Publish banyak pesan secara paralel
async fn publish_batch(
client: &Client,
nama_topic: &str,
events: Vec<EventPesanan>,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let topic = client.topic(nama_topic);
let publisher = topic.new_publisher(None);
// Kirim semua pesan tanpa menunggu (concurrent)
let awaiters: Vec<_> = events.iter().map(|event| {
let payload = serde_json::to_vec(event).unwrap();
let pesan = google_cloud_pubsub::publisher::PubsubMessage {
data: payload,
..Default::default()
};
publisher.publish(pesan)
}).collect();
// Collect future publisher
let awaiters_resolved: Vec<_> = futures::future::join_all(awaiters).await;
// Tunggu semua konfirmasi
let mut msg_ids = Vec::new();
for awaiter in awaiters_resolved {
let id = awaiter.get().await?;
msg_ids.push(id);
}
publisher.shutdown().await;
println!("Batch publish: {} pesan berhasil", msg_ids.len());
Ok(msg_ids)
}
Pull — Menarik Pesan Secara Manual #
use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::subscription::ReceiveConfig;
async fn pull_pesan(
client: &Client,
nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let subscription = client.subscription(nama_subscription);
// Tarik pesan dengan batas maksimum
let pesan_list = subscription
.pull(10, None) // maks 10 pesan, tidak ada timeout
.await?;
for pesan in pesan_list {
let data = std::str::from_utf8(&pesan.message.data).unwrap_or("");
let msg_id = &pesan.message.message_id;
println!("Message ID: {}", msg_id);
// Baca attributes
for (k, v) in &pesan.message.attributes {
println!(" {}: {}", k, v);
}
// Parse payload
match serde_json::from_str::<EventPesanan>(data) {
Ok(event) => {
match proses_event(&event).await {
Ok(_) => {
// Acknowledge — pesan dihapus dari subscription
pesan.ack().await?;
println!("Event #{} berhasil diproses dan di-ack", event.id);
}
Err(e) => {
eprintln!("Gagal proses event #{}: {}", event.id, e);
// Nack — pesan akan muncul kembali setelah ack deadline
pesan.nack().await?;
}
}
}
Err(e) => {
eprintln!("Gagal parse pesan: {}", e);
pesan.nack().await?;
}
}
}
Ok(())
}
async fn proses_event(event: &EventPesanan) -> Result<(), String> {
println!("Memproses pesanan #{} - {}", event.id, event.status);
Ok(())
}
Streaming Pull — Consume Berkelanjutan #
Mode paling umum untuk consumer yang berjalan terus-menerus:
use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::subscription::ReceiveConfig;
use futures::StreamExt;
async fn streaming_consumer(
client: &Client,
nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let subscription = client.subscription(nama_subscription);
println!("Streaming consumer aktif: {}", nama_subscription);
let config = ReceiveConfig {
// Jumlah worker goroutine untuk memproses pesan paralel
worker_count: 4,
..Default::default()
};
// receive: membuka streaming pull yang berkelanjutan
subscription.receive(
move |pesan, ack_handler| {
async move {
let data = std::str::from_utf8(&pesan.data).unwrap_or("{}");
let msg_id = &pesan.message_id;
println!("Menerima [{}]: {}...", msg_id, &data[..data.len().min(80)]);
match serde_json::from_str::<EventPesanan>(data) {
Ok(event) => {
match proses_event(&event).await {
Ok(_) => {
ack_handler.ack().await;
}
Err(e) => {
eprintln!("Error: {}", e);
ack_handler.nack().await;
}
}
}
Err(_) => {
// Pesan tidak bisa diparse — ack agar tidak loop terus
eprintln!("Pesan tidak valid, di-ack untuk skip");
ack_handler.ack().await;
}
}
}
},
None, // CancellationToken — untuk graceful shutdown
Some(config),
)
.await?;
Ok(())
}
Perpanjang Ack Deadline #
Untuk pesan yang butuh waktu lebih lama diproses:
use google_cloud_pubsub::client::Client;
use google_cloud_pubsub::subscription::ReceiveConfig;
async fn consumer_dengan_keepalive(
client: &Client,
nama_subscription: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let subscription = client.subscription(nama_subscription);
subscription.receive(
move |pesan, ack_handler| {
async move {
let data = std::str::from_utf8(&pesan.data).unwrap_or("{}");
// Spawn background task untuk perpanjang ack deadline
let handler_clone = ack_handler.clone();
let keepalive = tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
// Perpanjang ack deadline 60 detik
handler_clone.modify_ack_deadline(60).await;
println!("Ack deadline diperpanjang");
}
});
// Simulasi pemrosesan panjang
tokio::time::sleep(std::time::Duration::from_secs(90)).await;
keepalive.abort();
ack_handler.ack().await;
println!("Pesan panjang selesai diproses");
}
},
None,
None,
)
.await?;
Ok(())
}
Ordering Key — Urutan Per Entitas #
Pesan dengan ordering key yang sama dijamin terurut dalam satu subscription:
async fn publish_dengan_ordering(
client: &Client,
nama_topic: &str,
pengguna_id: u64,
events: Vec<EventPesanan>,
) -> Result<(), Box<dyn std::error::Error>> {
let topic = client.topic(nama_topic);
// PENTING: subscription harus dibuat dengan enable_message_ordering = true
let publisher = topic.new_publisher(None);
for event in &events {
let payload = serde_json::to_vec(event)?;
let pesan = google_cloud_pubsub::publisher::PubsubMessage {
data: payload,
// Semua event untuk pengguna yang sama menggunakan key yang sama
// → dijamin urutan dalam subscription yang mengaktifkan ordering
ordering_key: format!("user-{}", pengguna_id),
..Default::default()
};
publisher.publish(pesan).await.get().await?;
}
publisher.shutdown().await;
println!("Semua event untuk pengguna {} dikirim terurut", pengguna_id);
Ok(())
}
Filter Subscription — Hanya Terima Pesan Tertentu #
async fn buat_subscription_dengan_filter(
client: &Client,
nama_subscription: &str,
nama_topic: &str,
) -> Result<(), Box<dyn std::error::Error>> {
use google_cloud_pubsub::subscription::SubscriptionConfig;
// Filter menggunakan Common Expression Language (CEL)
// Hanya terima pesan dengan attribute event_type = "baru"
let sub_config = SubscriptionConfig {
filter: Some("attributes.event_type = \"baru\"".to_string()),
ack_deadline_seconds: 30,
..Default::default()
};
client.create_subscription(
nama_subscription,
nama_topic,
sub_config,
None,
).await?;
println!("Subscription filter dibuat: hanya event_type = baru");
Ok(())
}
Perbandingan Pub/Sub, SQS, dan Kafka #
| Aspek | Google Pub/Sub | Amazon SQS | Apache Kafka |
|---|---|---|---|
| Model | Topic-Subscription | Point-to-point Queue | Topic-Partition |
| Fan-out | Native (banyak sub per topic) | Via SNS | Consumer group berbeda |
| Ordering | Per ordering key | FIFO queue | Per partisi |
| Replay | Tidak (max 7 hari) | Tidak | Ya (offset) |
| Manajemen | Fully managed | Fully managed | Self-managed atau Confluent |
| Throughput | 10 juta msg/detik | Tak terbatas | Sangat tinggi |
| Push mode | Ya (ke HTTP endpoint) | Tidak | Tidak |
| Filter | Ya (CEL expressions) | Tidak | Tidak |
| Ekosistem | GCP | AWS | Cloud-agnostic |
Kapan memilih Google Pub/Sub:
✓ Sudah di ekosistem GCP (GKE, Cloud Run, Cloud Functions)
✓ Butuh fan-out: satu pesan ke banyak consumer berbeda
✓ Push mode ke Cloud Run atau Cloud Functions
✓ Filter pesan dengan attribute tanpa kode tambahan
✓ Integrasi native dengan BigQuery, Dataflow, GCS
Kapan memilih SQS:
✓ Sudah di ekosistem AWS
✓ Kebutuhan sederhana: antrian task, satu consumer per pesan
Kapan memilih Kafka:
✓ Throughput sangat tinggi dan butuh replay event
✓ Tidak terikat ke satu cloud provider
Ringkasan #
- Setiap subscription mendapat salinan semua pesan — berbeda dari SQS di mana satu pesan hanya dikonsumsi satu consumer. Pub/Sub lebih mirip Kafka dalam aspek ini: banyak subscription = banyak consumer group independen.
- Dua mode consume: pull dan streaming — pull untuk batch processing sesekali, streaming pull (
.receive()) untuk consumer yang selalu aktif.- Ack wajib untuk menghapus pesan — pesan yang tidak di-ack akan muncul kembali setelah ack deadline habis. Gunakan
nack()untuk sengaja mengembalikan pesan ke antrian.- Ordering key untuk urutan per entitas — pesan dengan ordering key yang sama dijamin terurut dalam subscription yang mengaktifkan
enable_message_ordering. Gunakan ID entitas sebagai key.- Attributes untuk metadata dan filter — kirim attribute bersama pesan, buat subscription dengan filter CEL untuk hanya menerima pesan tertentu tanpa kode tambahan.
- Dead Letter Topic untuk pesan gagal — setelah
max_delivery_attemptskali gagal di-ack, pesan otomatis diteruskan ke DLT untuk investigasi.- Publisher batching otomatis —
PublisherConfig.flush_intervaldanbundle_sizemengatur kapan batch dikirim. Batching mengurangi biaya API call secara signifikan.- Emulator untuk development lokal — set
PUBSUB_EMULATOR_HOST=localhost:8085untuk testing tanpa akun GCP.- Perpanjang ack deadline untuk proses panjang —
modify_ack_deadline()mencegah pesan muncul kembali di consumer lain saat masih diproses.