Tutorial Uji MQTT Stressing Menggunakan Mosquitto Broker di Ubuntu VM dan Script Python di Windows

ABSTRAK

    Penelitian ini membahas analisis kinerja protokol Message Queuing Telemetry Transport (MQTT) dengan fokus pada pengaruh variasi struktur topik terhadap performa sistem dalam jaringan Internet of Things (IoT). MQTT dipilih karena karakteristiknya yang ringan dan efisien dalam komunikasi data antar perangkat dengan keterbatasan sumber daya. Pengujian dilakukan menggunakan arsitektur publisher-subscriber yang dijalankan pada mesin virtual (VM) berbasis Ubuntu dengan broker Mosquitto sebagai server utama. Lingkungan pengujian telah dikonfigurasi sebelumnya dengan instalasi broker, pustaka Python, serta dependensi yang diperlukan. Dalam eksperimen ini, dilakukan variasi tingkat kompleksitas topik, mulai dari struktur sederhana hingga bersarang dengan beberapa level hierarki. Dua skenario pengujian digunakan, yaitu publisher tunggal dan publisher multiproses, untuk mengukur pengaruh jumlah klien dan panjang topik terhadap parameter performa. Metrik yang dianalisis meliputi bandwidth, throughput, latensi rata-rata, serta persentase packet loss yang diukur secara otomatis dan disimpan dalam format CSV untuk analisis lanjutan.

PENDAHULUAN

    Penelitian ini terdiri atas dua tahap utama, tahap pertama bertujuan untuk mengukur kemampuan perangkat (dalam hal ini laptop penulis) dalam menangani pertumbuhan jumlah klien secara dinamis. Pada tahap ini, sistem akan menjalankan skrip publisher yang terus menambah jumlah klien secara paralel, sambil memantau kondisi CPU dan RAM. Pengujian ini menghasilkan batas maksimum jumlah klien yang masih dapat diproses oleh perangkat tanpa menyebabkan beban sistem berlebih. Tahap kedua difokuskan pada analisis pengaruh variasi panjang topik MQTT terhadap performa komunikasi. Empat metrik utama yang diamati meliputi bandwidth (bytes/s), throughput (msg/s), latensi (s), dan packet loss (%). Pengujian dilakukan dengan beberapa tingkat jumlah klien, yaitu 100, 200, 300, dan 500, serta empat variasi panjang topik sebagai berikut:

  1. Topik pendek dengan 5 karakter,

  2. Topik sedang dengan 20 karakter,

  3. Topik menengah dengan 50 karakter, dan

  4. Topik panjang dengan 100 karakter.

    Seluruh eksperimen dilakukan dalam lingkungan Virtual Machine (VM) berbasis Ubuntu, di mana broker Eclipse Mosquitto telah terpasang dan dikonfigurasi sebelumnya. Selain itu, pustaka Python yang diperlukan, termasuk paho-mqtt, telah diinstal bersama dengan modul pendukung lain untuk keperluan pemantauan sumber daya dan pencatatan hasil pengukuran. Hasil pengujian dari kedua tahap ini disimpan secara otomatis dalam format CSV untuk memudahkan analisis kuantitatif lebih lanjut. Penelitian ini diharapkan dapat memberikan wawasan empiris mengenai hubungan antara desain topik MQTT, jumlah klien aktif, dan performa komunikasi sistem. Dengan demikian, hasilnya dapat menjadi acuan dalam perancangan sistem IoT yang efisien dan skalabel.

PRAKTIK TEKNIS

Langkah 1. Pengecekan Koneksi Jaringan antar Sistem Operasi

Sebelum menjalankan pengujian MQTT, hal pertama yang perlu dipastikan adalah bahwa kedua sistem operasi, yaitu Windows (host) dan Ubuntu (guest pada Virtual Machine), berada dalam jaringan yang sama. Kondisi ini sangat penting karena komunikasi antara publisher, subscriber, dan broker hanya dapat terjadi apabila perangkat-perangkat tersebut dapat saling berinteraksi melalui alamat IP yang dapat dijangkau.

Dalam penelitian ini, pengujian dilakukan secara lokal (local area network) dengan konfigurasi jaringan berbasis bridged adapter. Mode ini memungkinkan mesin virtual (Ubuntu) untuk memperoleh alamat IP yang sama seperti perangkat fisik (Windows), sehingga keduanya terhubung langsung ke jaringan lokal yang sama.

1. Mengecek Alamat IP pada Sistem Windows

Langkah pertama dilakukan pada sistem operasi Windows untuk mengetahui alamat IPv4 dari host.
Buka Command Prompt (CMD) dan jalankan perintah berikut:

ipconfig

Hasil eksekusi akan menampilkan konfigurasi jaringan aktif. Cari bagian dengan deskripsi sesuai adaptor jaringan yang digunakan, pada kasus ini adaptor yang digunakan adalah Intel(R) Wireless-AC 9560. Catat nilai dari IPv4 Address, misalnya:

IPv4 Address. . . . . . . . . . . : 192.168.18.198

Alamat ini akan digunakan sebagai acuan untuk memastikan koneksi jaringan dan konfigurasi broker MQTT pada sistem Ubuntu.

2. Mengecek Alamat IP pada Sistem Ubuntu (Virtual Machine)

Selanjutnya, lakukan pengecekan pada Ubuntu yang berjalan di dalam Virtual Machine. Buka terminal dan jalankan perintah:

hostname -I

Perintah ini akan menampilkan alamat IP dari mesin Ubuntu. Pastikan alamat yang muncul memiliki prefix jaringan yang sama dengan IP Windows. Sebagai contoh, jika Windows memiliki alamat 192.168.18.198, maka Ubuntu seharusnya berada pada subnet yang sama, misalnya 192.168.18.200.

3. Mengatur Mode Jaringan ke Bridged Adapter

Apabila alamat IP pada Ubuntu tidak berada dalam subnet yang sama dengan Windows, maka perlu dilakukan penyesuaian pada pengaturan Virtual Machine.
Buka pengaturan jaringan pada VM (VirtualBox atau VMware), kemudian ubah mode jaringan menjadi Bridged Adapter, dan pastikan adaptor fisik yang dipilih sesuai dengan perangkat yang digunakan pada host (misalnya Intel(R) Wireless-AC 9560).

4. Menjalankan broker mosquitto pada VM Ubuntu

Setelah dipastikan kedua OS ini berjalan pada satu jaringan yang sama, anda dapat menjalankan broker mosquitto yang telah tetinstall sebelumnya dengan menggunakan perintah:

sudo mosquitto -c /etc/mosquitto/mosquitto.conf -v

Langkah 2. Pemeriksaan File Konfigurasi (config.py)

Sebelum menjalankan skrip pengujian, langkah berikutnya adalah memastikan bahwa seluruh parameter dasar sistem telah dikonfigurasi dengan benar melalui file config.py. File ini berperan sebagai pusat pengaturan utama yang digunakan oleh semua modul lain, baik publisher, subscriber, maupun metrics analyzer. Dengan demikian, kesalahan pada konfigurasi awal dapat memengaruhi keseluruhan proses pengujian.

Berikut adalah isi lengkap file konfigurasi:

import json
import time
import psutil

# Konfigurasi MQTT Broker
BROKER_HOST = "192.168.18.198"
BROKER_PORT = 1883

# Konfigurasi Topik
# TOPIC = "XXXXX"
# TOPIC = "XXXXX/YYYYY"
# TOPIC = "XXXXX/YYYYY/ZZZZZ/XXXXX"
# TOPIC = "XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX"
# TOPIC = "XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY/ZZZZZ/XXXXX/YYYYY"

# Monitoring Resource Sistem


def get_system_status():
    cpu_percent = psutil.cpu_percent(interval=0.5)
    mem = psutil.virtual_memory()
    return {
        "cpu_percent": cpu_percent,
        "ram_percent": mem.percent,
        "available_ram_mb": int(mem.available / (1024 * 1024))
    }


def is_resource_available(cpu_threshold=80.0, ram_threshold=80.0):
    status = get_system_status()
    return (status["cpu_percent"] < cpu_threshold and status["ram_percent"] < ram_threshold)

# Fungsi untuk generate payload


def generate_payload(msg_id: int, client_id: int, fixed_size: int = 200):
    payload = {
        "client_id": f"{client_id}",
        "id_messege": f"{msg_id:04d}",
        "timestamp": time.time(),
        "data": "X" * fixed_size
    }
    return json.dumps(payload, ensure_ascii=False)

Penjelasan Kode:

  1. Inisialisasi Library
    Program menggunakan tiga pustaka utama:

    • json untuk melakukan serialisasi data ke format JSON sebelum dikirim melalui broker MQTT,

    • time untuk mencatat waktu pengiriman pesan, dan

    • psutil untuk memantau status sumber daya sistem seperti CPU dan RAM.

  2. Konfigurasi Broker MQTT
    Dua parameter berikut digunakan untuk mengatur koneksi ke broker:

    BROKER_HOST = "192.168.18.198"
    BROKER_PORT = 1883

    Nilai BROKER_HOST diisi dengan alamat IP dari sistem Ubuntu tempat broker Mosquitto dijalankan, sedangkan BROKER_PORT menggunakan port default MQTT, yaitu 1883.

  3. Konfigurasi Topik
    Bagian ini berisi beberapa contoh bentuk hierarki topik yang digunakan untuk pengujian. Variasi panjang topik diatur berdasarkan jumlah karakter yang berbeda, yakni 5, 10, 20, 50, dan 100 karakter. Setiap struktur topik mewakili tingkat kompleksitas berbeda yang akan diuji pengaruhnya terhadap performa komunikasi.

    Contoh bentuk topik yang digunakan:

    • TOPIC = "XXXXX" topik sepanjang 5 karakter

    • TOPIC = "XXXXXXXXXX" topik sepanjang 10 karakter

    • TOPIC = "XXXXXXXXXXXXXXXXXXXX" topik sepanjang 20 karakter

    • TOPIC = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" topik sepanjang 50 karakter

    • TOPIC = "X" * 100 → topik sepanjang 100 karakter

    Dengan variasi ini, sistem dapat mengamati bagaimana panjang topik memengaruhi latensi, throughput, bandwidth, dan packet loss selama proses publish–subscribe.

  4. Pemantauan Sumber Daya Sistem
    Fungsi berikut digunakan untuk mendapatkan status CPU dan RAM secara real-time:

    def get_system_status():
        cpu_percent = psutil.cpu_percent(interval=0.5)
        mem = psutil.virtual_memory()
        return {
            "cpu_percent": cpu_percent,
            "ram_percent": mem.percent,
            "available_ram_mb": int(mem.available / (1024 * 1024))
        }

    Fungsi ini mengembalikan tiga parameter:

    • cpu_percent: persentase penggunaan CPU dalam interval 0,5 detik,

    • ram_percent: persentase penggunaan RAM,

    • available_ram_mb: kapasitas RAM yang masih tersedia dalam satuan megabyte.

    Selanjutnya, fungsi is_resource_available() digunakan untuk menentukan apakah sistem masih memiliki sumber daya yang cukup untuk menambah klien baru. Fungsi ini akan mengembalikan nilai True jika pemakaian CPU dan RAM berada di bawah ambang batas yang ditentukan (default: 80%).

  5. Fungsi generate_payload()
    Fungsi ini membentuk payload pesan yang akan dikirim oleh setiap publisher. Struktur data dikonversi ke format JSON sebelum dikirim ke broker.

    def generate_payload(msg_id: int, client_id: int, fixed_size: int = 200):
        payload = {
            "client_id": f"{client_id}",
            "id_messege": f"{msg_id:04d}",
            "timestamp": time.time(),
            "data": "X" * fixed_size
        }
        return json.dumps(payload, ensure_ascii=False)

    Elemen yang dikirim mencakup:

    • client_id: identitas klien pengirim,

    • id_messege: nomor pesan yang diurutkan secara bertahap,

    • timestamp: waktu pengiriman pesan (dalam detik sejak epoch),

    • data: konten pesan dengan ukuran tetap (200 karakter).

    Dengan struktur ini, setiap pesan yang dikirim memiliki ukuran yang seragam, sehingga hasil pengujian dapat difokuskan pada perbedaan performa akibat panjang topik, bukan ukuran payload.


Langkah 3: Menjalankan Script Growth Client Testing

Setelah memastikan file konfigurasi telah disesuaikan dengan alamat broker dan topik MQTT yang digunakan, langkah berikutnya adalah menjalankan script pengujian pertumbuhan klien (growth client testing).
Pengujian ini bertujuan untuk mengetahui kemampuan perangkat (laptop) dalam menangani jumlah klien yang terus meningkat secara paralel, sehingga dapat diketahui kapasitas maksimum sistem sebelum mengalami degradasi performa.

Perintah Eksekusi

Program dapat dijalankan menggunakan perintah berikut di terminal atau command prompt:

python -m core.subscriber

Perintah tersebut memanggil modul subscriber di dalam folder core, yang akan berfungsi sebagai subscriber utama untuk menerima pesan dari berbagai klien publisher yang dijalankan secara simultan.

Berikut adalah source code lengkap beserta fungsi utamanya:

import os
import csv
import json
import time
import paho.mqtt.client as mqtt
import matplotlib.pyplot as plt
from config.config import BROKER_HOST, BROKER_PORT, TOPIC

# Path hasil logging
RESULT_DIR = "result/sample1"
LOG_FILE = os.path.join(RESULT_DIR, "logs.csv")
PLOT_FILE = os.path.join(RESULT_DIR, "plot.png")

os.makedirs(RESULT_DIR, exist_ok=True)

# Buat file CSV dengan header hanya sekali
if not os.path.exists(LOG_FILE):
    with open(LOG_FILE, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Timestamp", "Client ID",
                        "Msg ID", "Data Len", "Counter"])

# Global counter
msg_counter = 0


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("[INFO] Subscriber berhasil terhubung ke broker.")
        client.subscribe(TOPIC)
        print(f"[INFO] Subscribed ke topik: {TOPIC}")
    else:
        print(f"[ERROR] Gagal koneksi ke broker. Kode: {rc}")


def on_message(client, userdata, msg):
    global msg_counter
    try:
        payload = msg.payload.decode("utf-8")
        data = json.loads(payload)

        timestamp = data.get("timestamp", time.time())
        client_id = data.get("client_id", "unknown")
        msg_id = data.get("id_messege", "----")
        data_length = len(data.get("data", ""))

        msg_counter += 1  # increment counter

        # Tulis sesuai urutan header
        row = [timestamp, client_id, msg_id, data_length, msg_counter]

        with open(LOG_FILE, mode="a", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(row)

        print(
            f"[OK] Pesan diterima -> Client: {client_id}, MsgID: {msg_id}, Counter: {msg_counter}")

    except Exception as e:
        print(f"[ERROR] Gagal parsing pesan: {e}")


def generate_plot():
    client_ids, counters = [], []
    try:
        with open(LOG_FILE, mode="r") as f:
            reader = csv.DictReader(f)
            for row in reader:
                client_ids.append(int(row["Client ID"]))
                counters.append(int(row["Counter"]))

        if not client_ids:
            print("[WARN] Tidak ada data untuk divisualisasikan.")
            return

        plt.figure(figsize=(10, 6))
        plt.plot(client_ids, counters, marker="o", linestyle="-", color="g")
        plt.title("Jumlah Pesan Diterima per Client ID")
        plt.xlabel("Client ID")
        plt.ylabel("Jumlah Pesan (Counter)")
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(PLOT_FILE)
        plt.close()

        print(f"[INFO] Grafik berhasil disimpan ke {PLOT_FILE}")

    except Exception as e:
        print(f"[ERROR] Gagal membuat grafik: {e}")


def main():
    client = mqtt.Client(client_id="subscriber_1")
    client.on_connect = on_connect
    client.on_message = on_message

    print("[INFO] Menghubungkan subscriber...")
    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

    try:
        client.loop_forever()
    except KeyboardInterrupt:
        print("\n[STOP] Subscriber dihentikan manual.")
        generate_plot()   # tetap buat grafik saat keluar
    finally:
        client.disconnect()
        print("[INFO] Koneksi ke broker ditutup.")


if __name__ == "__main__":
    main()

Penjelasan Fungsi Utama

  1. Inisialisasi Broker dan Direktori Hasil

    • Script akan membuat folder hasil (result/sample1) untuk menyimpan file log (logs.csv) dan grafik (plot.png).

  2. Fungsi on_connect()

    • Menjalin koneksi dengan broker MQTT dan melakukan subscribe ke topik yang telah ditentukan di file config/config.py.

  3. Fungsi on_message()

    • Menerima dan mendekode setiap pesan yang dikirim oleh klien publisher.

    • Data seperti timestamp, client_id, msg_id, dan panjang data (data_len) disimpan dalam file CSV agar dapat dianalisis lebih lanjut.

  4. Fungsi generate_plot()

    • Membuat grafik hubungan antara Client ID dan jumlah pesan yang diterima (Counter) untuk menggambarkan performa sistem dalam menerima pesan dari banyak klien secara paralel.

  5. Fungsi main()

    • Mengatur jalannya seluruh proses, mulai dari koneksi ke broker, menerima pesan, hingga menghentikan program dan menghasilkan grafik visualisasi.

 Langkah 4: Menjalankan Script Publisher1

    Setelah script subscriber dijalankan dan siap menerima pesan dari berbagai klien, tahap selanjutnya adalah menjalankan script publisher yang berfungsi mengirimkan data secara dinamis dari sejumlah klien yang terus bertambah. Tujuan pengujian ini adalah untuk mengetahui kemampuan perangkat (laptop) dalam menangani pertumbuhan jumlah klien pengirim pesan secara bersamaan hingga mencapai ambang batas penggunaan sumber daya sistem (CPU dan RAM).

Perintah Eksekusi

Program dijalankan menggunakan perintah berikut:

python -m core.publisher

Perintah ini akan menjalankan modul publisher yang berfungsi sebagai pengirim pesan dinamis (dynamic publisher manager). Prosesnya akan berjalan paralel menggunakan beberapa proses Python melalui modul multiprocessing. Berikut adalah source code yang digunakan:

import multiprocessing
import time
from config.config import BROKER_HOST, BROKER_PORT, TOPIC, generate_payload, get_system_status
import paho.mqtt.client as mqtt
from multiprocessing import Value, Event


# Fungsi dasar: publisher client
def publisher_client(client_id: int, global_msg_id, trigger_event: Event, stop_event: Event): # type: ignore
    client = mqtt.Client(client_id=f"pub_{client_id}")
    client.connect(BROKER_HOST, BROKER_PORT)
    client.loop_start()

    print(f"[READY] Publisher {client_id} siap menerima trigger.")
    try:
        while not stop_event.is_set():
            # Tunggu sinyal dari manager untuk kirim pesan
            if trigger_event.wait(timeout=1.0):
                # Dapat trigger → kirim pesan
                with global_msg_id.get_lock():
                    global_msg_id.value += 1
                    msg_id = global_msg_id.value

                payload = generate_payload(msg_id, client_id)
                client.publish(TOPIC, payload)
                print(
                    f"[SEND] Publisher {client_id} mengirim pesan ID {msg_id}")

                # Tunggu trigger berikutnya
                trigger_event.clear()

    except KeyboardInterrupt:
        print(f"[STOP] Publisher {client_id} dihentikan manual.")
    finally:
        client.loop_stop()
        client.disconnect()
        print(f"[EXIT] Publisher {client_id} selesai.")


# Manager: menambah publisher dan memicu pengiriman bersama
def run_dynamic_publishers(global_msg_id, cpu_threshold=95.0, ram_threshold=95.0, spawn_interval=3.0):
    publisher_processes = []
    client_count = 0

    trigger_event = multiprocessing.Event()
    stop_event = multiprocessing.Event()

    print("[INFO] Memulai dynamic publisher manager (logika absen bebas).")
    print(f"[INFO] Broker: {BROKER_HOST}:{BROKER_PORT}, Topik: {TOPIC}")

    try:
        while True:
            # Ambil status sistem
            sys_status = get_system_status()
            cpu_percent = sys_status['cpu_percent']
            ram_percent = sys_status['ram_percent']
            print(f"[DEBUG] CPU: {cpu_percent}%, RAM: {ram_percent}%")

            # Jika salah satu melebihi threshold → hentikan semua
            if cpu_percent >= cpu_threshold or ram_percent >= ram_threshold:
                print(
                    f"[STOP] CPU/RAM melebihi threshold! CPU={cpu_percent}%, RAM={ram_percent}%")
                stop_event.set()
                break

            # Tambah publisher baru tanpa batas
            client_count += 1
            p = multiprocessing.Process(
                target=publisher_client,
                args=(client_count, global_msg_id, trigger_event, stop_event)
            )
            p.daemon = True
            p.start()
            publisher_processes.append(p)
            print(
                f"[OK] Publisher ke-{client_count} ditambahkan (total aktif: {len(publisher_processes)})")

            # Setelah menambah publisher baru → trigger semua publisher untuk kirim pesan sekali
            print("[TRIGGER] Semua publisher aktif mengirim 1 pesan.")
            trigger_event.set()

            time.sleep(spawn_interval)

    except KeyboardInterrupt:
        print("\n[STOP] Pengujian dihentikan manual oleh user.")
        stop_event.set()

    finally:
        for p in publisher_processes:
            if p.is_alive():
                p.terminate()
        print("[INFO] Semua publisher dihentikan.")


# Entry point
if __name__ == "__main__":
    multiprocessing.freeze_support()
    global_msg_id = Value('i', 0)
    run_dynamic_publishers(global_msg_id, cpu_threshold=95,
                           ram_threshold=95, spawn_interval=3.0)
Penjelasan fungsi:
  1. publisher_client()
    Fungsi ini mendefinisikan satu klien publisher MQTT.

    • Setiap klien memiliki client_id unik (pub_1, pub_2, dst).

    • Publisher akan terus menunggu trigger dari fungsi manajer (run_dynamic_publishers) untuk mengirim pesan.

    • Payload dikirim dalam format JSON yang dihasilkan dari fungsi generate_payload() di file config.py.

    • Setelah setiap pengiriman, klien menunggu sinyal berikutnya.

  2. run_dynamic_publishers()
    Fungsi ini bertugas menambah jumlah klien publisher secara bertahap (dynamic spawning).

    • Setiap interval (default 3 detik), satu klien baru dibuat dan dijalankan.

    • Setelah setiap penambahan, semua klien yang aktif mengirimkan pesan serentak melalui trigger_event.

    • Pemantauan CPU dan RAM dilakukan secara real-time.

    • Jika penggunaan CPU atau RAM melebihi 95%, semua proses dihentikan otomatis.

  3. get_system_status() (dari config)
    Fungsi ini memantau penggunaan sumber daya perangkat (CPU dan RAM) untuk memastikan beban sistem masih dalam batas aman.

  4. global_msg_id dan Sinkronisasi

    • Value('i', 0) digunakan untuk menyimpan ID pesan global yang aman digunakan oleh banyak proses.

    • Sinkronisasi dilakukan dengan get_lock() agar tidak terjadi race condition antar proses publisher.

  5. Parameter Penting

    • cpu_threshold dan ram_threshold: batas aman maksimal penggunaan sumber daya sebelum semua publisher dihentikan.

    • spawn_interval: jeda waktu antar penambahan klien publisher (default 3 detik).

Pengujian ini menghasilkan data:

  • Jumlah maksimum klien publisher yang dapat aktif sebelum sistem mencapai batas sumber daya (CPU/RAM 95%).

  • Performa sistem dalam menanggapi pertumbuhan klien yang dinamis.

  • Stabilitas koneksi antar klien publisher dan broker MQTT.

Data dari pengujian ini akan menjadi dasar penentuan jumlah klien uji (100, 200, 300, 500) yang digunakan pada tahap selanjutnya yaitu pengujian pengaruh variasi topik MQTT terhadap metrik performa bandwidth, latency, throughput, dan efisiensi transmisi data.


Langkah 5: Menjalankan Script Analisis Metrik MQTT

Tahap ini bertujuan untuk mengukur pengaruh variasi panjang topik (topic length) terhadap kinerja protokol Message Queuing Telemetry Transport (MQTT) pada jaringan Internet of Things (IoT).
Pengujian difokuskan pada empat parameter performa utama:

  1. Bandwidth, total kapasitas data yang ditransfer per detik.

  2. Throughput, jumlah pesan yang diterima per detik.

  3. Latency, waktu tunda dari pengiriman hingga penerimaan pesan.

  4. Packet Loss, jumlah pesan yang hilang selama proses transmisi.

Pengujian dilakukan untuk lima variasi panjang topik, masing-masing dengan jumlah karakter (5, 10, 20, 50, dan 100 karakter). Setiap variasi topik diuji pada empat level jumlah klien publisher yaitu (100, 200, 300, dan 500 klien), dengan konfigurasi broker, port, dan payload yang sama seperti tahap sebelumnya.

Pengambilan metrik dijalankan melalui perintah berikut:

python -m analysis.metrics

Perintah ini mengeksekusi modul metrics.py yang berfungsi sebagai subscriber analisis performa MQTT, mencatat seluruh data pesan yang diterima dari broker untuk kemudian menghitung seluruh metrik performa. Berikut source code lengkap:

import paho.mqtt.client as mqtt
import time
import json
import csv
import os
from datetime import datetime

from config.config import BROKER_HOST, BROKER_PORT, TOPIC

# Parameter tetap
NUM_MESSAGES_PER_CLIENT = 10

# Variabel global
message_count = 0
byte_count = 0
latencies = []
received_ids = {}
start_time = None
end_time = None


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"[INFO] Terhubung ke broker {BROKER_HOST}:{BROKER_PORT}")
        client.subscribe(TOPIC)
    else:
        print(f"[ERROR] Gagal koneksi, code {rc}")


def on_message(client, userdata, msg):
    global message_count, byte_count, start_time, end_time

    if start_time is None:
        start_time = time.time()  # waktu pesan pertama diterima

    end_time = time.time()  # update terus sampai pesan terakhir

    payload = msg.payload.decode("utf-8")
    byte_count += len(payload)
    message_count += 1

    try:
        data = json.loads(payload)
        client_id = data.get("client_id")
        msg_id = data.get("id_messege")
        ts_sent = data.get("timestamp")

        # Simpan id untuk packet loss analisis
        if client_id not in received_ids:
            received_ids[client_id] = set()
        received_ids[client_id].add(msg_id)

        # Hitung latency
        ts_received = time.time()
        latency = ts_received - float(ts_sent)
        latencies.append(latency)

    except Exception as e:
        print(f"[WARNING] Error parsing payload: {e}")


def save_results():
    global start_time, end_time

    duration = (end_time - start_time) if start_time and end_time else 1
    bandwidth = byte_count / duration  # bytes per second
    throughput = message_count / duration  # messages per second
    avg_latency = sum(latencies) / len(latencies) if latencies else 0
    max_latency = max(latencies) if latencies else 0
    min_latency = min(latencies) if latencies else 0

    # Hitung packet loss dengan membandingkan id_messege
    total_expected = 0
    total_received = 0
    lost_messages = 0
    detail_loss = {}

    expected_ids = {f"{i:04d}" for i in range(1, NUM_MESSAGES_PER_CLIENT + 1)}

    for cid, ids in received_ids.items():
        total_expected += NUM_MESSAGES_PER_CLIENT
        total_received += len(ids)

        missing = expected_ids - ids  # pesan yang hilang
        lost_messages += len(missing)
        detail_loss[cid] = missing

    packet_loss_percent = (lost_messages / total_expected *
                           100) if total_expected > 0 else 0

    # Simpan ke CSV ringkasan
    os.makedirs("analysis/results", exist_ok=True)
    filename = f"analysis/results/metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

    with open(filename, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Total Pesan Diterima", "Total Bytes", "Durasi (s)",
                         "Bandwidth (bytes/s)", "Throughput (msg/s)",
                         "Latency Rata-rata (s)", "Latency Min (s)", "Latency Max (s)",
                         "Total Pesan Expected", "Pesan Hilang", "Packet Loss (%)"])
        writer.writerow([message_count, byte_count, f"{duration:.2f}",
                         f"{bandwidth:.2f}", f"{throughput:.2f}",
                         f"{avg_latency:.6f}", f"{min_latency:.6f}", f"{max_latency:.6f}",
                         total_expected, lost_messages, f"{packet_loss_percent:.2f}"])

    print(f"[INFO] Hasil ringkasan metrik disimpan di {filename}")

    # Simpan detail loss per client ke file terpisah
    loss_file = f"analysis/results/packet_loss_detail_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(loss_file, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Client ID", "Pesan Hilang"])
        for cid, missing in detail_loss.items():
            writer.writerow(
                [cid, ", ".join(sorted(missing)) if missing else "NONE"])

    print(f"[INFO] Detail packet loss disimpan di {loss_file}")


def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(BROKER_HOST, BROKER_PORT, 60)

    try:
        print("[INFO] Metrics subscriber berjalan... tekan CTRL+C untuk berhenti.")
        client.loop_forever()
    except KeyboardInterrupt:
        print("\n[INFO] Analisis dihentikan manual.")
    finally:
        save_results()


if __name__ == "__main__":
    main()


Penjelasan program:

1. Proses Koneksi dan Subskripsi

Fungsi on_connect() memastikan bahwa subscriber berhasil terhubung ke broker MQTT yang telah ditentukan pada file konfigurasi (config.py) dan otomatis berlangganan ke topik uji saat koneksi berhasil.

2. Pengumpulan Data Pesan

Setiap pesan yang diterima oleh broker akan memicu fungsi on_message(). Pada tahap ini:

  • Pesan didekode dari format JSON.

  • Dihitung jumlah byte yang diterima (byte_count).

  • Dicatat waktu penerimaan dan perhitungan latensi (latencies).

  • Dicatat juga client_id dan id_messege untuk mendeteksi pesan yang hilang (packet loss).

3. Perhitungan Metrik Performa

Ketika proses dihentikan (melalui CTRL + C), fungsi save_results() dijalankan untuk menghitung metrik-metrik berikut:

  • Durasi transmisi (s)

  • Total pesan diterima

  • Total bytes diterima

  • Bandwidth (bytes/s)

  • Throughput (pesan/s)

  • Latency rata-rata, minimum, dan maksimum (s)

  • Packet Loss (%)
    dihitung dari selisih antara jumlah pesan yang diharapkan dan yang benar-benar diterima untuk setiap klien.


Langkah 6: Menjalankan script publisher2

Perintah eksekusi dilakukan melalui terminal menggunakan format:

python -m core.publisher2 --clients <jumlah_client>

Sebagai contoh, untuk menjalankan uji dengan 100 klien, perintahnya adalah:

python -m core.publisher2 --clients 100

Berikut adalah source code yang digunakan:

import multiprocessing
import time
import argparse
import paho.mqtt.client as mqtt

from config.config import BROKER_HOST, BROKER_PORT, TOPIC, generate_payload, get_system_status

NUM_MESSAGES = 10
LEVEL_TIMEOUT = 60


def publisher_client(client_id: int):
    client = mqtt.Client(client_id=f"client-{client_id}")
    client.connect(BROKER_HOST, BROKER_PORT, 60)

    for msg_id in range(1, NUM_MESSAGES + 1):
        payload = generate_payload(msg_id, client_id)
        client.publish(TOPIC, payload, qos=0)
    client.disconnect()


def run_level(client_count: int):
    print(f"\n[INFO] Menjalankan test dengan {client_count} client...")

    processes = []
    start_time = time.time()

    # Buat semua client dalam proses terpisah
    for cid in range(1, client_count + 1):
        p = multiprocessing.Process(target=publisher_client, args=(cid,))
        processes.append(p)
        p.start()

    # Tunggu proses dengan timeout
    for p in processes:
        p.join(timeout=LEVEL_TIMEOUT)

    end_time = time.time()
    duration = end_time - start_time

    # Ambil status resource di akhir level
    system_status = get_system_status()

    print(
        f"[INFO] Test {client_count} client selesai dalam {duration:.2f} detik.")
    print(f"[INFO] Status CPU: {system_status['cpu_percent']}% | RAM: {system_status['ram_percent']}% "
          f"({system_status['available_ram_mb']} MB tersedia)")


def main():
    parser = argparse.ArgumentParser(description="MQTT Publisher Load Test")
    parser.add_argument("--clients", type=int, required=True,
                        help="Jumlah client yang akan dijalankan")
    args = parser.parse_args()

    run_level(args.clients)


if __name__ == "__main__":
    main()

Setiap klien yang dijalankan akan mengirimkan 10 pesan ke broker MQTT dengan topik yang telah ditentukan dalam file konfigurasi. Kode di atas terdiri dari tiga bagian utama:

  1. Inisialisasi Publisher Client

  2. Manajemen Level Pengujian

  3. Eksekusi Utama (Main)

Seluruh data hasil pengukuran dikumpulkan oleh skrip analysis.metrics yang dijalankan secara paralel sebelum memulai pengujian. Skrip tersebut menghitung bandwidth, throughput, latency, serta packet loss dari seluruh pesan yang diterima. Data hasil uji kemudian secara otomatis disimpan dalam folder:

dalam format CSV dengan dua file utama:

  1. File ringkasan metrik (metrics_YYYYMMDD_HHMMSS.csv)

  2. File detail kehilangan paket (packet_loss_detail_YYYYMMDD_HHMMSS.csv)

Hasil ini digunakan untuk melakukan analisis lanjutan terkait pengaruh panjang topik dan jumlah klien terhadap performa komunikasi MQTT.

VIDIO UJI COBA


Komentar

Postingan populer dari blog ini

Mengenal Metode Komunikasi Long Range (LoRa)

Tutorial Instalasi MicroPython di Mikrokontroler ESP32

Tutorial Mengirim Data Ke Server Client (ChirpStack) Via LoRa