Web Socket

Web Socket #

WebSocket adalah protokol yang menyediakan komunikasi dua arah (full-duplex) antara browser dan server melalui satu koneksi TCP yang persisten. Berbeda dari HTTP yang selalu client yang memulai request, di WebSocket server bisa mengirim data kapan saja tanpa menunggu client meminta — ini yang membuatnya cocok untuk chat, notifikasi real-time, live data feed, dan game multiplayer. Di Rust, crate yang paling banyak digunakan adalah tokio-tungstenite yang memberikan kontrol penuh atas protokol WebSocket di atas tokio, dan axum yang mengintegrasikan WebSocket langsung ke dalam HTTP server.

Cara Kerja WebSocket #

WebSocket dimulai sebagai HTTP request biasa, lalu di-upgrade ke protokol WebSocket melalui handshake khusus:

sequenceDiagram
    participant B as Browser/Client
    participant S as Server

    B->>S: HTTP GET /ws\nUpgrade: websocket\nSec-WebSocket-Key: abc123

    S->>B: HTTP 101 Switching Protocols\nUpgrade: websocket\nSec-WebSocket-Accept: xyz789

    Note over B,S: Koneksi TCP tetap terbuka, protokol berganti ke WebSocket

    B->>S: Frame: Text "Halo!"
    S->>B: Frame: Text "Echo: Halo!"
    S->>B: Frame: Text "Pesan push dari server"
    B->>S: Frame: Ping
    S->>B: Frame: Pong
    B->>S: Frame: Close
    S->>B: Frame: Close

Setelah handshake (HTTP 101), koneksi berubah menjadi WebSocket frame-based. Tidak ada lagi HTTP header per pesan — hanya frame kecil dengan overhead minimal.


Instalasi #

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.21"
futures-util = "0.3"

Echo Server Dasar #

Server paling sederhana — menerima pesan dan mengembalikannya persis sama:

use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_tungstenite::{accept_async, tungstenite::Message};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080")
        .await
        .expect("Gagal bind ke port 8080");
    println!("WebSocket server di ws://127.0.0.1:8080");

    while let Ok((tcp_stream, addr)) = listener.accept().await {
        println!("Koneksi TCP dari {}", addr);
        tokio::spawn(async move {
            tangani_koneksi(tcp_stream, addr).await;
        });
    }
}

async fn tangani_koneksi(
    stream: tokio::net::TcpStream,
    addr: std::net::SocketAddr,
) {
    // Upgrade TCP → WebSocket
    let ws_stream = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(e) => {
            eprintln!("[{}] Gagal upgrade WebSocket: {}", addr, e);
            return;
        }
    };
    println!("[{}] WebSocket terhubung", addr);

    // Pisahkan menjadi sender dan receiver
    let (mut ws_kirim, mut ws_terima) = ws_stream.split();

    // Loop terima pesan dan echo kembali
    while let Some(pesan) = ws_terima.next().await {
        match pesan {
            Ok(msg) => {
                println!("[{}] Diterima: {:?}", addr, msg);
                match msg {
                    Message::Text(teks) => {
                        let respons = Message::Text(format!("Echo: {}", teks).into());
                        if ws_kirim.send(respons).await.is_err() {
                            break;
                        }
                    }
                    Message::Binary(data) => {
                        // Echo binary apa adanya
                        if ws_kirim.send(Message::Binary(data)).await.is_err() {
                            break;
                        }
                    }
                    Message::Ping(data) => {
                        // Balas Ping dengan Pong — wajib untuk keep-alive
                        if ws_kirim.send(Message::Pong(data)).await.is_err() {
                            break;
                        }
                    }
                    Message::Close(frame) => {
                        println!("[{}] Client menutup koneksi: {:?}", addr, frame);
                        let _ = ws_kirim.send(Message::Close(None)).await;
                        break;
                    }
                    _ => {} // Message::Pong, Message::Frame — abaikan
                }
            }
            Err(e) => {
                eprintln!("[{}] Error: {}", addr, e);
                break;
            }
        }
    }

    println!("[{}] Koneksi ditutup", addr);
}

Tipe Pesan WebSocket #

TipeKapan digunakan
Message::Text(String)Pesan teks — JSON, plain text
Message::Binary(Vec<u8>)Data biner — gambar, audio, protobuf
Message::Ping(Vec<u8>)Cek koneksi masih hidup
Message::Pong(Vec<u8>)Balasan Ping — balas segera
Message::Close(Option<CloseFrame>)Tutup koneksi dengan kode dan alasan

Chat Room — Broadcast ke Semua Client #

Kasus penggunaan nyata WebSocket: satu client kirim pesan, semua client menerimanya. Ini membutuhkan shared state berisi semua koneksi aktif:

use futures_util::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex};
use tokio_tungstenite::{accept_async, tungstenite::Message};

// Setiap client punya ID unik dan channel untuk menerima pesan broadcast
type ClientId = u64;
type Pengirim = mpsc::UnboundedSender<Message>;
type DaftarClient = Arc<Mutex<HashMap<ClientId, Pengirim>>>;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let klien: DaftarClient = Arc::new(Mutex::new(HashMap::new()));
    let mut id_berikutnya: ClientId = 1;

    println!("Chat server di ws://127.0.0.1:8080");

    while let Ok((stream, addr)) = listener.accept().await {
        let id = id_berikutnya;
        id_berikutnya += 1;
        let klien = Arc::clone(&klien);

        tokio::spawn(async move {
            tangani_chat(stream, addr, id, klien).await;
        });
    }
}

async fn tangani_chat(
    stream: tokio::net::TcpStream,
    addr: std::net::SocketAddr,
    id: ClientId,
    klien: DaftarClient,
) {
    let ws = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(_) => return,
    };

    // Channel untuk menerima pesan yang akan di-broadcast ke client ini
    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

    // Daftar client ini
    klien.lock().await.insert(id, tx);
    println!("Client {} ({}) terhubung. Total: {}", id, addr, klien.lock().await.len());

    // Kirim pesan bergabung ke semua
    broadcast(&klien, id, format!("Client {} bergabung", id)).await;

    let (mut ws_kirim, mut ws_terima) = ws.split();

    // Task untuk meneruskan pesan broadcast ke WebSocket client ini
    let kirim_task = tokio::spawn(async move {
        while let Some(pesan) = rx.recv().await {
            if ws_kirim.send(pesan).await.is_err() {
                break;
            }
        }
    });

    // Loop terima pesan dari client dan broadcast ke semua
    while let Some(Ok(pesan)) = ws_terima.next().await {
        if let Message::Text(teks) = pesan {
            let teks_str = teks.to_string();
            println!("Client {}: {}", id, teks_str);
            let pesan_chat = format!("[Client {}]: {}", id, teks_str);
            broadcast(&klien, 0, pesan_chat).await; // 0 = dari server (broadcast ke semua)
        }
    }

    // Client disconnect — bersihkan
    kirim_task.abort();
    klien.lock().await.remove(&id);
    println!("Client {} terputus. Sisa: {}", id, klien.lock().await.len());
    broadcast(&klien, id, format!("Client {} keluar", id)).await;
}

// Kirim pesan ke semua client kecuali pengirimnya (jika kecuali_id != 0)
async fn broadcast(klien: &DaftarClient, kecuali_id: ClientId, pesan: String) {
    let klien_terkunci = klien.lock().await;
    for (&id, tx) in klien_terkunci.iter() {
        if id != kecuali_id {
            let _ = tx.send(Message::Text(pesan.clone().into()));
        }
    }
}

WebSocket Client #

Untuk testing atau komunikasi server-to-server, kamu bisa membuat WebSocket client dengan tokio-tungstenite:

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() {
    let url = "ws://127.0.0.1:8080";

    // Hubungkan ke server
    let (ws_stream, respons) = connect_async(url)
        .await
        .expect("Gagal terhubung ke server WebSocket");

    println!("Terhubung! Status HTTP: {}", respons.status());

    let (mut kirim, mut terima) = ws_stream.split();

    // Task penerima — tampilkan semua pesan dari server
    let task_terima = tokio::spawn(async move {
        while let Some(pesan) = terima.next().await {
            match pesan {
                Ok(Message::Text(teks)) => println!("← Server: {}", teks),
                Ok(Message::Binary(data)) => println!("← Binary: {} byte", data.len()),
                Ok(Message::Ping(_)) => println!("← Ping"),
                Ok(Message::Close(_)) => {
                    println!("Server menutup koneksi");
                    break;
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    break;
                }
                _ => {}
            }
        }
    });

    // Kirim beberapa pesan ke server
    let pesan_list = ["Halo server!", "Ini pesan kedua", "Sampai jumpa"];
    for pesan in &pesan_list {
        println!("→ Kirim: {}", pesan);
        kirim.send(Message::Text((*pesan).into())).await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
    }

    // Tutup koneksi dengan bersih
    kirim.send(Message::Close(None)).await.unwrap();
    task_terima.await.unwrap();
}

Heartbeat — Ping/Pong untuk Keep-Alive #

Koneksi WebSocket bisa terputus tanpa notifikasi jika melewati proxy atau firewall yang menutup koneksi idle. Heartbeat berkala mencegah ini:

use futures_util::{SinkExt, StreamExt};
use tokio::time::{interval, Duration};
use tokio_tungstenite::{accept_async, tungstenite::Message};

async fn tangani_dengan_heartbeat(stream: tokio::net::TcpStream) {
    let ws = accept_async(stream).await.unwrap();
    let (mut kirim, mut terima) = ws.split();

    let mut ping_interval = interval(Duration::from_secs(30));
    let mut menunggu_pong = false;

    loop {
        tokio::select! {
            // Terima pesan dari client
            pesan = terima.next() => {
                match pesan {
                    Some(Ok(Message::Pong(_))) => {
                        menunggu_pong = false;  // Pong diterima — koneksi masih hidup
                    }
                    Some(Ok(Message::Text(t))) => {
                        let _ = kirim.send(Message::Text(format!("Echo: {}", t).into())).await;
                    }
                    Some(Ok(Message::Close(_))) | None => break,
                    _ => {}
                }
            }

            // Kirim Ping setiap 30 detik
            _ = ping_interval.tick() => {
                if menunggu_pong {
                    // Pong tidak datang setelah Ping sebelumnya — putuskan koneksi
                    println!("Timeout: tidak ada Pong, putuskan koneksi");
                    break;
                }
                if kirim.send(Message::Ping(vec![])).await.is_err() {
                    break;
                }
                menunggu_pong = true;
            }
        }
    }
}

WebSocket dengan Axum #

axum menyediakan integrasi WebSocket yang lebih ergonomis dibanding tokio-tungstenite langsung, termasuk routing HTTP dan WebSocket dalam satu server:

[dependencies]
axum = { version = "0.7", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
    Router,
};
use futures_util::{SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/", get(|| async { "HTTP server aktif" }))
        .route("/ws", get(handler_ws));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    println!("Server di http://127.0.0.1:3000");
    println!("WebSocket di ws://127.0.0.1:3000/ws");

    axum::serve(listener, app).await.unwrap();
}

// Axum otomatis menangani HTTP Upgrade handshake
async fn handler_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(tangani_ws_axum)
}

async fn tangani_ws_axum(mut socket: WebSocket) {
    // Kirim pesan selamat datang
    if socket.send(Message::Text("Selamat datang!".into())).await.is_err() {
        return;
    }

    // Loop pesan
    while let Some(Ok(pesan)) = socket.recv().await {
        match pesan {
            Message::Text(teks) => {
                println!("Diterima: {}", teks);
                let balas = format!("Axum echo: {}", teks);
                if socket.send(Message::Text(balas.into())).await.is_err() {
                    break;
                }
            }
            Message::Close(_) => break,
            _ => {}
        }
    }
}

WSS — WebSocket dengan TLS #

Untuk produksi, selalu gunakan wss:// (WebSocket over TLS). Cara termudah adalah menaruh reverse proxy seperti Nginx atau Caddy di depan server WebSocket dan biarkan proxy yang menangani TLS:

[Browser] ──wss://──→ [Nginx/Caddy: TLS termination] ──ws://──→ [Rust WS Server]

Konfigurasi Nginx untuk terminasi TLS WebSocket:

server {
    listen 443 ssl;
    server_name contoh.com;

    ssl_certificate     /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;

    location /ws {
        proxy_pass         http://127.0.0.1:8080;
        proxy_http_version 1.1;
        proxy_set_header   Upgrade $http_upgrade;
        proxy_set_header   Connection "upgrade";
        proxy_set_header   Host $host;
        proxy_read_timeout 3600s;  # jangan putus koneksi idle
    }
}

Dengan pendekatan ini, kode Rust tidak perlu menangani sertifikat sama sekali.


Ringkasan #

  • WebSocket dimulai sebagai HTTP request yang di-upgrade — accept_async() dari tokio-tungstenite menangani seluruh proses handshake secara otomatis.
  • Selalu tangani semua tipe MessageText, Binary, Ping, Pong, dan Close. Terutama Ping harus segera dibalas dengan Pong untuk menjaga koneksi.
  • split() untuk baca dan tulis bersamaan — memisahkan WebSocketStream menjadi SinkHalf (kirim) dan StreamHalf (terima) yang bisa dioperasikan dari task berbeda.
  • Gunakan mpsc::unbounded_channel untuk broadcast — setiap client punya channel pribadi; untuk broadcast, iterasi semua sender dan kirim salinan pesan ke masing-masing.
  • Heartbeat Ping/Pong mencegah koneksi idle terputus — kirim Ping tiap 30 detik, putuskan jika tidak ada Pong yang kembali dalam interval berikutnya.
  • axum untuk server HTTP + WebSocket terintegrasi — lebih ergonomis dari tokio-tungstenite langsung jika server sudah menggunakan axum untuk routing HTTP.
  • Untuk produksi, gunakan reverse proxy untuk TLS — Nginx atau Caddy menangani wss:// → Rust server menerima ws:// biasa. Lebih mudah dari mengurus sertifikat di kode Rust.
  • tokio::select! untuk heartbeat bersamaan dengan baca pesan — memungkinkan menangani dua sumber event (channel timer dan WebSocket stream) secara bersamaan tanpa blocking.

← Sebelumnya: Socket   Berikutnya: Web Server →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact