Multi Threading

Multi Threading #

Salah satu klaim terbesar Rust adalah fearless concurrency — kamu bisa menulis kode multi-threaded tanpa khawatir tentang data race, karena compiler menolaknya di compile time, bukan di runtime. Ini bukan sekadar slogan: sistem ownership Rust membuat kategori bug yang paling sulit dilacak di C++ dan Java — race condition, use-after-free di thread lain, iterator invalidation — menjadi error kompilasi biasa. Artikel ini membahas primitif thread standar (thread::spawn, Arc, Mutex, channel), tipe atomik untuk state sederhana, paralelisasi mudah dengan rayon, serta trait Send dan Sync yang menjadi fondasi keamanan concurrent Rust.

Thread Dasar #

std::thread::spawn membuat OS thread baru. Thread menjalankan closure yang diteruskan padanya:

use std::thread;
use std::time::Duration;

fn main() {
    // Spawn thread — tanpa join, mungkin tidak selesai sebelum main berakhir
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("[thread] iterasi {}", i);
            thread::sleep(Duration::from_millis(10));
        }
    });

    // Main thread terus berjalan bersamaan
    for i in 1..=3 {
        println!("[main] iterasi {}", i);
        thread::sleep(Duration::from_millis(15));
    }

    // join() — blokir main sampai thread selesai
    // Tanpa ini, thread bisa terpotong saat main keluar
    handle.join().unwrap();
    println!("Semua selesai");
}

move Closure — Memindahkan Data ke Thread #

Thread membutuhkan semua data yang dipakainya untuk hidup selama thread berjalan. Karena thread bisa bertahan lebih lama dari scope asalnya, closure harus memiliki data — bukan hanya meminjamnya. Kata kunci move memindahkan ownership ke closure:

use std::thread;

fn main() {
    let pesan = String::from("Halo dari thread!");

    // ANTI-PATTERN: borrow biasa tidak bisa melewati batas thread
    // let handle = thread::spawn(|| println!("{}", pesan));
    // error: closure may outlive the current function but it borrows `pesan`

    // BENAR: move memindahkan ownership ke closure
    let handle = thread::spawn(move || {
        println!("{}", pesan);  // pesan dimiliki oleh closure ini sekarang
    });

    // println!("{}", pesan);  // error: pesan sudah di-move

    handle.join().unwrap();
}

Spawn Banyak Thread dan Kumpulkan Hasilnya #

use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let mut handles = Vec::new();

    // Bagi data ke beberapa thread untuk diproses paralel
    for chunk in data.chunks(2) {
        let chunk = chunk.to_vec();  // buat salinan untuk di-move
        let handle = thread::spawn(move || {
            let jumlah: i32 = chunk.iter().sum();
            println!("Chunk {:?} → jumlah {}", chunk, jumlah);
            jumlah
        });
        handles.push(handle);
    }

    // Kumpulkan hasil dari semua thread
    let total: i32 = handles.into_iter()
        .map(|h| h.join().unwrap())
        .sum();

    println!("Total keseluruhan: {}", total);
}

Berbagi Data: Arc<T> dan Mutex<T> #

Arc<T> (Atomic Reference Counted) memungkinkan banyak thread memiliki shared ownership ke data yang sama. Mutex<T> memastikan hanya satu thread yang mengakses data di dalamnya pada satu waktu:

flowchart TD
    subgraph Heap
        ARC["Arc<Mutex<Data>>\nref_count = 3"]
        DATA["Data\n(terlindungi Mutex)"]
        ARC --> DATA
    end

    T1["Thread 1\nArc::clone"] --> ARC
    T2["Thread 2\nArc::clone"] --> ARC
    T3["Thread 3\nArc::clone"] --> ARC

    T1 -.->|"lock() → akses eksklusif"| DATA
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc: shared ownership | Mutex: synchronized access
    let counter = Arc::new(Mutex::new(0i32));
    let mut handles = vec![];

    for id in 0..10 {
        let counter = Arc::clone(&counter);  // tambah ref count, tidak copy data
        let handle = thread::spawn(move || {
            let mut angka = counter.lock().unwrap();  // kunci mutex, tunggu jika sedang dipakai
            *angka += 1;
            println!("Thread {}: counter sekarang {}", id, *angka);
            // angka (MutexGuard) di-drop di sini → mutex otomatis dilepas
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap();
    }

    println!("Nilai akhir: {}", *counter.lock().unwrap());  // 10
}

Deadlock — Jebakan Utama Mutex #

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let a = Arc::new(Mutex::new(1));
    let b = Arc::new(Mutex::new(2));

    let a1 = Arc::clone(&a);
    let b1 = Arc::clone(&b);

    // ANTI-PATTERN: deadlock klasik — dua thread mengunci dalam urutan berbeda
    // Thread 1: kunci a dulu, lalu b
    // Thread 2: kunci b dulu, lalu a
    // Keduanya menunggu selamanya

    // BENAR: selalu kunci mutex dalam urutan yang konsisten di semua thread
    // Atau gunakan try_lock() dengan fallback

    // Contoh lock aman: selalu kunci a sebelum b
    let handle1 = thread::spawn(move || {
        let _ga = a1.lock().unwrap();
        thread::sleep(std::time::Duration::from_millis(1));
        let _gb = b1.lock().unwrap();
        println!("Thread 1 selesai");
    });

    let _ga = a.lock().unwrap();
    let _gb = b.lock().unwrap();
    println!("Main selesai");
    drop(_ga); drop(_gb);

    handle1.join().unwrap();
}

RwLock — Banyak Pembaca, Satu Penulis #

RwLock<T> lebih efisien dari Mutex ketika operasi baca jauh lebih sering dari tulis:

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));
    let mut handles = vec![];

    // Banyak thread pembaca bisa berjalan bersamaan
    for i in 0..5 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let baca = data.read().unwrap();  // read lock — bisa banyak sekaligus
            println!("Reader {}: {:?}", i, *baca);
        }));
    }

    // Satu thread penulis — tunggu semua reader selesai
    {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let mut tulis = data.write().unwrap();  // write lock — eksklusif
            tulis.push(6);
            println!("Writer menambah 6");
        }));
    }

    for h in handles { h.join().unwrap(); }
    println!("Hasil akhir: {:?}", *data.read().unwrap());
}

Tipe Atomik — State Sederhana Tanpa Mutex #

Untuk nilai primitif yang hanya perlu increment, decrement, atau swap, tipe atomik dari std::sync::atomic jauh lebih ringan dari Mutex:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let berhenti = Arc::new(AtomicBool::new(false));
    let mut handles = vec![];

    // 10 thread increment counter secara atomik — tanpa Mutex
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            // fetch_add: atomik tambah dan kembalikan nilai lama
            counter.fetch_add(1, Ordering::SeqCst);
        }));
    }

    for h in handles { h.join().unwrap(); }
    println!("Counter: {}", counter.load(Ordering::SeqCst));  // 10

    // AtomicBool sebagai flag berhenti yang aman antar thread
    let berhenti2 = Arc::clone(&berhenti);
    let worker = thread::spawn(move || {
        let mut i = 0;
        while !berhenti2.load(Ordering::Relaxed) {
            i += 1;
            thread::sleep(std::time::Duration::from_millis(1));
        }
        println!("Worker berhenti setelah {} iterasi", i);
    });

    thread::sleep(std::time::Duration::from_millis(50));
    berhenti.store(true, Ordering::Relaxed);
    worker.join().unwrap();
}
TipeOperasiKapan digunakan
AtomicBoolload, store, swapFlag stop/start, switch mode
AtomicI32 / AtomicU32fetch_add, fetch_sub, compare_exchangeCounter sederhana
AtomicUsizefetch_add, fetch_subCounter, indeks
AtomicPtr<T>load, store, swapPointer ke data (tingkat rendah)

Channel — Komunikasi Antar Thread #

Channel adalah cara idiomatis Rust untuk komunikasi antar thread: “don’t communicate by sharing memory, share memory by communicating”. Rust standard library menyediakan MPSC (Multiple Producer, Single Consumer):

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Producer thread — kirim beberapa pesan
    let tx1 = tx.clone();
    thread::spawn(move || {
        let pesan = ["halo", "dunia", "dari", "thread"];
        for p in pesan {
            tx1.send(p.to_string()).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
        // tx1 di-drop saat thread selesai → channel tahu producer ini tutup
    });

    // Producer kedua — MPSC: banyak producer
    thread::spawn(move || {
        for i in 1..=3 {
            tx.send(format!("angka-{}", i)).unwrap();
            thread::sleep(Duration::from_millis(15));
        }
    });

    // Receiver — iterasi sampai semua producer tutup
    for pesan in rx {  // rx.recv() dalam loop, berhenti saat channel closed
        println!("Diterima: {}", pesan);
    }

    println!("Semua producer selesai");
}

Channel dengan Bounded Buffer #

Standard library hanya menyediakan unbounded channel. Untuk bounded channel (backpressure), gunakan sync_channel:

use std::sync::mpsc;
use std::thread;

fn main() {
    // sync_channel dengan buffer 3 — sender akan blokir jika buffer penuh
    let (tx, rx) = mpsc::sync_channel::<u32>(3);

    thread::spawn(move || {
        for i in 0..10 {
            println!("Kirim {}", i);
            tx.send(i).unwrap();  // blokir jika buffer penuh (kapasitas 3)
        }
    });

    for val in rx {
        thread::sleep(std::time::Duration::from_millis(50));  // simulasi proses lambat
        println!("Proses: {}", val);
    }
}

Paralelisasi Data dengan rayon #

Untuk tugas data parallelism — memproses elemen-elemen koleksi secara paralel — crate rayon jauh lebih mudah dari mengelola thread secara manual:

[dependencies]
rayon = "1"
use rayon::prelude::*;

fn hitung_prima(n: u64) -> bool {
    if n < 2 { return false; }
    (2..=(n as f64).sqrt() as u64).all(|i| n % i != 0)
}

fn main() {
    let angka: Vec<u64> = (1..=1_000_000).collect();

    // Sequential — satu per satu
    let t = std::time::Instant::now();
    let prima_seq: Vec<u64> = angka.iter()
        .copied()
        .filter(|&n| hitung_prima(n))
        .collect();
    println!("Sequential: {} prima dalam {:?}", prima_seq.len(), t.elapsed());

    // Parallel — cukup ganti .iter() dengan .par_iter()
    let t = std::time::Instant::now();
    let prima_par: Vec<u64> = angka.par_iter()  // ← satu kata kunci!
        .copied()
        .filter(|&n| hitung_prima(n))
        .collect();
    println!("Parallel:   {} prima dalam {:?}", prima_par.len(), t.elapsed());

    // par_iter juga mendukung map, filter, sum, dll.
    let total: u64 = (1u64..=1_000_000).into_par_iter()
        .filter(|&n| hitung_prima(n))
        .sum();
    println!("Jumlah semua prima: {}", total);
}

rayon menggunakan thread pool yang dikonfigurasi otomatis berdasarkan jumlah CPU yang tersedia. Tidak perlu mengelola thread secara manual.


Trait Send dan Sync — Fondasi Keamanan Thread #

Dua trait marker ini adalah mekanisme yang membuat compiler Rust bisa memverifikasi keamanan thread:

flowchart TD
    Send["Send\nTipe bisa dipindah ke thread lain\nMost types: ✓\nRc<T>: ✗ (gunakan Arc<T>)\n*mut T: ✗ (raw pointer)"]
    Sync["Sync\nTipe bisa diakses dari banyak thread\nT Sync jika &T Send\nMutex<T>: ✓\nCell<T>, RefCell<T>: ✗"]
use std::sync::{Arc, Mutex};
use std::rc::Rc;  // Rc tidak Send — tidak thread-safe

fn butuh_send<T: Send>(_: T) {}
fn butuh_sync<T: Sync>(_: &T) {}

fn main() {
    let owned = String::from("aman");
    butuh_send(owned);  // ✓ String: Send

    // Rc tidak Send — tidak bisa dikirim ke thread lain
    let rc = Rc::new(42);
    // butuh_send(rc);  // error: Rc<i32> cannot be sent between threads safely

    // Arc adalah Send — gunakan Arc bukan Rc di multi-threaded code
    let arc = Arc::new(42);
    butuh_send(arc);  // ✓

    // Mutex<T> adalah Sync jika T: Send
    let mutex = Mutex::new(String::new());
    butuh_sync(&mutex);  // ✓

    // ANTI-PATTERN: mencoba kirim RefCell ke thread lain
    use std::cell::RefCell;
    let rc_ref = RefCell::new(0);
    // thread::spawn(move || { rc_ref.borrow_mut(); });
    // error: RefCell<i32> cannot be sent between threads safely
}

Ringkasan: Pilih Primitif yang Tepat #

Situasi                          → Solusi

Shared immutable data            → Arc<T>
Shared mutable data              → Arc<Mutex<T>>
Banyak baca, jarang tulis        → Arc<RwLock<T>>
Counter / flag sederhana         → AtomicUsize / AtomicBool
Kirim data antar thread          → mpsc::channel
Kirim data dengan backpressure   → mpsc::sync_channel
Data parallelism mudah           → rayon::par_iter()
Koordinasi banyak thread         → Barrier, Condvar

Ringkasan #

  • thread::spawn + move closure — cara dasar membuat thread. move memindahkan ownership ke thread karena thread bisa bertahan lebih lama dari scope asalnya.
  • Selalu join() thread penting — tanpa join(), thread bisa terpotong saat main berakhir. Kumpulkan JoinHandle dan panggil join() pada akhir.
  • Arc<Mutex<T>> untuk shared mutable stateArc untuk shared ownership lintas thread, Mutex untuk akses eksklusif. Jangan lupa bahwa MutexGuard dilepas saat di-drop.
  • RwLock jika banyak baca, sedikit tulis — lebih efisien dari Mutex karena banyak reader bisa aktif bersamaan.
  • Tipe atomik untuk primitif sederhanaAtomicUsize, AtomicBool jauh lebih ringan dari Mutex untuk counter dan flag.
  • Channel untuk komunikasi, bukan shared memorympsc::channel() adalah cara idiomatis Rust: kirim data melalui channel daripada berbagi pointer.
  • rayon untuk data parallelism — ganti .iter() dengan .par_iter() untuk memproses koleksi secara paralel dengan thread pool otomatis.
  • Send dan Sync dijaga compiler — compiler menolak kode yang mencoba mengirim Rc atau RefCell ke thread lain. Gunakan Arc dan Mutex/RwLock sebagai gantinya.
  • Hindari deadlock — kunci mutex dalam urutan yang konsisten di semua thread, atau gunakan try_lock() dengan fallback daripada lock() yang memblokir selamanya.

← Sebelumnya: Crates   Berikutnya: I/O →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact