Redis

Redis #

Redis adalah in-memory data store yang bisa berfungsi sebagai cache, message broker, session store, rate limiter, dan distributed lock sekaligus. Keunggulannya terletak pada kecepatan ekstrem (ratusan ribu operasi per detik) dan struktur data yang kaya jauh di atas key-value biasa — string, hash, list, set, sorted set, stream, dan lainnya. Di Rust, crate redis adalah driver Redis yang paling banyak digunakan, dan deadpool-redis menambahkan connection pool async di atasnya. Artikel ini membahas semua tipe data Redis, pola-pola caching yang umum, distributed lock, rate limiting, dan Pub/Sub ringan sebagai alternatif message broker.

Instalasi #

[dependencies]
redis = { version = "0.25", features = ["tokio-comp", "connection-manager"] }
deadpool-redis = "0.15"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Koneksi dan Connection Pool #

use deadpool_redis::{Config, Pool, Runtime};
use redis::AsyncCommands;

fn buat_pool(url: &str) -> Pool {
    let cfg = Config::from_url(url);
    cfg.create_pool(Some(Runtime::Tokio1))
        .expect("Gagal membuat Redis pool")
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = buat_pool("redis://localhost:6379");

    // Ambil koneksi dari pool
    let mut conn = pool.get().await?;

    // Ping untuk verifikasi
    let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
    println!("Redis: {}", pong);  // PONG

    Ok(())
}

String — Tipe Data Dasar #

use redis::AsyncCommands;

async fn contoh_string(conn: &mut deadpool_redis::Connection)
    -> Result<(), Box<dyn std::error::Error>>
{
    // SET dan GET dasar
    conn.set("kunci", "nilai").await?;
    let nilai: String = conn.get("kunci").await?;
    println!("GET: {}", nilai);

    // SET dengan TTL (expire in seconds)
    conn.set_ex("sesi:user-42", "data-sesi", 3600).await?;  // 1 jam

    // SET hanya jika belum ada (SET NX) — untuk distributed lock sederhana
    let berhasil: bool = conn.set_nx("lock:resource-1", "owner-1").await?;
    println!("Lock berhasil: {}", berhasil);

    // INCR / DECR — atomik tanpa race condition
    conn.set("counter:halaman", 0i64).await?;
    let baru: i64 = conn.incr("counter:halaman", 1).await?;
    println!("Counter: {}", baru);

    // MSET / MGET — batch operation
    conn.set_multiple(&[
        ("user:1:nama", "Budi"),
        ("user:2:nama", "Sari"),
        ("user:3:nama", "Joko"),
    ]).await?;

    let nama_list: Vec<Option<String>> = conn.get(vec![
        "user:1:nama", "user:2:nama", "user:3:nama", "user:99:nama"
    ]).await?;
    println!("{:?}", nama_list);  // [Some("Budi"), Some("Sari"), Some("Joko"), None]

    // TTL — cek sisa waktu hidup kunci
    let ttl: i64 = conn.ttl("sesi:user-42").await?;
    println!("TTL sesi: {} detik", ttl);

    // EXPIRE — set TTL pada kunci yang sudah ada
    conn.expire("kunci", 60).await?;

    Ok(())
}

Hash — Objek Terstruktur #

Hash cocok untuk menyimpan objek dengan banyak field — lebih efisien dari menyimpan JSON string karena bisa update field individual:

use redis::AsyncCommands;

async fn contoh_hash(conn: &mut deadpool_redis::Connection)
    -> Result<(), Box<dyn std::error::Error>>
{
    let kunci = "pengguna:1001";

    // HSET — set satu atau banyak field
    conn.hset_multiple(kunci, &[
        ("nama", "Budi Santoso"),
        ("email", "[email protected]"),
        ("peran", "admin"),
        ("aktif", "true"),
    ]).await?;

    // HGET — ambil satu field
    let nama: String = conn.hget(kunci, "nama").await?;
    println!("Nama: {}", nama);

    // HMGET — ambil banyak field sekaligus
    let data: Vec<Option<String>> = conn.hget(kunci, vec!["nama", "email", "peran"]).await?;
    println!("{:?}", data);

    // HGETALL — ambil semua field dan nilai
    let semua: std::collections::HashMap<String, String> = conn.hgetall(kunci).await?;
    println!("Semua field: {:?}", semua);

    // HINCRBY — increment field numerik secara atomik
    conn.hset(kunci, "skor", "0").await?;
    let skor: i64 = conn.hincr(kunci, "skor", 10).await?;
    println!("Skor: {}", skor);

    // HEXISTS — cek keberadaan field
    let ada: bool = conn.hexists(kunci, "email").await?;
    println!("Email ada: {}", ada);

    // HDEL — hapus field
    conn.hdel(kunci, "aktif").await?;

    // Set TTL pada hash
    conn.expire(kunci, 3600).await?;

    Ok(())
}

List — Antrian dan Stack #

use redis::AsyncCommands;

async fn contoh_list(conn: &mut deadpool_redis::Connection)
    -> Result<(), Box<dyn std::error::Error>>
{
    let kunci = "antrian:email";

    // RPUSH — tambah di kanan (untuk antrian FIFO)
    conn.rpush(kunci, "email-1").await?;
    conn.rpush(kunci, "email-2").await?;
    conn.rpush(kunci, "email-3").await?;

    // LPUSH — tambah di kiri (untuk stack LIFO)
    conn.lpush("stack:undo", "aksi-1").await?;
    conn.lpush("stack:undo", "aksi-2").await?;

    // LLEN — panjang list
    let panjang: i64 = conn.llen(kunci).await?;
    println!("Panjang antrian: {}", panjang);

    // LPOP — ambil dari kiri (FIFO: ambil yang pertama masuk)
    let item: Option<String> = conn.lpop(kunci, None).await?;
    println!("Diambil: {:?}", item);

    // RPOP — ambil dari kanan (LIFO/stack)
    let top: Option<String> = conn.rpop("stack:undo", None).await?;
    println!("Stack pop: {:?}", top);

    // LRANGE — baca semua elemen tanpa menghapus
    let semua: Vec<String> = conn.lrange(kunci, 0, -1).await?;
    println!("Semua: {:?}", semua);

    // BLPOP — blocking pop (tunggu hingga ada elemen)
    // Berguna untuk worker queue
    // let (_, item): (String, String) = conn.blpop(kunci, 5.0).await?;

    Ok(())
}

Set dan Sorted Set #

use redis::AsyncCommands;

async fn contoh_set(conn: &mut deadpool_redis::Connection)
    -> Result<(), Box<dyn std::error::Error>>
{
    // Set: kumpulan elemen unik tanpa duplikat
    conn.sadd("tag:artikel-1", vec!["rust", "backend", "performance"]).await?;
    conn.sadd("tag:artikel-2", vec!["rust", "webdev", "frontend"]).await?;

    // SMEMBERS — semua anggota
    let tag: std::collections::HashSet<String> = conn.smembers("tag:artikel-1").await?;
    println!("Tag: {:?}", tag);

    // SISMEMBER — cek keanggotaan
    let ada: bool = conn.sismember("tag:artikel-1", "rust").await?;
    println!("Ada 'rust': {}", ada);

    // Operasi himpunan
    let irisan: Vec<String> = conn.sinter(vec!["tag:artikel-1", "tag:artikel-2"]).await?;
    println!("Irisan: {:?}", irisan);  // ["rust"]

    let gabungan: Vec<String> = conn.sunion(vec!["tag:artikel-1", "tag:artikel-2"]).await?;
    println!("Gabungan: {:?}", gabungan);

    // Sorted Set: kumpulan elemen dengan skor (float)
    // Cocok untuk leaderboard, ranking, rate limiting dengan sliding window
    conn.zadd("leaderboard", "Alice", 1500.0_f64).await?;
    conn.zadd("leaderboard", "Bob", 1200.0_f64).await?;
    conn.zadd("leaderboard", "Charlie", 1800.0_f64).await?;
    conn.zadd("leaderboard", "Diana", 950.0_f64).await?;

    // ZRANGE — ambil berdasarkan rank (ascending)
    let top: Vec<String> = conn.zrange("leaderboard", 0, 2).await?;
    println!("3 terendah: {:?}", top);

    // ZREVRANGE — ambil berdasarkan rank (descending)
    let top3: Vec<String> = conn.zrevrange("leaderboard", 0, 2).await?;
    println!("Top 3: {:?}", top3);  // ["Charlie", "Alice", "Bob"]

    // ZRANK — posisi elemen (0-based, ascending)
    let rank: Option<i64> = conn.zrank("leaderboard", "Alice").await?;
    println!("Rank Alice: {:?}", rank);

    // ZSCORE — skor elemen
    let skor: Option<f64> = conn.zscore("leaderboard", "Charlie").await?;
    println!("Skor Charlie: {:?}", skor);

    // ZINCRBY — tambah skor secara atomik
    let skor_baru: f64 = conn.zincr("leaderboard", "Bob", 300.0_f64).await?;
    println!("Skor Bob baru: {}", skor_baru);

    Ok(())
}

Pipeline — Batch Command #

Pipeline mengurangi round-trip network dengan mengirim banyak perintah sekaligus:

use redis::{AsyncCommands, Pipeline};

async fn contoh_pipeline(
    pool: &deadpool_redis::Pool,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut conn = pool.get().await?;

    // ANTI-PATTERN: banyak round-trip terpisah
    // for i in 0..100 {
    //     conn.set(format!("key:{}", i), i).await?;  // 100 round-trips!
    // }

    // BENAR: pipeline — satu round-trip untuk banyak command
    let mut pipe = redis::pipe();
    for i in 0..100 {
        pipe.set(format!("key:{}", i), i).ignore();
    }

    // Tambah beberapa GET sekaligus
    pipe.get("key:0").get("key:50").get("key:99");

    let hasil: (String, String, String) = pipe
        .query_async(&mut conn)
        .await?;

    println!("key:0={}, key:50={}, key:99={}", hasil.0, hasil.1, hasil.2);

    Ok(())
}

Pola Caching Umum #

Cache-Aside (Lazy Loading) #

use redis::AsyncCommands;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Produk {
    id: u64,
    nama: String,
    harga: f64,
}

// Simulasi database query yang lambat
async fn ambil_dari_db(id: u64) -> Produk {
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    Produk { id, nama: format!("Produk {}", id), harga: id as f64 * 1000.0 }
}

async fn ambil_produk(
    pool: &deadpool_redis::Pool,
    id: u64,
) -> Result<Produk, Box<dyn std::error::Error>> {
    let kunci = format!("produk:{}", id);
    let mut conn = pool.get().await?;

    // 1. Cek cache
    let cached: Option<String> = conn.get(&kunci).await?;

    if let Some(data) = cached {
        println!("Cache HIT untuk produk {}", id);
        let produk: Produk = serde_json::from_str(&data)?;
        return Ok(produk);
    }

    // 2. Cache MISS — ambil dari database
    println!("Cache MISS untuk produk {}", id);
    let produk = ambil_dari_db(id).await;

    // 3. Simpan ke cache dengan TTL
    let json = serde_json::to_string(&produk)?;
    conn.set_ex(&kunci, &json, 300).await?;  // TTL 5 menit

    Ok(produk)
}

// Invalidasi cache saat data berubah
async fn perbarui_produk(
    pool: &deadpool_redis::Pool,
    id: u64,
    harga_baru: f64,
) -> Result<(), Box<dyn std::error::Error>> {
    // Update di database...

    // Hapus cache — akan di-refresh saat request berikutnya
    let mut conn = pool.get().await?;
    conn.del(format!("produk:{}", id)).await?;
    println!("Cache produk {} diinvalidasi", id);

    Ok(())
}

Distributed Lock #

use redis::AsyncCommands;
use uuid::Uuid;

struct RedisLock {
    pool: deadpool_redis::Pool,
    kunci: String,
    token: String,
}

impl RedisLock {
    async fn acquire(
        pool: &deadpool_redis::Pool,
        resource: &str,
        ttl_ms: u64,
    ) -> Option<RedisLock> {
        let kunci = format!("lock:{}", resource);
        let token = Uuid::new_v4().to_string();
        let mut conn = pool.get().await.ok()?;

        // SET NX PX — atomic: set jika belum ada, dengan TTL dalam milidetik
        let result: Option<String> = redis::cmd("SET")
            .arg(&kunci)
            .arg(&token)
            .arg("NX")
            .arg("PX")
            .arg(ttl_ms)
            .query_async(&mut conn)
            .await
            .ok()?;

        if result.is_some() {
            Some(RedisLock {
                pool: pool.clone(),
                kunci,
                token,
            })
        } else {
            None  // Lock sudah dipegang pihak lain
        }
    }

