Multi Threading #
Salah satu klaim terbesar Rust adalah fearless concurrency — kamu bisa menulis kode multi-threaded tanpa khawatir tentang data race, karena compiler menolaknya di compile time, bukan di runtime. Ini bukan sekadar slogan: sistem ownership Rust membuat kategori bug yang paling sulit dilacak di C++ dan Java — race condition, use-after-free di thread lain, iterator invalidation — menjadi error kompilasi biasa. Artikel ini membahas primitif thread standar (thread::spawn, Arc, Mutex, channel), tipe atomik untuk state sederhana, paralelisasi mudah dengan rayon, serta trait Send dan Sync yang menjadi fondasi keamanan concurrent Rust.
Thread Dasar #
std::thread::spawn membuat OS thread baru. Thread menjalankan closure yang diteruskan padanya:
use std::thread;
use std::time::Duration;
fn main() {
// Spawn thread — tanpa join, mungkin tidak selesai sebelum main berakhir
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("[thread] iterasi {}", i);
thread::sleep(Duration::from_millis(10));
}
});
// Main thread terus berjalan bersamaan
for i in 1..=3 {
println!("[main] iterasi {}", i);
thread::sleep(Duration::from_millis(15));
}
// join() — blokir main sampai thread selesai
// Tanpa ini, thread bisa terpotong saat main keluar
handle.join().unwrap();
println!("Semua selesai");
}
move Closure — Memindahkan Data ke Thread
#
Thread membutuhkan semua data yang dipakainya untuk hidup selama thread berjalan. Karena thread bisa bertahan lebih lama dari scope asalnya, closure harus memiliki data — bukan hanya meminjamnya. Kata kunci move memindahkan ownership ke closure:
use std::thread;
fn main() {
let pesan = String::from("Halo dari thread!");
// ANTI-PATTERN: borrow biasa tidak bisa melewati batas thread
// let handle = thread::spawn(|| println!("{}", pesan));
// error: closure may outlive the current function but it borrows `pesan`
// BENAR: move memindahkan ownership ke closure
let handle = thread::spawn(move || {
println!("{}", pesan); // pesan dimiliki oleh closure ini sekarang
});
// println!("{}", pesan); // error: pesan sudah di-move
handle.join().unwrap();
}
Spawn Banyak Thread dan Kumpulkan Hasilnya #
use std::thread;
fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut handles = Vec::new();
// Bagi data ke beberapa thread untuk diproses paralel
for chunk in data.chunks(2) {
let chunk = chunk.to_vec(); // buat salinan untuk di-move
let handle = thread::spawn(move || {
let jumlah: i32 = chunk.iter().sum();
println!("Chunk {:?} → jumlah {}", chunk, jumlah);
jumlah
});
handles.push(handle);
}
// Kumpulkan hasil dari semua thread
let total: i32 = handles.into_iter()
.map(|h| h.join().unwrap())
.sum();
println!("Total keseluruhan: {}", total);
}
Berbagi Data: Arc<T> dan Mutex<T>
#
Arc<T> (Atomic Reference Counted) memungkinkan banyak thread memiliki shared ownership ke data yang sama. Mutex<T> memastikan hanya satu thread yang mengakses data di dalamnya pada satu waktu:
flowchart TD
subgraph Heap
ARC["Arc<Mutex<Data>>\nref_count = 3"]
DATA["Data\n(terlindungi Mutex)"]
ARC --> DATA
end
T1["Thread 1\nArc::clone"] --> ARC
T2["Thread 2\nArc::clone"] --> ARC
T3["Thread 3\nArc::clone"] --> ARC
T1 -.->|"lock() → akses eksklusif"| DATAuse std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc: shared ownership | Mutex: synchronized access
let counter = Arc::new(Mutex::new(0i32));
let mut handles = vec![];
for id in 0..10 {
let counter = Arc::clone(&counter); // tambah ref count, tidak copy data
let handle = thread::spawn(move || {
let mut angka = counter.lock().unwrap(); // kunci mutex, tunggu jika sedang dipakai
*angka += 1;
println!("Thread {}: counter sekarang {}", id, *angka);
// angka (MutexGuard) di-drop di sini → mutex otomatis dilepas
});
handles.push(handle);
}
for h in handles {
h.join().unwrap();
}
println!("Nilai akhir: {}", *counter.lock().unwrap()); // 10
}
Deadlock — Jebakan Utama Mutex #
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let a = Arc::new(Mutex::new(1));
let b = Arc::new(Mutex::new(2));
let a1 = Arc::clone(&a);
let b1 = Arc::clone(&b);
// ANTI-PATTERN: deadlock klasik — dua thread mengunci dalam urutan berbeda
// Thread 1: kunci a dulu, lalu b
// Thread 2: kunci b dulu, lalu a
// Keduanya menunggu selamanya
// BENAR: selalu kunci mutex dalam urutan yang konsisten di semua thread
// Atau gunakan try_lock() dengan fallback
// Contoh lock aman: selalu kunci a sebelum b
let handle1 = thread::spawn(move || {
let _ga = a1.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(1));
let _gb = b1.lock().unwrap();
println!("Thread 1 selesai");
});
let _ga = a.lock().unwrap();
let _gb = b.lock().unwrap();
println!("Main selesai");
drop(_ga); drop(_gb);
handle1.join().unwrap();
}
RwLock — Banyak Pembaca, Satu Penulis
#
RwLock<T> lebih efisien dari Mutex ketika operasi baca jauh lebih sering dari tulis:
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));
let mut handles = vec![];
// Banyak thread pembaca bisa berjalan bersamaan
for i in 0..5 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let baca = data.read().unwrap(); // read lock — bisa banyak sekaligus
println!("Reader {}: {:?}", i, *baca);
}));
}
// Satu thread penulis — tunggu semua reader selesai
{
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut tulis = data.write().unwrap(); // write lock — eksklusif
tulis.push(6);
println!("Writer menambah 6");
}));
}
for h in handles { h.join().unwrap(); }
println!("Hasil akhir: {:?}", *data.read().unwrap());
}
Tipe Atomik — State Sederhana Tanpa Mutex #
Untuk nilai primitif yang hanya perlu increment, decrement, atau swap, tipe atomik dari std::sync::atomic jauh lebih ringan dari Mutex:
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::thread;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let berhenti = Arc::new(AtomicBool::new(false));
let mut handles = vec![];
// 10 thread increment counter secara atomik — tanpa Mutex
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
// fetch_add: atomik tambah dan kembalikan nilai lama
counter.fetch_add(1, Ordering::SeqCst);
}));
}
for h in handles { h.join().unwrap(); }
println!("Counter: {}", counter.load(Ordering::SeqCst)); // 10
// AtomicBool sebagai flag berhenti yang aman antar thread
let berhenti2 = Arc::clone(&berhenti);
let worker = thread::spawn(move || {
let mut i = 0;
while !berhenti2.load(Ordering::Relaxed) {
i += 1;
thread::sleep(std::time::Duration::from_millis(1));
}
println!("Worker berhenti setelah {} iterasi", i);
});
thread::sleep(std::time::Duration::from_millis(50));
berhenti.store(true, Ordering::Relaxed);
worker.join().unwrap();
}
| Tipe | Operasi | Kapan digunakan |
|---|---|---|
AtomicBool | load, store, swap | Flag stop/start, switch mode |
AtomicI32 / AtomicU32 | fetch_add, fetch_sub, compare_exchange | Counter sederhana |
AtomicUsize | fetch_add, fetch_sub | Counter, indeks |
AtomicPtr<T> | load, store, swap | Pointer ke data (tingkat rendah) |
Channel — Komunikasi Antar Thread #
Channel adalah cara idiomatis Rust untuk komunikasi antar thread: “don’t communicate by sharing memory, share memory by communicating”. Rust standard library menyediakan MPSC (Multiple Producer, Single Consumer):
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel::<String>();
// Producer thread — kirim beberapa pesan
let tx1 = tx.clone();
thread::spawn(move || {
let pesan = ["halo", "dunia", "dari", "thread"];
for p in pesan {
tx1.send(p.to_string()).unwrap();
thread::sleep(Duration::from_millis(10));
}
// tx1 di-drop saat thread selesai → channel tahu producer ini tutup
});
// Producer kedua — MPSC: banyak producer
thread::spawn(move || {
for i in 1..=3 {
tx.send(format!("angka-{}", i)).unwrap();
thread::sleep(Duration::from_millis(15));
}
});
// Receiver — iterasi sampai semua producer tutup
for pesan in rx { // rx.recv() dalam loop, berhenti saat channel closed
println!("Diterima: {}", pesan);
}
println!("Semua producer selesai");
}
Channel dengan Bounded Buffer #
Standard library hanya menyediakan unbounded channel. Untuk bounded channel (backpressure), gunakan sync_channel:
use std::sync::mpsc;
use std::thread;
fn main() {
// sync_channel dengan buffer 3 — sender akan blokir jika buffer penuh
let (tx, rx) = mpsc::sync_channel::<u32>(3);
thread::spawn(move || {
for i in 0..10 {
println!("Kirim {}", i);
tx.send(i).unwrap(); // blokir jika buffer penuh (kapasitas 3)
}
});
for val in rx {
thread::sleep(std::time::Duration::from_millis(50)); // simulasi proses lambat
println!("Proses: {}", val);
}
}
Paralelisasi Data dengan rayon
#
Untuk tugas data parallelism — memproses elemen-elemen koleksi secara paralel — crate rayon jauh lebih mudah dari mengelola thread secara manual:
[dependencies]
rayon = "1"
use rayon::prelude::*;
fn hitung_prima(n: u64) -> bool {
if n < 2 { return false; }
(2..=(n as f64).sqrt() as u64).all(|i| n % i != 0)
}
fn main() {
let angka: Vec<u64> = (1..=1_000_000).collect();
// Sequential — satu per satu
let t = std::time::Instant::now();
let prima_seq: Vec<u64> = angka.iter()
.copied()
.filter(|&n| hitung_prima(n))
.collect();
println!("Sequential: {} prima dalam {:?}", prima_seq.len(), t.elapsed());
// Parallel — cukup ganti .iter() dengan .par_iter()
let t = std::time::Instant::now();
let prima_par: Vec<u64> = angka.par_iter() // ← satu kata kunci!
.copied()
.filter(|&n| hitung_prima(n))
.collect();
println!("Parallel: {} prima dalam {:?}", prima_par.len(), t.elapsed());
// par_iter juga mendukung map, filter, sum, dll.
let total: u64 = (1u64..=1_000_000).into_par_iter()
.filter(|&n| hitung_prima(n))
.sum();
println!("Jumlah semua prima: {}", total);
}
rayon menggunakan thread pool yang dikonfigurasi otomatis berdasarkan jumlah CPU yang tersedia. Tidak perlu mengelola thread secara manual.
Trait Send dan Sync — Fondasi Keamanan Thread
#
Dua trait marker ini adalah mekanisme yang membuat compiler Rust bisa memverifikasi keamanan thread:
flowchart TD
Send["Send\nTipe bisa dipindah ke thread lain\nMost types: ✓\nRc<T>: ✗ (gunakan Arc<T>)\n*mut T: ✗ (raw pointer)"]
Sync["Sync\nTipe bisa diakses dari banyak thread\nT Sync jika &T Send\nMutex<T>: ✓\nCell<T>, RefCell<T>: ✗"]use std::sync::{Arc, Mutex};
use std::rc::Rc; // Rc tidak Send — tidak thread-safe
fn butuh_send<T: Send>(_: T) {}
fn butuh_sync<T: Sync>(_: &T) {}
fn main() {
let owned = String::from("aman");
butuh_send(owned); // ✓ String: Send
// Rc tidak Send — tidak bisa dikirim ke thread lain
let rc = Rc::new(42);
// butuh_send(rc); // error: Rc<i32> cannot be sent between threads safely
// Arc adalah Send — gunakan Arc bukan Rc di multi-threaded code
let arc = Arc::new(42);
butuh_send(arc); // ✓
// Mutex<T> adalah Sync jika T: Send
let mutex = Mutex::new(String::new());
butuh_sync(&mutex); // ✓
// ANTI-PATTERN: mencoba kirim RefCell ke thread lain
use std::cell::RefCell;
let rc_ref = RefCell::new(0);
// thread::spawn(move || { rc_ref.borrow_mut(); });
// error: RefCell<i32> cannot be sent between threads safely
}
Ringkasan: Pilih Primitif yang Tepat #
Situasi → Solusi
Shared immutable data → Arc<T>
Shared mutable data → Arc<Mutex<T>>
Banyak baca, jarang tulis → Arc<RwLock<T>>
Counter / flag sederhana → AtomicUsize / AtomicBool
Kirim data antar thread → mpsc::channel
Kirim data dengan backpressure → mpsc::sync_channel
Data parallelism mudah → rayon::par_iter()
Koordinasi banyak thread → Barrier, Condvar
Ringkasan #
thread::spawn+moveclosure — cara dasar membuat thread.movememindahkan ownership ke thread karena thread bisa bertahan lebih lama dari scope asalnya.- Selalu
join()thread penting — tanpajoin(), thread bisa terpotong saat main berakhir. KumpulkanJoinHandledan panggiljoin()pada akhir.Arc<Mutex<T>>untuk shared mutable state —Arcuntuk shared ownership lintas thread,Mutexuntuk akses eksklusif. Jangan lupa bahwaMutexGuarddilepas saat di-drop.RwLockjika banyak baca, sedikit tulis — lebih efisien dariMutexkarena banyak reader bisa aktif bersamaan.- Tipe atomik untuk primitif sederhana —
AtomicUsize,AtomicBooljauh lebih ringan dariMutexuntuk counter dan flag.- Channel untuk komunikasi, bukan shared memory —
mpsc::channel()adalah cara idiomatis Rust: kirim data melalui channel daripada berbagi pointer.rayonuntuk data parallelism — ganti.iter()dengan.par_iter()untuk memproses koleksi secara paralel dengan thread pool otomatis.SenddanSyncdijaga compiler — compiler menolak kode yang mencoba mengirimRcatauRefCellke thread lain. GunakanArcdanMutex/RwLocksebagai gantinya.- Hindari deadlock — kunci mutex dalam urutan yang konsisten di semua thread, atau gunakan
try_lock()dengan fallback daripadalock()yang memblokir selamanya.