Web Socket #
WebSocket adalah protokol yang menyediakan komunikasi dua arah (full-duplex) antara browser dan server melalui satu koneksi TCP yang persisten. Berbeda dari HTTP yang selalu client yang memulai request, di WebSocket server bisa mengirim data kapan saja tanpa menunggu client meminta — ini yang membuatnya cocok untuk chat, notifikasi real-time, live data feed, dan game multiplayer. Di Rust, crate yang paling banyak digunakan adalah tokio-tungstenite yang memberikan kontrol penuh atas protokol WebSocket di atas tokio, dan axum yang mengintegrasikan WebSocket langsung ke dalam HTTP server.
Cara Kerja WebSocket #
WebSocket dimulai sebagai HTTP request biasa, lalu di-upgrade ke protokol WebSocket melalui handshake khusus:
sequenceDiagram
participant B as Browser/Client
participant S as Server
B->>S: HTTP GET /ws\nUpgrade: websocket\nSec-WebSocket-Key: abc123
S->>B: HTTP 101 Switching Protocols\nUpgrade: websocket\nSec-WebSocket-Accept: xyz789
Note over B,S: Koneksi TCP tetap terbuka, protokol berganti ke WebSocket
B->>S: Frame: Text "Halo!"
S->>B: Frame: Text "Echo: Halo!"
S->>B: Frame: Text "Pesan push dari server"
B->>S: Frame: Ping
S->>B: Frame: Pong
B->>S: Frame: Close
S->>B: Frame: CloseSetelah handshake (HTTP 101), koneksi berubah menjadi WebSocket frame-based. Tidak ada lagi HTTP header per pesan — hanya frame kecil dengan overhead minimal.
Instalasi #
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.21"
futures-util = "0.3"
Echo Server Dasar #
Server paling sederhana — menerima pesan dan mengembalikannya persis sama:
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_tungstenite::{accept_async, tungstenite::Message};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080")
.await
.expect("Gagal bind ke port 8080");
println!("WebSocket server di ws://127.0.0.1:8080");
while let Ok((tcp_stream, addr)) = listener.accept().await {
println!("Koneksi TCP dari {}", addr);
tokio::spawn(async move {
tangani_koneksi(tcp_stream, addr).await;
});
}
}
async fn tangani_koneksi(
stream: tokio::net::TcpStream,
addr: std::net::SocketAddr,
) {
// Upgrade TCP → WebSocket
let ws_stream = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
eprintln!("[{}] Gagal upgrade WebSocket: {}", addr, e);
return;
}
};
println!("[{}] WebSocket terhubung", addr);
// Pisahkan menjadi sender dan receiver
let (mut ws_kirim, mut ws_terima) = ws_stream.split();
// Loop terima pesan dan echo kembali
while let Some(pesan) = ws_terima.next().await {
match pesan {
Ok(msg) => {
println!("[{}] Diterima: {:?}", addr, msg);
match msg {
Message::Text(teks) => {
let respons = Message::Text(format!("Echo: {}", teks).into());
if ws_kirim.send(respons).await.is_err() {
break;
}
}
Message::Binary(data) => {
// Echo binary apa adanya
if ws_kirim.send(Message::Binary(data)).await.is_err() {
break;
}
}
Message::Ping(data) => {
// Balas Ping dengan Pong — wajib untuk keep-alive
if ws_kirim.send(Message::Pong(data)).await.is_err() {
break;
}
}
Message::Close(frame) => {
println!("[{}] Client menutup koneksi: {:?}", addr, frame);
let _ = ws_kirim.send(Message::Close(None)).await;
break;
}
_ => {} // Message::Pong, Message::Frame — abaikan
}
}
Err(e) => {
eprintln!("[{}] Error: {}", addr, e);
break;
}
}
}
println!("[{}] Koneksi ditutup", addr);
}
Tipe Pesan WebSocket #
| Tipe | Kapan digunakan |
|---|---|
Message::Text(String) | Pesan teks — JSON, plain text |
Message::Binary(Vec<u8>) | Data biner — gambar, audio, protobuf |
Message::Ping(Vec<u8>) | Cek koneksi masih hidup |
Message::Pong(Vec<u8>) | Balasan Ping — balas segera |
Message::Close(Option<CloseFrame>) | Tutup koneksi dengan kode dan alasan |
Chat Room — Broadcast ke Semua Client #
Kasus penggunaan nyata WebSocket: satu client kirim pesan, semua client menerimanya. Ini membutuhkan shared state berisi semua koneksi aktif:
use futures_util::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex};
use tokio_tungstenite::{accept_async, tungstenite::Message};
// Setiap client punya ID unik dan channel untuk menerima pesan broadcast
type ClientId = u64;
type Pengirim = mpsc::UnboundedSender<Message>;
type DaftarClient = Arc<Mutex<HashMap<ClientId, Pengirim>>>;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
let klien: DaftarClient = Arc::new(Mutex::new(HashMap::new()));
let mut id_berikutnya: ClientId = 1;
println!("Chat server di ws://127.0.0.1:8080");
while let Ok((stream, addr)) = listener.accept().await {
let id = id_berikutnya;
id_berikutnya += 1;
let klien = Arc::clone(&klien);
tokio::spawn(async move {
tangani_chat(stream, addr, id, klien).await;
});
}
}
async fn tangani_chat(
stream: tokio::net::TcpStream,
addr: std::net::SocketAddr,
id: ClientId,
klien: DaftarClient,
) {
let ws = match accept_async(stream).await {
Ok(ws) => ws,
Err(_) => return,
};
// Channel untuk menerima pesan yang akan di-broadcast ke client ini
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
// Daftar client ini
klien.lock().await.insert(id, tx);
println!("Client {} ({}) terhubung. Total: {}", id, addr, klien.lock().await.len());
// Kirim pesan bergabung ke semua
broadcast(&klien, id, format!("Client {} bergabung", id)).await;
let (mut ws_kirim, mut ws_terima) = ws.split();
// Task untuk meneruskan pesan broadcast ke WebSocket client ini
let kirim_task = tokio::spawn(async move {
while let Some(pesan) = rx.recv().await {
if ws_kirim.send(pesan).await.is_err() {
break;
}
}
});
// Loop terima pesan dari client dan broadcast ke semua
while let Some(Ok(pesan)) = ws_terima.next().await {
if let Message::Text(teks) = pesan {
let teks_str = teks.to_string();
println!("Client {}: {}", id, teks_str);
let pesan_chat = format!("[Client {}]: {}", id, teks_str);
broadcast(&klien, 0, pesan_chat).await; // 0 = dari server (broadcast ke semua)
}
}
// Client disconnect — bersihkan
kirim_task.abort();
klien.lock().await.remove(&id);
println!("Client {} terputus. Sisa: {}", id, klien.lock().await.len());
broadcast(&klien, id, format!("Client {} keluar", id)).await;
}
// Kirim pesan ke semua client kecuali pengirimnya (jika kecuali_id != 0)
async fn broadcast(klien: &DaftarClient, kecuali_id: ClientId, pesan: String) {
let klien_terkunci = klien.lock().await;
for (&id, tx) in klien_terkunci.iter() {
if id != kecuali_id {
let _ = tx.send(Message::Text(pesan.clone().into()));
}
}
}
WebSocket Client #
Untuk testing atau komunikasi server-to-server, kamu bisa membuat WebSocket client dengan tokio-tungstenite:
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::main]
async fn main() {
let url = "ws://127.0.0.1:8080";
// Hubungkan ke server
let (ws_stream, respons) = connect_async(url)
.await
.expect("Gagal terhubung ke server WebSocket");
println!("Terhubung! Status HTTP: {}", respons.status());
let (mut kirim, mut terima) = ws_stream.split();
// Task penerima — tampilkan semua pesan dari server
let task_terima = tokio::spawn(async move {
while let Some(pesan) = terima.next().await {
match pesan {
Ok(Message::Text(teks)) => println!("← Server: {}", teks),
Ok(Message::Binary(data)) => println!("← Binary: {} byte", data.len()),
Ok(Message::Ping(_)) => println!("← Ping"),
Ok(Message::Close(_)) => {
println!("Server menutup koneksi");
break;
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
_ => {}
}
}
});
// Kirim beberapa pesan ke server
let pesan_list = ["Halo server!", "Ini pesan kedua", "Sampai jumpa"];
for pesan in &pesan_list {
println!("→ Kirim: {}", pesan);
kirim.send(Message::Text((*pesan).into())).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
// Tutup koneksi dengan bersih
kirim.send(Message::Close(None)).await.unwrap();
task_terima.await.unwrap();
}
Heartbeat — Ping/Pong untuk Keep-Alive #
Koneksi WebSocket bisa terputus tanpa notifikasi jika melewati proxy atau firewall yang menutup koneksi idle. Heartbeat berkala mencegah ini:
use futures_util::{SinkExt, StreamExt};
use tokio::time::{interval, Duration};
use tokio_tungstenite::{accept_async, tungstenite::Message};
async fn tangani_dengan_heartbeat(stream: tokio::net::TcpStream) {
let ws = accept_async(stream).await.unwrap();
let (mut kirim, mut terima) = ws.split();
let mut ping_interval = interval(Duration::from_secs(30));
let mut menunggu_pong = false;
loop {
tokio::select! {
// Terima pesan dari client
pesan = terima.next() => {
match pesan {
Some(Ok(Message::Pong(_))) => {
menunggu_pong = false; // Pong diterima — koneksi masih hidup
}
Some(Ok(Message::Text(t))) => {
let _ = kirim.send(Message::Text(format!("Echo: {}", t).into())).await;
}
Some(Ok(Message::Close(_))) | None => break,
_ => {}
}
}
// Kirim Ping setiap 30 detik
_ = ping_interval.tick() => {
if menunggu_pong {
// Pong tidak datang setelah Ping sebelumnya — putuskan koneksi
println!("Timeout: tidak ada Pong, putuskan koneksi");
break;
}
if kirim.send(Message::Ping(vec![])).await.is_err() {
break;
}
menunggu_pong = true;
}
}
}
}
WebSocket dengan Axum #
axum menyediakan integrasi WebSocket yang lebih ergonomis dibanding tokio-tungstenite langsung, termasuk routing HTTP dan WebSocket dalam satu server:
[dependencies]
axum = { version = "0.7", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
response::IntoResponse,
routing::get,
Router,
};
use futures_util::{SinkExt, StreamExt};
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/", get(|| async { "HTTP server aktif" }))
.route("/ws", get(handler_ws));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
println!("Server di http://127.0.0.1:3000");
println!("WebSocket di ws://127.0.0.1:3000/ws");
axum::serve(listener, app).await.unwrap();
}
// Axum otomatis menangani HTTP Upgrade handshake
async fn handler_ws(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(tangani_ws_axum)
}
async fn tangani_ws_axum(mut socket: WebSocket) {
// Kirim pesan selamat datang
if socket.send(Message::Text("Selamat datang!".into())).await.is_err() {
return;
}
// Loop pesan
while let Some(Ok(pesan)) = socket.recv().await {
match pesan {
Message::Text(teks) => {
println!("Diterima: {}", teks);
let balas = format!("Axum echo: {}", teks);
if socket.send(Message::Text(balas.into())).await.is_err() {
break;
}
}
Message::Close(_) => break,
_ => {}
}
}
}
WSS — WebSocket dengan TLS #
Untuk produksi, selalu gunakan wss:// (WebSocket over TLS). Cara termudah adalah menaruh reverse proxy seperti Nginx atau Caddy di depan server WebSocket dan biarkan proxy yang menangani TLS:
[Browser] ──wss://──→ [Nginx/Caddy: TLS termination] ──ws://──→ [Rust WS Server]
Konfigurasi Nginx untuk terminasi TLS WebSocket:
server {
listen 443 ssl;
server_name contoh.com;
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
location /ws {
proxy_pass http://127.0.0.1:8080;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 3600s; # jangan putus koneksi idle
}
}
Dengan pendekatan ini, kode Rust tidak perlu menangani sertifikat sama sekali.
Ringkasan #
- WebSocket dimulai sebagai HTTP request yang di-upgrade —
accept_async()daritokio-tungstenitemenangani seluruh proses handshake secara otomatis.- Selalu tangani semua tipe Message —
Text,Binary,Ping,Pong, danClose. Terutama Ping harus segera dibalas dengan Pong untuk menjaga koneksi.split()untuk baca dan tulis bersamaan — memisahkanWebSocketStreammenjadiSinkHalf(kirim) danStreamHalf(terima) yang bisa dioperasikan dari task berbeda.- Gunakan
mpsc::unbounded_channeluntuk broadcast — setiap client punya channel pribadi; untuk broadcast, iterasi semua sender dan kirim salinan pesan ke masing-masing.- Heartbeat Ping/Pong mencegah koneksi idle terputus — kirim Ping tiap 30 detik, putuskan jika tidak ada Pong yang kembali dalam interval berikutnya.
axumuntuk server HTTP + WebSocket terintegrasi — lebih ergonomis daritokio-tungstenitelangsung jika server sudah menggunakan axum untuk routing HTTP.- Untuk produksi, gunakan reverse proxy untuk TLS — Nginx atau Caddy menangani
wss://→ Rust server menerimaws://biasa. Lebih mudah dari mengurus sertifikat di kode Rust.tokio::select!untuk heartbeat bersamaan dengan baca pesan — memungkinkan menangani dua sumber event (channel timer dan WebSocket stream) secara bersamaan tanpa blocking.