    async fn release(&self) -> bool {
        // Script Lua untuk release atomik — hanya hapus jika token cocok
        // Mencegah lock orang lain terhapus jika TTL kita sudah habis
        let script = redis::Script::new(r"
            if redis.call('GET', KEYS[1]) == ARGV[1] then
                return redis.call('DEL', KEYS[1])
            else
                return 0
            end
        ");

        let mut conn = match self.pool.get().await {
            Ok(c) => c,
            Err(_) => return false,
        };

        let result: i64 = script
            .key(&self.kunci)
            .arg(&self.token)
            .invoke_async(&mut conn)
            .await
            .unwrap_or(0);

        result == 1
    }
}

async fn dengan_lock<F, Fut, T>(
    pool: &deadpool_redis::Pool,
    resource: &str,
    ttl_ms: u64,
    f: F,
) -> Result<T, String>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = T>,
{
    let lock = RedisLock::acquire(pool, resource, ttl_ms)
        .await
        .ok_or_else(|| format!("Gagal mendapat lock untuk '{}'", resource))?;

    let hasil = f().await;
    lock.release().await;
    Ok(hasil)
}

Rate Limiting dengan Sliding Window #

use redis::AsyncCommands;

async fn cek_rate_limit(
    pool: &deadpool_redis::Pool,
    user_id: u64,
    maks_request: u64,
    window_detik: u64,
) -> Result<bool, Box<dyn std::error::Error>> {
    let kunci = format!("rate:{}:{}", user_id, chrono::Utc::now().timestamp() / window_detik as i64);
    let mut conn = pool.get().await?;

    // INCR atomik — hitung request dalam window saat ini
    let jumlah: u64 = conn.incr(&kunci, 1u64).await?;

    // Set TTL hanya pada request pertama dalam window
    if jumlah == 1 {
        conn.expire(&kunci, window_detik as i64 * 2).await?;
    }

    let diizinkan = jumlah <= maks_request;
    if !diizinkan {
        println!("Rate limit melebihi batas untuk user {}: {}/{}", user_id, jumlah, maks_request);
    }

    Ok(diizinkan)
}

// Rate limiting dengan Sorted Set (sliding window yang lebih akurat)
async fn rate_limit_sliding(
    pool: &deadpool_redis::Pool,
    identifier: &str,
    maks: u64,
    window_ms: u64,
) -> Result<bool, Box<dyn std::error::Error>> {
    let kunci = format!("sliding:{}", identifier);
    let sekarang = chrono::Utc::now().timestamp_millis() as f64;
    let window_mulai = sekarang - window_ms as f64;
    let mut conn = pool.get().await?;

    let mut pipe = redis::pipe();
    pipe.atomic()
        // Hapus entry yang sudah keluar dari window
        .cmd("ZREMRANGEBYSCORE").arg(&kunci).arg(0).arg(window_mulai).ignore()
        // Tambah request saat ini
        .cmd("ZADD").arg(&kunci).arg(sekarang).arg(sekarang).ignore()
        // Hitung request dalam window
        .cmd("ZCARD").arg(&kunci)
        // Set TTL
        .cmd("EXPIRE").arg(&kunci).arg((window_ms / 1000 + 1) as i64).ignore();

    let (jumlah,): (u64,) = pipe.query_async(&mut conn).await?;

    Ok(jumlah <= maks)
}

Pub/Sub — Messaging Ringan #

Redis Pub/Sub cocok untuk notifikasi real-time dalam satu datacenter — lebih sederhana dari Kafka/RabbitMQ tapi tanpa persistensi:

use redis::AsyncCommands;

async fn redis_pubsub(
    pool: &deadpool_redis::Pool,
) -> Result<(), Box<dyn std::error::Error>> {
    // Publisher — gunakan koneksi terpisah
    let publisher_pool = pool.clone();
    tokio::spawn(async move {
        let mut conn = publisher_pool.get().await.unwrap();
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        for i in 1..=5 {
            conn.publish("channel:notifikasi",
                serde_json::json!({"id": i, "pesan": format!("Notifikasi {}", i)}).to_string()
            ).await.unwrap();
            println!("Dikirim notifikasi {}", i);
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    });

    // Subscriber — butuh koneksi dedicated (tidak bisa melakukan command lain)
    let mut conn = pool.get().await?;
    let mut pubsub = conn.as_pubsub();
    pubsub.subscribe("channel:notifikasi").await?;

    println!("Subscriber aktif, menunggu notifikasi...");

    for _ in 0..5 {
        let pesan = pubsub.on_message().next_message().await?;
        let payload: String = pesan.get_payload()?;
        println!("Diterima: {}", payload);
    }

    Ok(())
}

Ringkasan #

  • Selalu gunakan connection pooldeadpool-redis mengelola pool koneksi async. Satu koneksi per request sangat mahal; pool berbagi koneksi antar request.
  • Konvensi penamaan kunci dengan : — gunakan namespace seperti pengguna:1001, sesi:abc123, rate:user-42 untuk organisasi yang jelas dan bisa dikelola dengan SCAN.
  • set_ex untuk caching dengan TTL — selalu set TTL pada kunci cache agar memori tidak penuh dan data tidak stale selamanya.
  • Hash lebih efisien dari JSON string untuk objek — bisa update satu field tanpa serialize/deserialize ulang seluruh objek. Cocok untuk data yang sering diperbarui sebagian.
  • Pipeline untuk batch operasi — mengirim banyak command dalam satu round-trip jauh lebih efisien dari command terpisah. Hemat hingga 10x latensi untuk operasi bulk.
  • Distributed lock dengan SET NX PX — atomik dan otomatis expire. Selalu gunakan token unik (UUID) dan script Lua untuk release agar tidak menghapus lock orang lain.
  • Sorted Set untuk leaderboard dan rate limitingZADD + ZRANK + ZINCRBY untuk leaderboard real-time; ZREMRANGEBYSCORE + ZCARD untuk sliding window rate limiting.
  • Redis Pub/Sub untuk notifikasi in-process — lebih sederhana dari Kafka/RabbitMQ, tapi tanpa persistensi. Cocok untuk invalidasi cache antar instance atau notifikasi WebSocket.
  • Lua script untuk operasi atomik kompleks — beberapa perintah Redis yang perlu dieksekusi secara atomik bisa dibungkus dalam script Lua tanpa perlu transaksi penuh.

← Sebelumnya: Google Pub/Sub   Berikutnya: Memcached →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact