Amazon SQS

Amazon SQS #

Amazon Simple Queue Service (SQS) adalah layanan message queue terkelola sepenuhnya dari AWS. Tidak ada server yang perlu dikelola, tidak ada cluster yang perlu dikonfigurasi — kamu tinggal buat queue, kirim pesan, dan konsumsi. SQS sangat cocok untuk decoupling komponen dalam arsitektur cloud AWS: Lambda yang dipicu SQS, ECS task yang memproses dari antrian, atau microservice yang berkomunikasi tanpa direct coupling. Di Rust, AWS SDK resmi (aws-sdk-sqs) menyediakan akses async yang lengkap. Artikel ini membahas kedua tipe queue (Standard dan FIFO), operasi lengkap, dan pola arsitektur yang umum di ekosistem AWS.

Dua Tipe Queue SQS #

flowchart LR
    subgraph Standard Queue
        S["Standard\nAt-least-once delivery\nUrutan tidak terjamin\nThroughput tak terbatas\nHarga lebih murah"]
    end

    subgraph FIFO Queue
        F["FIFO\nExactly-once delivery\nUrutan terjamin per group\nMaks 3000 msg/detik\nHarga lebih mahal"]
    end
AspekStandard QueueFIFO Queue
UrutanBest-effortStrict per Message Group ID
DeliveryAt-least-once (bisa duplikat)Exactly-once
ThroughputTak terbatas3.000 msg/detik (high throughput: 70.000)
HargaLebih murah~10x lebih mahal
Nama queueBebasHarus diakhiri .fifo
KapanDefault untuk kebanyakan kasusUrutan dan deduplication penting

Instalasi #

[dependencies]
aws-config = { version = "1", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Setup Client #

use aws_sdk_sqs::Client;

async fn buat_client() -> Client {
    // Baca konfigurasi dari environment:
    // AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
    // atau dari ~/.aws/credentials jika di lokal
    let config = aws_config::load_from_env().await;
    Client::new(&config)
}

// Untuk development lokal dengan LocalStack
async fn buat_client_lokal() -> Client {
    use aws_sdk_sqs::config::Builder;
    use aws_types::region::Region;

    let config = Builder::new()
        .endpoint_url("http://localhost:4566")  // LocalStack endpoint
        .region(Region::new("us-east-1"))
        .credentials_provider(aws_credential_types::provider::SharedCredentialsProvider::new(
            aws_credential_types::Credentials::new(
                "test", "test", None, None, "test"
            )
        ))
        .build();

    Client::from_conf(config)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = buat_client().await;

    // List queue yang ada
    let hasil = client.list_queues().send().await?;
    println!("Queue yang ada:");
    for url in hasil.queue_urls() {
        println!("  - {}", url);
    }

    Ok(())
}

Manajemen Queue #

use aws_sdk_sqs::{Client, types::QueueAttributeName};
use std::collections::HashMap;

async fn buat_standard_queue(
    client: &Client,
    nama: &str,
) -> Result<String, aws_sdk_sqs::Error> {
    let mut atribut = HashMap::new();
    // Visibility timeout: pesan tidak terlihat selama N detik setelah diterima
    atribut.insert(QueueAttributeName::VisibilityTimeout, "30".to_string());
    // Message retention: berapa lama pesan disimpan (default 4 hari)
    atribut.insert(QueueAttributeName::MessageRetentionPeriod, "86400".to_string()); // 1 hari
    // Long polling: tunggu hingga N detik untuk pesan (hemat biaya)
    atribut.insert(QueueAttributeName::ReceiveMessageWaitTimeSeconds, "20".to_string());

    let hasil = client
        .create_queue()
        .queue_name(nama)
        .set_attributes(Some(atribut))
        .send()
        .await?;

    let url = hasil.queue_url().unwrap_or("").to_string();
    println!("Standard queue dibuat: {}", url);
    Ok(url)
}

async fn buat_fifo_queue(
    client: &Client,
    nama: &str,  // harus diakhiri .fifo
) -> Result<String, aws_sdk_sqs::Error> {
    let mut atribut = HashMap::new();
    atribut.insert(QueueAttributeName::FifoQueue, "true".to_string());
    // Content-based deduplication: hash body pesan sebagai dedup ID
    atribut.insert(QueueAttributeName::ContentBasedDeduplication, "true".to_string());
    atribut.insert(QueueAttributeName::VisibilityTimeout, "60".to_string());

    let hasil = client
        .create_queue()
        .queue_name(nama)  // contoh: "pesanan-prioritas.fifo"
        .set_attributes(Some(atribut))
        .send()
        .await?;

    let url = hasil.queue_url().unwrap_or("").to_string();
    println!("FIFO queue dibuat: {}", url);
    Ok(url)
}

async fn buat_queue_dengan_dlq(
    client: &Client,
    nama_utama: &str,
    maks_receive: i32,
) -> Result<String, Box<dyn std::error::Error>> {
    // 1. Buat DLQ terlebih dahulu
    let dlq = client
        .create_queue()
        .queue_name(&format!("{}-dlq", nama_utama))
        .send()
        .await?;
    let dlq_url = dlq.queue_url().unwrap_or("").to_string();

    // 2. Ambil ARN dari DLQ
    let dlq_attrs = client
        .get_queue_attributes()
        .queue_url(&dlq_url)
        .attribute_names(QueueAttributeName::QueueArn)
        .send()
        .await?;
    let dlq_arn = dlq_attrs
        .attributes()
        .and_then(|a| a.get(&QueueAttributeName::QueueArn))
        .cloned()
        .unwrap_or_default();

    // 3. Buat queue utama dengan Redrive Policy ke DLQ
    let redrive_policy = serde_json::json!({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": maks_receive  // pindah ke DLQ setelah N kali gagal
    })
    .to_string();

    let mut atribut = HashMap::new();
    atribut.insert(
        QueueAttributeName::RedrivePolicy,
        redrive_policy,
    );
    atribut.insert(QueueAttributeName::VisibilityTimeout, "30".to_string());

    let hasil = client
        .create_queue()
        .queue_name(nama_utama)
        .set_attributes(Some(atribut))
        .send()
        .await?;

    let url = hasil.queue_url().unwrap_or("").to_string();
    println!("Queue '{}' dengan DLQ berhasil dibuat", nama_utama);
    Ok(url)
}

Mengirim Pesan #

use aws_sdk_sqs::{Client, types::MessageAttributeValue};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventPemrosesan {
    pub id: u64,
    pub tipe: String,
    pub data: serde_json::Value,
    pub dibuat_pada: String,
}

async fn kirim_pesan(
    client: &Client,
    queue_url: &str,
    event: &EventPemrosesan,
) -> Result<String, aws_sdk_sqs::Error> {
    let body = serde_json::to_string(event).unwrap();

    let hasil = client
        .send_message()
        .queue_url(queue_url)
        .message_body(&body)
        // Delay pengiriman: pesan tidak terlihat selama N detik
        .delay_seconds(0)
        // Message attributes: metadata tambahan yang bisa difilter
        .message_attributes(
            "EventType",
            MessageAttributeValue::builder()
                .data_type("String")
                .string_value(&event.tipe)
                .build()
                .unwrap(),
        )
        .message_attributes(
            "Source",
            MessageAttributeValue::builder()
                .data_type("String")
                .string_value("api-server")
                .build()
                .unwrap(),
        )
        .send()
        .await?;

    let msg_id = hasil.message_id().unwrap_or("").to_string();
    println!("Pesan terkirim: {}", msg_id);
    Ok(msg_id)
}

// Kirim ke FIFO queue — butuh MessageGroupId dan MessageDeduplicationId
async fn kirim_ke_fifo(
    client: &Client,
    queue_url: &str,
    event: &EventPemrosesan,
    group_id: &str,  // semua pesan dalam group yang sama diproses berurutan
) -> Result<String, aws_sdk_sqs::Error> {
    let body = serde_json::to_string(event).unwrap();

    let hasil = client
        .send_message()
        .queue_url(queue_url)
        .message_body(&body)
        .message_group_id(group_id)
        // Deduplication ID: pesan dengan ID sama dalam 5 menit akan diabaikan
        .message_deduplication_id(&format!("{}-{}", event.tipe, event.id))
        .send()
        .await?;

    Ok(hasil.message_id().unwrap_or("").to_string())
}

// Batch send — kirim hingga 10 pesan sekaligus (hemat biaya)
async fn kirim_batch(
    client: &Client,
    queue_url: &str,
    events: &[EventPemrosesan],
) -> Result<usize, Box<dyn std::error::Error>> {
    use aws_sdk_sqs::types::SendMessageBatchRequestEntry;

    let entries: Vec<SendMessageBatchRequestEntry> = events
        .iter()
        .enumerate()
        .map(|(i, event)| {
            SendMessageBatchRequestEntry::builder()
                .id(i.to_string())
                .message_body(serde_json::to_string(event).unwrap())
                .build()
                .unwrap()
        })
        .collect();

    let hasil = client
        .send_message_batch()
        .queue_url(queue_url)
        .set_entries(Some(entries))
        .send()
        .await?;

    let berhasil = hasil.successful().len();
    let gagal = hasil.failed().len();

    if gagal > 0 {
        for f in hasil.failed() {
            eprintln!("Gagal kirim entry {}: {}", f.id(), f.message().unwrap_or("?"));
        }
    }

    println!("Batch: {}/{} berhasil", berhasil, events.len());
    Ok(berhasil)
}

Menerima dan Memproses Pesan #

use aws_sdk_sqs::{Client, types::Message};

async fn terima_dan_proses(
    client: &Client,
    queue_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // Long polling: tunggu hingga 20 detik jika queue kosong
        // Lebih hemat biaya dari short polling yang terus-menerus
        let hasil = client
            .receive_message()
            .queue_url(queue_url)
            .max_number_of_messages(10)    // maks 10 pesan per request
            .wait_time_seconds(20)          // long polling
            .visibility_timeout(30)         // 30 detik tidak terlihat consumer lain
            .message_attribute_names("All") // ambil semua message attributes
            .send()
            .await?;

        let pesan_list = hasil.messages();

        if pesan_list.is_empty() {
            println!("Queue kosong, menunggu...");
            continue;
        }

        for pesan in pesan_list {
            match proses_satu_pesan(client, queue_url, pesan).await {
                Ok(_) => println!("Pesan berhasil diproses"),
                Err(e) => eprintln!("Error memproses pesan: {}", e),
            }
        }
    }
}

async fn proses_satu_pesan(
    client: &Client,
    queue_url: &str,
    pesan: &Message,
) -> Result<(), Box<dyn std::error::Error>> {
    let receipt_handle = pesan.receipt_handle().unwrap_or("");
    let body = pesan.body().unwrap_or("{}");

    println!("Message ID: {}", pesan.message_id().unwrap_or("?"));

    // Baca message attributes
    if let Some(attrs) = pesan.message_attributes() {
        if let Some(event_type) = attrs.get("EventType") {
            println!("EventType: {}", event_type.string_value().unwrap_or("?"));
        }
    }

    // Parse body
    let event: EventPemrosesan = serde_json::from_str(body)?;
    println!("Memproses event: {} #{}", event.tipe, event.id);

    // Proses event...
    jalankan_logika_bisnis(&event).await?;

    // Delete pesan setelah berhasil diproses
    // PENTING: jika tidak didelete, pesan akan muncul kembali setelah visibility timeout
    client
        .delete_message()
        .queue_url(queue_url)
        .receipt_handle(receipt_handle)
        .send()
        .await?;

    println!("Pesan dihapus dari queue");
    Ok(())
}

async fn jalankan_logika_bisnis(event: &EventPemrosesan) -> Result<(), String> {
    println!("Menjalankan logika untuk: {}", event.tipe);
    Ok(())
}

// Batch delete — hemat API call saat berhasil memproses banyak pesan
async fn hapus_batch(
    client: &Client,
    queue_url: &str,
    pesan_list: &[Message],
) -> Result<(), aws_sdk_sqs::Error> {
    use aws_sdk_sqs::types::DeleteMessageBatchRequestEntry;

    let entries: Vec<DeleteMessageBatchRequestEntry> = pesan_list
        .iter()
        .enumerate()
        .filter_map(|(i, m)| {
            m.receipt_handle().map(|rh| {
                DeleteMessageBatchRequestEntry::builder()
                    .id(i.to_string())
                    .receipt_handle(rh)
                    .build()
                    .unwrap()
            })
        })
        .collect();

    client
        .delete_message_batch()
        .queue_url(queue_url)
        .set_entries(Some(entries))
        .send()
        .await?;

    println!("{} pesan dihapus", pesan_list.len());
    Ok(())
}

Visibility Timeout dan Perpanjangan #

use aws_sdk_sqs::Client;

// Perpanjang visibility timeout untuk pesan yang butuh waktu lebih lama diproses
async fn perpanjang_visibility(
    client: &Client,
    queue_url: &str,
    receipt_handle: &str,
    tambahan_detik: i32,
) -> Result<(), aws_sdk_sqs::Error> {
    client
        .change_message_visibility()
        .queue_url(queue_url)
        .receipt_handle(receipt_handle)
        .visibility_timeout(tambahan_detik)
        .send()
        .await?;

    println!("Visibility timeout diperpanjang {} detik", tambahan_detik);
    Ok(())
}

// Worker yang memproses pesan panjang dan memperpanjang visibility secara berkala
async fn proses_pesan_panjang(
    client: std::sync::Arc<Client>,
    queue_url: String,
    pesan: Message,
) {
    let receipt_handle = pesan.receipt_handle().unwrap_or("").to_string();

    // Spawn task untuk perpanjang visibility setiap 25 detik (sebelum timeout 30 detik)
    let client_clone = std::sync::Arc::clone(&client);
    let queue_clone = queue_url.clone();
    let handle_clone = receipt_handle.clone();

    let keepalive = tokio::spawn(async move {
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(25)).await;
            let _ = perpanjang_visibility(&client_clone, &queue_clone, &handle_clone, 30).await;
        }
    });

    // Proses pesan yang membutuhkan waktu lama
    tokio::time::sleep(std::time::Duration::from_secs(60)).await; // simulasi 60 detik
    println!("Pesan selesai diproses");

    // Stop keepalive
    keepalive.abort();

    // Hapus pesan
    let _ = client
        .delete_message()
        .queue_url(&queue_url)
        .receipt_handle(&receipt_handle)
        .send()
        .await;
}

SQS dengan SNS Fan-Out #

Pola umum di AWS: SNS menerima event dan meneruskan ke banyak SQS queue sekaligus:

flowchart LR
    P["Producer"] --> SNS["SNS Topic\npesanan-events"]
    SNS --> Q1["SQS Queue\npemrosesan-pesanan\n(ECS task)"]
    SNS --> Q2["SQS Queue\nnotifikasi-email\n(Lambda)"]
    SNS --> Q3["SQS Queue\nupdate-inventori\n(ECS task)"]
    SNS --> Q4["SQS Queue\nanalytics\n(Kinesis Firehose)"]

Konfigurasi di Terraform/CDK (bukan kode Rust, tapi penting dipahami):

# Terraform: SNS topic + SQS queue + subscription

resource "aws_sns_topic" "pesanan_events" {
  name = "pesanan-events"
}

resource "aws_sqs_queue" "pemrosesan" {
  name                       = "pemrosesan-pesanan"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 86400
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.pemrosesan_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sns_topic_subscription" "pemrosesan_sub" {
  topic_arn = aws_sns_topic.pesanan_events.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.pemrosesan.arn
}

Pesan dari SNS punya wrapper di SQS — perlu di-unwrap:

#[derive(Debug, serde::Deserialize)]
struct SnsSqsWrapper {
    #[serde(rename = "Type")]
    tipe: String,
    #[serde(rename = "Message")]
    pesan: String,  // JSON string dari SNS
    #[serde(rename = "MessageId")]
    message_id: String,
    #[serde(rename = "TopicArn")]
    topic_arn: String,
}

async fn proses_pesan_dari_sns(body: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Pesan dari SNS punya wrapper
    if let Ok(wrapper) = serde_json::from_str::<SnsSqsWrapper>(body) {
        if wrapper.tipe == "Notification" {
            // Unwrap: pesan asli ada di field "Message"
            let event: EventPemrosesan = serde_json::from_str(&wrapper.pesan)?;
            println!("Event dari SNS via SQS: {} #{}", event.tipe, event.id);
        }
    } else {
        // Pesan langsung (bukan dari SNS)
        let event: EventPemrosesan = serde_json::from_str(body)?;
        println!("Event langsung: {} #{}", event.tipe, event.id);
    }
    Ok(())
}

Perbandingan SQS, RabbitMQ, dan Kafka #

Aspek                   Amazon SQS          RabbitMQ            Kafka

Manajemen infrastruktur
  Fully managed         Ya                  Tidak               Tidak
  Setup                 Menit               Jam                 Hari
  Scaling               Otomatis            Manual              Semi-manual

Delivery guarantee
  At-least-once         Standard queue      Dengan ack          Ya
  Exactly-once          FIFO queue          Tidak native        Dengan transaksi
  At-most-once          Tidak               Dengan auto-ack     Tidak

Replay / history
  Bisa baca ulang       Tidak               Tidak               Ya (offset)
  Retensi               Maks 14 hari        Sampai di-consume   Dapat dikonfigurasi

Throughput
  Maks                  Tak terbatas        ~100k/detik/node    Jutaan/detik

Harga model
  Per request           Ya ($0.40/1M msg)  Self-host           Self-host / Confluent

Kapan SQS paling tepat:
  ✓ Sudah di ekosistem AWS (Lambda, ECS, EC2)
  ✓ Tidak mau kelola infrastructure message broker
  ✓ Kebutuhan sederhana: kirim pesan, proses, hapus
  ✓ Scaling otomatis tanpa konfigurasi
  ✓ Integrasi langsung dengan SNS, Lambda, S3 Events

Ringkasan #

  • Standard vs FIFO — Standard untuk kebanyakan kasus (throughput tinggi, at-least-once); FIFO jika urutan dan exactly-once penting tapi throughput lebih rendah.
  • Long polling dengan wait_time_seconds(20) — jauh lebih hemat biaya dari short polling; tunggu hingga 20 detik sebelum return jika queue kosong.
  • Delete pesan setelah berhasil diproses — jika tidak dihapus, pesan akan muncul kembali setelah visibility timeout. Ini fitur “at-least-once delivery” bawaan SQS.
  • Visibility timeout — waktu di mana pesan “tidak terlihat” consumer lain setelah diterima. Set lebih lama dari waktu proses maksimal yang diharapkan.
  • Perpanjang visibility timeout untuk proses panjang — spawn background task yang memanggil change_message_visibility setiap N detik agar pesan tidak muncul kembali di tengah pemrosesan.
  • Batch operations hemat biayasend_message_batch dan delete_message_batch kirim/hapus hingga 10 pesan per API call, mengurangi biaya hingga 10x.
  • Dead Letter Queue via Redrive Policy — pesan yang gagal diproses N kali otomatis dipindahkan ke DLQ. Set maxReceiveCount sesuai retry yang reasonable.
  • SNS + SQS fan-out pattern — satu event bisa diteruskan ke banyak queue berbeda via SNS subscription. Pesan SNS punya wrapper JSON — ingat untuk unwrap field Message.
  • FIFO membutuhkan MessageGroupId — semua pesan dalam group yang sama diproses berurutan. Gunakan ID entitas (user ID, order ID) sebagai group ID.

← Sebelumnya: RabbitMQ   Berikutnya: Google Pub/Sub →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact