Amazon SQS #
Amazon Simple Queue Service (SQS) adalah layanan message queue terkelola sepenuhnya dari AWS. Tidak ada server yang perlu dikelola, tidak ada cluster yang perlu dikonfigurasi — kamu tinggal buat queue, kirim pesan, dan konsumsi. SQS sangat cocok untuk decoupling komponen dalam arsitektur cloud AWS: Lambda yang dipicu SQS, ECS task yang memproses dari antrian, atau microservice yang berkomunikasi tanpa direct coupling. Di Rust, AWS SDK resmi (aws-sdk-sqs) menyediakan akses async yang lengkap. Artikel ini membahas kedua tipe queue (Standard dan FIFO), operasi lengkap, dan pola arsitektur yang umum di ekosistem AWS.
Dua Tipe Queue SQS #
flowchart LR
subgraph Standard Queue
S["Standard\nAt-least-once delivery\nUrutan tidak terjamin\nThroughput tak terbatas\nHarga lebih murah"]
end
subgraph FIFO Queue
F["FIFO\nExactly-once delivery\nUrutan terjamin per group\nMaks 3000 msg/detik\nHarga lebih mahal"]
end| Aspek | Standard Queue | FIFO Queue |
|---|---|---|
| Urutan | Best-effort | Strict per Message Group ID |
| Delivery | At-least-once (bisa duplikat) | Exactly-once |
| Throughput | Tak terbatas | 3.000 msg/detik (high throughput: 70.000) |
| Harga | Lebih murah | ~10x lebih mahal |
| Nama queue | Bebas | Harus diakhiri .fifo |
| Kapan | Default untuk kebanyakan kasus | Urutan dan deduplication penting |
Instalasi #
[dependencies]
aws-config = { version = "1", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Setup Client #
use aws_sdk_sqs::Client;
async fn buat_client() -> Client {
// Baca konfigurasi dari environment:
// AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
// atau dari ~/.aws/credentials jika di lokal
let config = aws_config::load_from_env().await;
Client::new(&config)
}
// Untuk development lokal dengan LocalStack
async fn buat_client_lokal() -> Client {
use aws_sdk_sqs::config::Builder;
use aws_types::region::Region;
let config = Builder::new()
.endpoint_url("http://localhost:4566") // LocalStack endpoint
.region(Region::new("us-east-1"))
.credentials_provider(aws_credential_types::provider::SharedCredentialsProvider::new(
aws_credential_types::Credentials::new(
"test", "test", None, None, "test"
)
))
.build();
Client::from_conf(config)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = buat_client().await;
// List queue yang ada
let hasil = client.list_queues().send().await?;
println!("Queue yang ada:");
for url in hasil.queue_urls() {
println!(" - {}", url);
}
Ok(())
}
Manajemen Queue #
use aws_sdk_sqs::{Client, types::QueueAttributeName};
use std::collections::HashMap;
async fn buat_standard_queue(
client: &Client,
nama: &str,
) -> Result<String, aws_sdk_sqs::Error> {
let mut atribut = HashMap::new();
// Visibility timeout: pesan tidak terlihat selama N detik setelah diterima
atribut.insert(QueueAttributeName::VisibilityTimeout, "30".to_string());
// Message retention: berapa lama pesan disimpan (default 4 hari)
atribut.insert(QueueAttributeName::MessageRetentionPeriod, "86400".to_string()); // 1 hari
// Long polling: tunggu hingga N detik untuk pesan (hemat biaya)
atribut.insert(QueueAttributeName::ReceiveMessageWaitTimeSeconds, "20".to_string());
let hasil = client
.create_queue()
.queue_name(nama)
.set_attributes(Some(atribut))
.send()
.await?;
let url = hasil.queue_url().unwrap_or("").to_string();
println!("Standard queue dibuat: {}", url);
Ok(url)
}
async fn buat_fifo_queue(
client: &Client,
nama: &str, // harus diakhiri .fifo
) -> Result<String, aws_sdk_sqs::Error> {
let mut atribut = HashMap::new();
atribut.insert(QueueAttributeName::FifoQueue, "true".to_string());
// Content-based deduplication: hash body pesan sebagai dedup ID
atribut.insert(QueueAttributeName::ContentBasedDeduplication, "true".to_string());
atribut.insert(QueueAttributeName::VisibilityTimeout, "60".to_string());
let hasil = client
.create_queue()
.queue_name(nama) // contoh: "pesanan-prioritas.fifo"
.set_attributes(Some(atribut))
.send()
.await?;
let url = hasil.queue_url().unwrap_or("").to_string();
println!("FIFO queue dibuat: {}", url);
Ok(url)
}
async fn buat_queue_dengan_dlq(
client: &Client,
nama_utama: &str,
maks_receive: i32,
) -> Result<String, Box<dyn std::error::Error>> {
// 1. Buat DLQ terlebih dahulu
let dlq = client
.create_queue()
.queue_name(&format!("{}-dlq", nama_utama))
.send()
.await?;
let dlq_url = dlq.queue_url().unwrap_or("").to_string();
// 2. Ambil ARN dari DLQ
let dlq_attrs = client
.get_queue_attributes()
.queue_url(&dlq_url)
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let dlq_arn = dlq_attrs
.attributes()
.and_then(|a| a.get(&QueueAttributeName::QueueArn))
.cloned()
.unwrap_or_default();
// 3. Buat queue utama dengan Redrive Policy ke DLQ
let redrive_policy = serde_json::json!({
"deadLetterTargetArn": dlq_arn,
"maxReceiveCount": maks_receive // pindah ke DLQ setelah N kali gagal
})
.to_string();
let mut atribut = HashMap::new();
atribut.insert(
QueueAttributeName::RedrivePolicy,
redrive_policy,
);
atribut.insert(QueueAttributeName::VisibilityTimeout, "30".to_string());
let hasil = client
.create_queue()
.queue_name(nama_utama)
.set_attributes(Some(atribut))
.send()
.await?;
let url = hasil.queue_url().unwrap_or("").to_string();
println!("Queue '{}' dengan DLQ berhasil dibuat", nama_utama);
Ok(url)
}
Mengirim Pesan #
use aws_sdk_sqs::{Client, types::MessageAttributeValue};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventPemrosesan {
pub id: u64,
pub tipe: String,
pub data: serde_json::Value,
pub dibuat_pada: String,
}
async fn kirim_pesan(
client: &Client,
queue_url: &str,
event: &EventPemrosesan,
) -> Result<String, aws_sdk_sqs::Error> {
let body = serde_json::to_string(event).unwrap();
let hasil = client
.send_message()
.queue_url(queue_url)
.message_body(&body)
// Delay pengiriman: pesan tidak terlihat selama N detik
.delay_seconds(0)
// Message attributes: metadata tambahan yang bisa difilter
.message_attributes(
"EventType",
MessageAttributeValue::builder()
.data_type("String")
.string_value(&event.tipe)
.build()
.unwrap(),
)
.message_attributes(
"Source",
MessageAttributeValue::builder()
.data_type("String")
.string_value("api-server")
.build()
.unwrap(),
)
.send()
.await?;
let msg_id = hasil.message_id().unwrap_or("").to_string();
println!("Pesan terkirim: {}", msg_id);
Ok(msg_id)
}
// Kirim ke FIFO queue — butuh MessageGroupId dan MessageDeduplicationId
async fn kirim_ke_fifo(
client: &Client,
queue_url: &str,
event: &EventPemrosesan,
group_id: &str, // semua pesan dalam group yang sama diproses berurutan
) -> Result<String, aws_sdk_sqs::Error> {
let body = serde_json::to_string(event).unwrap();
let hasil = client
.send_message()
.queue_url(queue_url)
.message_body(&body)
.message_group_id(group_id)
// Deduplication ID: pesan dengan ID sama dalam 5 menit akan diabaikan
.message_deduplication_id(&format!("{}-{}", event.tipe, event.id))
.send()
.await?;
Ok(hasil.message_id().unwrap_or("").to_string())
}
// Batch send — kirim hingga 10 pesan sekaligus (hemat biaya)
async fn kirim_batch(
client: &Client,
queue_url: &str,
events: &[EventPemrosesan],
) -> Result<usize, Box<dyn std::error::Error>> {
use aws_sdk_sqs::types::SendMessageBatchRequestEntry;
let entries: Vec<SendMessageBatchRequestEntry> = events
.iter()
.enumerate()
.map(|(i, event)| {
SendMessageBatchRequestEntry::builder()
.id(i.to_string())
.message_body(serde_json::to_string(event).unwrap())
.build()
.unwrap()
})
.collect();
let hasil = client
.send_message_batch()
.queue_url(queue_url)
.set_entries(Some(entries))
.send()
.await?;
let berhasil = hasil.successful().len();
let gagal = hasil.failed().len();
if gagal > 0 {
for f in hasil.failed() {
eprintln!("Gagal kirim entry {}: {}", f.id(), f.message().unwrap_or("?"));
}
}
println!("Batch: {}/{} berhasil", berhasil, events.len());
Ok(berhasil)
}
Menerima dan Memproses Pesan #
use aws_sdk_sqs::{Client, types::Message};
async fn terima_dan_proses(
client: &Client,
queue_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
loop {
// Long polling: tunggu hingga 20 detik jika queue kosong
// Lebih hemat biaya dari short polling yang terus-menerus
let hasil = client
.receive_message()
.queue_url(queue_url)
.max_number_of_messages(10) // maks 10 pesan per request
.wait_time_seconds(20) // long polling
.visibility_timeout(30) // 30 detik tidak terlihat consumer lain
.message_attribute_names("All") // ambil semua message attributes
.send()
.await?;
let pesan_list = hasil.messages();
if pesan_list.is_empty() {
println!("Queue kosong, menunggu...");
continue;
}
for pesan in pesan_list {
match proses_satu_pesan(client, queue_url, pesan).await {
Ok(_) => println!("Pesan berhasil diproses"),
Err(e) => eprintln!("Error memproses pesan: {}", e),
}
}
}
}
async fn proses_satu_pesan(
client: &Client,
queue_url: &str,
pesan: &Message,
) -> Result<(), Box<dyn std::error::Error>> {
let receipt_handle = pesan.receipt_handle().unwrap_or("");
let body = pesan.body().unwrap_or("{}");
println!("Message ID: {}", pesan.message_id().unwrap_or("?"));
// Baca message attributes
if let Some(attrs) = pesan.message_attributes() {
if let Some(event_type) = attrs.get("EventType") {
println!("EventType: {}", event_type.string_value().unwrap_or("?"));
}
}
// Parse body
let event: EventPemrosesan = serde_json::from_str(body)?;
println!("Memproses event: {} #{}", event.tipe, event.id);
// Proses event...
jalankan_logika_bisnis(&event).await?;
// Delete pesan setelah berhasil diproses
// PENTING: jika tidak didelete, pesan akan muncul kembali setelah visibility timeout
client
.delete_message()
.queue_url(queue_url)
.receipt_handle(receipt_handle)
.send()
.await?;
println!("Pesan dihapus dari queue");
Ok(())
}
async fn jalankan_logika_bisnis(event: &EventPemrosesan) -> Result<(), String> {
println!("Menjalankan logika untuk: {}", event.tipe);
Ok(())
}
// Batch delete — hemat API call saat berhasil memproses banyak pesan
async fn hapus_batch(
client: &Client,
queue_url: &str,
pesan_list: &[Message],
) -> Result<(), aws_sdk_sqs::Error> {
use aws_sdk_sqs::types::DeleteMessageBatchRequestEntry;
let entries: Vec<DeleteMessageBatchRequestEntry> = pesan_list
.iter()
.enumerate()
.filter_map(|(i, m)| {
m.receipt_handle().map(|rh| {
DeleteMessageBatchRequestEntry::builder()
.id(i.to_string())
.receipt_handle(rh)
.build()
.unwrap()
})
})
.collect();
client
.delete_message_batch()
.queue_url(queue_url)
.set_entries(Some(entries))
.send()
.await?;
println!("{} pesan dihapus", pesan_list.len());
Ok(())
}
Visibility Timeout dan Perpanjangan #
use aws_sdk_sqs::Client;
// Perpanjang visibility timeout untuk pesan yang butuh waktu lebih lama diproses
async fn perpanjang_visibility(
client: &Client,
queue_url: &str,
receipt_handle: &str,
tambahan_detik: i32,
) -> Result<(), aws_sdk_sqs::Error> {
client
.change_message_visibility()
.queue_url(queue_url)
.receipt_handle(receipt_handle)
.visibility_timeout(tambahan_detik)
.send()
.await?;
println!("Visibility timeout diperpanjang {} detik", tambahan_detik);
Ok(())
}
// Worker yang memproses pesan panjang dan memperpanjang visibility secara berkala
async fn proses_pesan_panjang(
client: std::sync::Arc<Client>,
queue_url: String,
pesan: Message,
) {
let receipt_handle = pesan.receipt_handle().unwrap_or("").to_string();
// Spawn task untuk perpanjang visibility setiap 25 detik (sebelum timeout 30 detik)
let client_clone = std::sync::Arc::clone(&client);
let queue_clone = queue_url.clone();
let handle_clone = receipt_handle.clone();
let keepalive = tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(25)).await;
let _ = perpanjang_visibility(&client_clone, &queue_clone, &handle_clone, 30).await;
}
});
// Proses pesan yang membutuhkan waktu lama
tokio::time::sleep(std::time::Duration::from_secs(60)).await; // simulasi 60 detik
println!("Pesan selesai diproses");
// Stop keepalive
keepalive.abort();
// Hapus pesan
let _ = client
.delete_message()
.queue_url(&queue_url)
.receipt_handle(&receipt_handle)
.send()
.await;
}
SQS dengan SNS Fan-Out #
Pola umum di AWS: SNS menerima event dan meneruskan ke banyak SQS queue sekaligus:
flowchart LR
P["Producer"] --> SNS["SNS Topic\npesanan-events"]
SNS --> Q1["SQS Queue\npemrosesan-pesanan\n(ECS task)"]
SNS --> Q2["SQS Queue\nnotifikasi-email\n(Lambda)"]
SNS --> Q3["SQS Queue\nupdate-inventori\n(ECS task)"]
SNS --> Q4["SQS Queue\nanalytics\n(Kinesis Firehose)"]Konfigurasi di Terraform/CDK (bukan kode Rust, tapi penting dipahami):
# Terraform: SNS topic + SQS queue + subscription
resource "aws_sns_topic" "pesanan_events" {
name = "pesanan-events"
}
resource "aws_sqs_queue" "pemrosesan" {
name = "pemrosesan-pesanan"
visibility_timeout_seconds = 30
message_retention_seconds = 86400
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.pemrosesan_dlq.arn
maxReceiveCount = 3
})
}
resource "aws_sns_topic_subscription" "pemrosesan_sub" {
topic_arn = aws_sns_topic.pesanan_events.arn
protocol = "sqs"
endpoint = aws_sqs_queue.pemrosesan.arn
}
Pesan dari SNS punya wrapper di SQS — perlu di-unwrap:
#[derive(Debug, serde::Deserialize)]
struct SnsSqsWrapper {
#[serde(rename = "Type")]
tipe: String,
#[serde(rename = "Message")]
pesan: String, // JSON string dari SNS
#[serde(rename = "MessageId")]
message_id: String,
#[serde(rename = "TopicArn")]
topic_arn: String,
}
async fn proses_pesan_dari_sns(body: &str) -> Result<(), Box<dyn std::error::Error>> {
// Pesan dari SNS punya wrapper
if let Ok(wrapper) = serde_json::from_str::<SnsSqsWrapper>(body) {
if wrapper.tipe == "Notification" {
// Unwrap: pesan asli ada di field "Message"
let event: EventPemrosesan = serde_json::from_str(&wrapper.pesan)?;
println!("Event dari SNS via SQS: {} #{}", event.tipe, event.id);
}
} else {
// Pesan langsung (bukan dari SNS)
let event: EventPemrosesan = serde_json::from_str(body)?;
println!("Event langsung: {} #{}", event.tipe, event.id);
}
Ok(())
}
Perbandingan SQS, RabbitMQ, dan Kafka #
Aspek Amazon SQS RabbitMQ Kafka
Manajemen infrastruktur
Fully managed Ya Tidak Tidak
Setup Menit Jam Hari
Scaling Otomatis Manual Semi-manual
Delivery guarantee
At-least-once Standard queue Dengan ack Ya
Exactly-once FIFO queue Tidak native Dengan transaksi
At-most-once Tidak Dengan auto-ack Tidak
Replay / history
Bisa baca ulang Tidak Tidak Ya (offset)
Retensi Maks 14 hari Sampai di-consume Dapat dikonfigurasi
Throughput
Maks Tak terbatas ~100k/detik/node Jutaan/detik
Harga model
Per request Ya ($0.40/1M msg) Self-host Self-host / Confluent
Kapan SQS paling tepat:
✓ Sudah di ekosistem AWS (Lambda, ECS, EC2)
✓ Tidak mau kelola infrastructure message broker
✓ Kebutuhan sederhana: kirim pesan, proses, hapus
✓ Scaling otomatis tanpa konfigurasi
✓ Integrasi langsung dengan SNS, Lambda, S3 Events
Ringkasan #
- Standard vs FIFO — Standard untuk kebanyakan kasus (throughput tinggi, at-least-once); FIFO jika urutan dan exactly-once penting tapi throughput lebih rendah.
- Long polling dengan
wait_time_seconds(20)— jauh lebih hemat biaya dari short polling; tunggu hingga 20 detik sebelum return jika queue kosong.- Delete pesan setelah berhasil diproses — jika tidak dihapus, pesan akan muncul kembali setelah visibility timeout. Ini fitur “at-least-once delivery” bawaan SQS.
- Visibility timeout — waktu di mana pesan “tidak terlihat” consumer lain setelah diterima. Set lebih lama dari waktu proses maksimal yang diharapkan.
- Perpanjang visibility timeout untuk proses panjang — spawn background task yang memanggil
change_message_visibilitysetiap N detik agar pesan tidak muncul kembali di tengah pemrosesan.- Batch operations hemat biaya —
send_message_batchdandelete_message_batchkirim/hapus hingga 10 pesan per API call, mengurangi biaya hingga 10x.- Dead Letter Queue via Redrive Policy — pesan yang gagal diproses N kali otomatis dipindahkan ke DLQ. Set
maxReceiveCountsesuai retry yang reasonable.- SNS + SQS fan-out pattern — satu event bisa diteruskan ke banyak queue berbeda via SNS subscription. Pesan SNS punya wrapper JSON — ingat untuk unwrap field
Message.- FIFO membutuhkan
MessageGroupId— semua pesan dalam group yang sama diproses berurutan. Gunakan ID entitas (user ID, order ID) sebagai group ID.