Redis #
Redis adalah in-memory data store yang bisa berfungsi sebagai cache, message broker, session store, rate limiter, dan distributed lock sekaligus. Keunggulannya terletak pada kecepatan ekstrem (ratusan ribu operasi per detik) dan struktur data yang kaya jauh di atas key-value biasa — string, hash, list, set, sorted set, stream, dan lainnya. Di Rust, crate redis adalah driver Redis yang paling banyak digunakan, dan deadpool-redis menambahkan connection pool async di atasnya. Artikel ini membahas semua tipe data Redis, pola-pola caching yang umum, distributed lock, rate limiting, dan Pub/Sub ringan sebagai alternatif message broker.
Instalasi #
[dependencies]
redis = { version = "0.25", features = ["tokio-comp", "connection-manager"] }
deadpool-redis = "0.15"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Koneksi dan Connection Pool #
use deadpool_redis::{Config, Pool, Runtime};
use redis::AsyncCommands;
fn buat_pool(url: &str) -> Pool {
let cfg = Config::from_url(url);
cfg.create_pool(Some(Runtime::Tokio1))
.expect("Gagal membuat Redis pool")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = buat_pool("redis://localhost:6379");
// Ambil koneksi dari pool
let mut conn = pool.get().await?;
// Ping untuk verifikasi
let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
println!("Redis: {}", pong); // PONG
Ok(())
}
String — Tipe Data Dasar #
use redis::AsyncCommands;
async fn contoh_string(conn: &mut deadpool_redis::Connection)
-> Result<(), Box<dyn std::error::Error>>
{
// SET dan GET dasar
conn.set("kunci", "nilai").await?;
let nilai: String = conn.get("kunci").await?;
println!("GET: {}", nilai);
// SET dengan TTL (expire in seconds)
conn.set_ex("sesi:user-42", "data-sesi", 3600).await?; // 1 jam
// SET hanya jika belum ada (SET NX) — untuk distributed lock sederhana
let berhasil: bool = conn.set_nx("lock:resource-1", "owner-1").await?;
println!("Lock berhasil: {}", berhasil);
// INCR / DECR — atomik tanpa race condition
conn.set("counter:halaman", 0i64).await?;
let baru: i64 = conn.incr("counter:halaman", 1).await?;
println!("Counter: {}", baru);
// MSET / MGET — batch operation
conn.set_multiple(&[
("user:1:nama", "Budi"),
("user:2:nama", "Sari"),
("user:3:nama", "Joko"),
]).await?;
let nama_list: Vec<Option<String>> = conn.get(vec![
"user:1:nama", "user:2:nama", "user:3:nama", "user:99:nama"
]).await?;
println!("{:?}", nama_list); // [Some("Budi"), Some("Sari"), Some("Joko"), None]
// TTL — cek sisa waktu hidup kunci
let ttl: i64 = conn.ttl("sesi:user-42").await?;
println!("TTL sesi: {} detik", ttl);
// EXPIRE — set TTL pada kunci yang sudah ada
conn.expire("kunci", 60).await?;
Ok(())
}
Hash — Objek Terstruktur #
Hash cocok untuk menyimpan objek dengan banyak field — lebih efisien dari menyimpan JSON string karena bisa update field individual:
use redis::AsyncCommands;
async fn contoh_hash(conn: &mut deadpool_redis::Connection)
-> Result<(), Box<dyn std::error::Error>>
{
let kunci = "pengguna:1001";
// HSET — set satu atau banyak field
conn.hset_multiple(kunci, &[
("nama", "Budi Santoso"),
("email", "[email protected]"),
("peran", "admin"),
("aktif", "true"),
]).await?;
// HGET — ambil satu field
let nama: String = conn.hget(kunci, "nama").await?;
println!("Nama: {}", nama);
// HMGET — ambil banyak field sekaligus
let data: Vec<Option<String>> = conn.hget(kunci, vec!["nama", "email", "peran"]).await?;
println!("{:?}", data);
// HGETALL — ambil semua field dan nilai
let semua: std::collections::HashMap<String, String> = conn.hgetall(kunci).await?;
println!("Semua field: {:?}", semua);
// HINCRBY — increment field numerik secara atomik
conn.hset(kunci, "skor", "0").await?;
let skor: i64 = conn.hincr(kunci, "skor", 10).await?;
println!("Skor: {}", skor);
// HEXISTS — cek keberadaan field
let ada: bool = conn.hexists(kunci, "email").await?;
println!("Email ada: {}", ada);
// HDEL — hapus field
conn.hdel(kunci, "aktif").await?;
// Set TTL pada hash
conn.expire(kunci, 3600).await?;
Ok(())
}
List — Antrian dan Stack #
use redis::AsyncCommands;
async fn contoh_list(conn: &mut deadpool_redis::Connection)
-> Result<(), Box<dyn std::error::Error>>
{
let kunci = "antrian:email";
// RPUSH — tambah di kanan (untuk antrian FIFO)
conn.rpush(kunci, "email-1").await?;
conn.rpush(kunci, "email-2").await?;
conn.rpush(kunci, "email-3").await?;
// LPUSH — tambah di kiri (untuk stack LIFO)
conn.lpush("stack:undo", "aksi-1").await?;
conn.lpush("stack:undo", "aksi-2").await?;
// LLEN — panjang list
let panjang: i64 = conn.llen(kunci).await?;
println!("Panjang antrian: {}", panjang);
// LPOP — ambil dari kiri (FIFO: ambil yang pertama masuk)
let item: Option<String> = conn.lpop(kunci, None).await?;
println!("Diambil: {:?}", item);
// RPOP — ambil dari kanan (LIFO/stack)
let top: Option<String> = conn.rpop("stack:undo", None).await?;
println!("Stack pop: {:?}", top);
// LRANGE — baca semua elemen tanpa menghapus
let semua: Vec<String> = conn.lrange(kunci, 0, -1).await?;
println!("Semua: {:?}", semua);
// BLPOP — blocking pop (tunggu hingga ada elemen)
// Berguna untuk worker queue
// let (_, item): (String, String) = conn.blpop(kunci, 5.0).await?;
Ok(())
}
Set dan Sorted Set #
use redis::AsyncCommands;
async fn contoh_set(conn: &mut deadpool_redis::Connection)
-> Result<(), Box<dyn std::error::Error>>
{
// Set: kumpulan elemen unik tanpa duplikat
conn.sadd("tag:artikel-1", vec!["rust", "backend", "performance"]).await?;
conn.sadd("tag:artikel-2", vec!["rust", "webdev", "frontend"]).await?;
// SMEMBERS — semua anggota
let tag: std::collections::HashSet<String> = conn.smembers("tag:artikel-1").await?;
println!("Tag: {:?}", tag);
// SISMEMBER — cek keanggotaan
let ada: bool = conn.sismember("tag:artikel-1", "rust").await?;
println!("Ada 'rust': {}", ada);
// Operasi himpunan
let irisan: Vec<String> = conn.sinter(vec!["tag:artikel-1", "tag:artikel-2"]).await?;
println!("Irisan: {:?}", irisan); // ["rust"]
let gabungan: Vec<String> = conn.sunion(vec!["tag:artikel-1", "tag:artikel-2"]).await?;
println!("Gabungan: {:?}", gabungan);
// Sorted Set: kumpulan elemen dengan skor (float)
// Cocok untuk leaderboard, ranking, rate limiting dengan sliding window
conn.zadd("leaderboard", "Alice", 1500.0_f64).await?;
conn.zadd("leaderboard", "Bob", 1200.0_f64).await?;
conn.zadd("leaderboard", "Charlie", 1800.0_f64).await?;
conn.zadd("leaderboard", "Diana", 950.0_f64).await?;
// ZRANGE — ambil berdasarkan rank (ascending)
let top: Vec<String> = conn.zrange("leaderboard", 0, 2).await?;
println!("3 terendah: {:?}", top);
// ZREVRANGE — ambil berdasarkan rank (descending)
let top3: Vec<String> = conn.zrevrange("leaderboard", 0, 2).await?;
println!("Top 3: {:?}", top3); // ["Charlie", "Alice", "Bob"]
// ZRANK — posisi elemen (0-based, ascending)
let rank: Option<i64> = conn.zrank("leaderboard", "Alice").await?;
println!("Rank Alice: {:?}", rank);
// ZSCORE — skor elemen
let skor: Option<f64> = conn.zscore("leaderboard", "Charlie").await?;
println!("Skor Charlie: {:?}", skor);
// ZINCRBY — tambah skor secara atomik
let skor_baru: f64 = conn.zincr("leaderboard", "Bob", 300.0_f64).await?;
println!("Skor Bob baru: {}", skor_baru);
Ok(())
}
Pipeline — Batch Command #
Pipeline mengurangi round-trip network dengan mengirim banyak perintah sekaligus:
use redis::{AsyncCommands, Pipeline};
async fn contoh_pipeline(
pool: &deadpool_redis::Pool,
) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = pool.get().await?;
// ANTI-PATTERN: banyak round-trip terpisah
// for i in 0..100 {
// conn.set(format!("key:{}", i), i).await?; // 100 round-trips!
// }
// BENAR: pipeline — satu round-trip untuk banyak command
let mut pipe = redis::pipe();
for i in 0..100 {
pipe.set(format!("key:{}", i), i).ignore();
}
// Tambah beberapa GET sekaligus
pipe.get("key:0").get("key:50").get("key:99");
let hasil: (String, String, String) = pipe
.query_async(&mut conn)
.await?;
println!("key:0={}, key:50={}, key:99={}", hasil.0, hasil.1, hasil.2);
Ok(())
}
Pola Caching Umum #
Cache-Aside (Lazy Loading) #
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Produk {
id: u64,
nama: String,
harga: f64,
}
// Simulasi database query yang lambat
async fn ambil_dari_db(id: u64) -> Produk {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Produk { id, nama: format!("Produk {}", id), harga: id as f64 * 1000.0 }
}
async fn ambil_produk(
pool: &deadpool_redis::Pool,
id: u64,
) -> Result<Produk, Box<dyn std::error::Error>> {
let kunci = format!("produk:{}", id);
let mut conn = pool.get().await?;
// 1. Cek cache
let cached: Option<String> = conn.get(&kunci).await?;
if let Some(data) = cached {
println!("Cache HIT untuk produk {}", id);
let produk: Produk = serde_json::from_str(&data)?;
return Ok(produk);
}
// 2. Cache MISS — ambil dari database
println!("Cache MISS untuk produk {}", id);
let produk = ambil_dari_db(id).await;
// 3. Simpan ke cache dengan TTL
let json = serde_json::to_string(&produk)?;
conn.set_ex(&kunci, &json, 300).await?; // TTL 5 menit
Ok(produk)
}
// Invalidasi cache saat data berubah
async fn perbarui_produk(
pool: &deadpool_redis::Pool,
id: u64,
harga_baru: f64,
) -> Result<(), Box<dyn std::error::Error>> {
// Update di database...
// Hapus cache — akan di-refresh saat request berikutnya
let mut conn = pool.get().await?;
conn.del(format!("produk:{}", id)).await?;
println!("Cache produk {} diinvalidasi", id);
Ok(())
}
Distributed Lock #
use redis::AsyncCommands;
use uuid::Uuid;
struct RedisLock {
pool: deadpool_redis::Pool,
kunci: String,
token: String,
}
impl RedisLock {
async fn acquire(
pool: &deadpool_redis::Pool,
resource: &str,
ttl_ms: u64,
) -> Option<RedisLock> {
let kunci = format!("lock:{}", resource);
let token = Uuid::new_v4().to_string();
let mut conn = pool.get().await.ok()?;
// SET NX PX — atomic: set jika belum ada, dengan TTL dalam milidetik
let result: Option<String> = redis::cmd("SET")
.arg(&kunci)
.arg(&token)
.arg("NX")
.arg("PX")
.arg(ttl_ms)
.query_async(&mut conn)
.await
.ok()?;
if result.is_some() {
Some(RedisLock {
pool: pool.clone(),
kunci,
token,
})
} else {
None // Lock sudah dipegang pihak lain
}
}
async fn release(&self) -> bool {
// Script Lua untuk release atomik — hanya hapus jika token cocok
// Mencegah lock orang lain terhapus jika TTL kita sudah habis
let script = redis::Script::new(r"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
");
let mut conn = match self.pool.get().await {
Ok(c) => c,
Err(_) => return false,
};
let result: i64 = script
.key(&self.kunci)
.arg(&self.token)
.invoke_async(&mut conn)
.await
.unwrap_or(0);
result == 1
}
}
async fn dengan_lock<F, Fut, T>(
pool: &deadpool_redis::Pool,
resource: &str,
ttl_ms: u64,
f: F,
) -> Result<T, String>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
{
let lock = RedisLock::acquire(pool, resource, ttl_ms)
.await
.ok_or_else(|| format!("Gagal mendapat lock untuk '{}'", resource))?;
let hasil = f().await;
lock.release().await;
Ok(hasil)
}
Rate Limiting dengan Sliding Window #
use redis::AsyncCommands;
async fn cek_rate_limit(
pool: &deadpool_redis::Pool,
user_id: u64,
maks_request: u64,
window_detik: u64,
) -> Result<bool, Box<dyn std::error::Error>> {
let kunci = format!("rate:{}:{}", user_id, chrono::Utc::now().timestamp() / window_detik as i64);
let mut conn = pool.get().await?;
// INCR atomik — hitung request dalam window saat ini
let jumlah: u64 = conn.incr(&kunci, 1u64).await?;
// Set TTL hanya pada request pertama dalam window
if jumlah == 1 {
conn.expire(&kunci, window_detik as i64 * 2).await?;
}
let diizinkan = jumlah <= maks_request;
if !diizinkan {
println!("Rate limit melebihi batas untuk user {}: {}/{}", user_id, jumlah, maks_request);
}
Ok(diizinkan)
}
// Rate limiting dengan Sorted Set (sliding window yang lebih akurat)
async fn rate_limit_sliding(
pool: &deadpool_redis::Pool,
identifier: &str,
maks: u64,
window_ms: u64,
) -> Result<bool, Box<dyn std::error::Error>> {
let kunci = format!("sliding:{}", identifier);
let sekarang = chrono::Utc::now().timestamp_millis() as f64;
let window_mulai = sekarang - window_ms as f64;
let mut conn = pool.get().await?;
let mut pipe = redis::pipe();
pipe.atomic()
// Hapus entry yang sudah keluar dari window
.cmd("ZREMRANGEBYSCORE").arg(&kunci).arg(0).arg(window_mulai).ignore()
// Tambah request saat ini
.cmd("ZADD").arg(&kunci).arg(sekarang).arg(sekarang).ignore()
// Hitung request dalam window
.cmd("ZCARD").arg(&kunci)
// Set TTL
.cmd("EXPIRE").arg(&kunci).arg((window_ms / 1000 + 1) as i64).ignore();
let (jumlah,): (u64,) = pipe.query_async(&mut conn).await?;
Ok(jumlah <= maks)
}
Pub/Sub — Messaging Ringan #
Redis Pub/Sub cocok untuk notifikasi real-time dalam satu datacenter — lebih sederhana dari Kafka/RabbitMQ tapi tanpa persistensi:
use redis::AsyncCommands;
async fn redis_pubsub(
pool: &deadpool_redis::Pool,
) -> Result<(), Box<dyn std::error::Error>> {
// Publisher — gunakan koneksi terpisah
let publisher_pool = pool.clone();
tokio::spawn(async move {
let mut conn = publisher_pool.get().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
for i in 1..=5 {
conn.publish("channel:notifikasi",
serde_json::json!({"id": i, "pesan": format!("Notifikasi {}", i)}).to_string()
).await.unwrap();
println!("Dikirim notifikasi {}", i);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
});
// Subscriber — butuh koneksi dedicated (tidak bisa melakukan command lain)
let mut conn = pool.get().await?;
let mut pubsub = conn.as_pubsub();
pubsub.subscribe("channel:notifikasi").await?;
println!("Subscriber aktif, menunggu notifikasi...");
for _ in 0..5 {
let pesan = pubsub.on_message().next_message().await?;
let payload: String = pesan.get_payload()?;
println!("Diterima: {}", payload);
}
Ok(())
}
Ringkasan #
- Selalu gunakan connection pool —
deadpool-redismengelola pool koneksi async. Satu koneksi per request sangat mahal; pool berbagi koneksi antar request.- Konvensi penamaan kunci dengan
:— gunakan namespace sepertipengguna:1001,sesi:abc123,rate:user-42untuk organisasi yang jelas dan bisa dikelola denganSCAN.set_exuntuk caching dengan TTL — selalu set TTL pada kunci cache agar memori tidak penuh dan data tidak stale selamanya.- Hash lebih efisien dari JSON string untuk objek — bisa update satu field tanpa serialize/deserialize ulang seluruh objek. Cocok untuk data yang sering diperbarui sebagian.
- Pipeline untuk batch operasi — mengirim banyak command dalam satu round-trip jauh lebih efisien dari command terpisah. Hemat hingga 10x latensi untuk operasi bulk.
- Distributed lock dengan
SET NX PX— atomik dan otomatis expire. Selalu gunakan token unik (UUID) dan script Lua untuk release agar tidak menghapus lock orang lain.- Sorted Set untuk leaderboard dan rate limiting —
ZADD+ZRANK+ZINCRBYuntuk leaderboard real-time;ZREMRANGEBYSCORE+ZCARDuntuk sliding window rate limiting.- Redis Pub/Sub untuk notifikasi in-process — lebih sederhana dari Kafka/RabbitMQ, tapi tanpa persistensi. Cocok untuk invalidasi cache antar instance atau notifikasi WebSocket.
- Lua script untuk operasi atomik kompleks — beberapa perintah Redis yang perlu dieksekusi secara atomik bisa dibungkus dalam script Lua tanpa perlu transaksi penuh.