Network Traffic Analytics: goflow2 + kafka + clickhouse + grafana

This is the final documentation for the Network Traffic Analytics pipeline (enterprise NOC), covering the setup end to end with a modern, lightweight, and fully automated architecture.


📋 System Prerequisites

  • OS: Ubuntu Server 22.04 / 24.04 (fresh install)

  • Hardware: at least a 4-core CPU and 8 GB of RAM (16 GB recommended for >10 Gbps of traffic), SSD/NVMe storage.

  • Installed software: Docker, Docker Compose, Git, Python 3.

  • Permissions: your user has been added to the docker group (sudo usermod -aG docker $USER, then newgrp docker).

sudo apt update
sudo apt install docker.io docker-compose-v2 -y
sudo systemctl enable --now docker

If the server is a Proxmox LXC container, edit the container's config on the Proxmox host (replace CTID with your container ID):

nano /etc/pve/lxc/CTID.conf

Then add these lines so Docker can run inside the container:

unprivileged: 0
lxc.apparmor.profile: unconfined

Stage 1: Installing the Core Services (Docker Compose)

We use Kafka in KRaft mode (no Zookeeper) to save RAM, GoFlow2 to capture sFlow, ClickHouse for forensic analytics, and Grafana for visualization.

  1. Create the project directory:

    Bash
    mkdir -p ~/flow-monitor/data/kafka
    cd ~/flow-monitor
    
  2. Create the docker-compose.yml file:

    Bash
    nano docker-compose.yml
    
  3. Paste in this final configuration:

    YAML
    version: '3.8'
    
    services:
      goflow2:
        image: netsampler/goflow2:latest
        restart: unless-stopped
        command: -format=json -transport=kafka -transport.kafka.brokers=kafka:29092 -transport.kafka.topic=flows
        ports:
          - "6343:6343/udp"
        depends_on:
          - kafka
    
      kafka:
        image: confluentinc/cp-kafka:7.6.0
        restart: unless-stopped
        environment:
          KAFKA_NODE_ID: 1
          KAFKA_PROCESS_ROLES: 'broker,controller'
          KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
          KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
          CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A'
          KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://127.0.0.1:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_HEAP_OPTS: "-Xmx1G -Xms512M"
          KAFKA_LOG_RETENTION_HOURS: 2
          KAFKA_LOG_RETENTION_BYTES: 1073741824
          TZ: Asia/Jakarta
        volumes:
          - ./data/kafka:/var/lib/kafka/data
        ports:
          - "127.0.0.1:9092:9092"
    
      clickhouse:
        image: clickhouse/clickhouse-server:latest
        restart: unless-stopped
        ports:
          - "8123:8123"
          - "9000:9000"
        volumes:
          - ./data/clickhouse:/var/lib/clickhouse
        depends_on:
          - kafka
    
      grafana:
        image: grafana/grafana:latest
        restart: unless-stopped
        ports:
          - "3000:3000"
        volumes:
          - ./data/grafana:/var/lib/grafana
        depends_on:
          - clickhouse
    
  4. Start the whole stack:

    Bash
    docker compose up -d
    
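Note that KAFKA_LOG_RETENTION_HOURS: 2 and KAFKA_LOG_RETENTION_BYTES: 1073741824 cap the broker's buffer at 2 hours or 1 GiB, whichever is hit first. Whether that is enough headroom for a ClickHouse outage depends on your flow rate; a back-of-the-envelope estimate (the record size and flow rate below are assumptions, not measurements):

```shell
# Rough sizing: how long a 1 GiB Kafka log lasts at a given ingest rate.
# Assumed: ~300 bytes per JSON flow record, 5,000 flows/second.
retention_bytes=1073741824
record_bytes=300
flows_per_second=5000

bytes_per_second=$((record_bytes * flows_per_second))
seconds=$((retention_bytes / bytes_per_second))
minutes=$((seconds / 60))

echo "1 GiB covers about ${minutes} minutes at this rate"
```

At that rate the 1 GiB byte cap is hit long before the 2-hour time cap, so size KAFKA_LOG_RETENTION_BYTES to how long you want the pipeline to survive a consumer outage.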

Stage 2: Configuring the ClickHouse Pipeline (Optimized)

Open a ClickHouse terminal session to build the database schema, the queue table, the processing pipeline, and the resolution dictionaries.

  1. Open the ClickHouse console:

    Bash
    docker exec -it flow-monitor-clickhouse-1 clickhouse-client
    
  2. Execute these SQL statements in order:

    A. Database & Security Setup

    SQL
    CREATE DATABASE IF NOT EXISTS netflow;
    CREATE USER IF NOT EXISTS grafana IDENTIFIED WITH no_password;
    GRANT SELECT ON netflow.* TO grafana;
    GRANT dictGet ON netflow.* TO grafana;
    

    B. Router & Sampling Dictionary (Calibrating to Real Traffic)

    SQL
    CREATE TABLE netflow.routers (
        sampler_address String,
        name String,
        sampling UInt32
    ) ENGINE = MergeTree() ORDER BY sampler_address;
    
    CREATE DICTIONARY netflow.router_dict (
        sampler_address String,
        name String,
        sampling UInt32
    ) PRIMARY KEY sampler_address
    SOURCE(CLICKHOUSE(DB 'netflow' TABLE 'routers'))
    LIFETIME(MIN 300 MAX 360) LAYOUT(COMPLEX_KEY_HASHED());
    
    -- Register your router IPs here, each with its sampling rate
    INSERT INTO netflow.routers VALUES ('103.170.100.226', 'Router-Core-1', 4096);
    
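Because the router only exports 1 out of every `sampling` packets, the materialized view in part C multiplies the byte and packet counters by this rate to approximate real traffic. A quick sanity check of that arithmetic, with illustrative numbers:

```shell
# Illustrative numbers: 10 sampled packets totalling 15,000 bytes,
# observed over a 60-second window, at a 1:4096 sampling rate.
sampled_bytes=15000
sampling_rate=4096
window_seconds=60

# Estimated real bytes = sampled bytes x sampling rate.
real_bytes=$((sampled_bytes * sampling_rate))

# Convert to an average bitrate in Mbps over the window.
mbps=$(awk -v b="$real_bytes" -v s="$window_seconds" \
  'BEGIN { printf "%.1f", b * 8 / s / 1000000 }')

echo "estimated bytes: $real_bytes"
echo "estimated rate:  ${mbps} Mbps"
```

If the sampling rate registered in netflow.routers does not match the router's actual configuration, every graph will be off by that same factor, so keep the table in sync with the device.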

    C. Main Pipeline (Real Traffic)

    SQL
    -- Main table
    CREATE TABLE netflow.flows (
        Date Date,
        TimeReceived DateTime,
        sampler_address String,
        src_addr String,
        dst_addr String,
        src_port UInt32,
        dst_port UInt32,
        tcp_flags UInt32,
        bytes UInt64,
        packets UInt64,
        proto String
    ) ENGINE = MergeTree()
    PARTITION BY Date
    ORDER BY (TimeReceived, sampler_address)
    TTL Date + INTERVAL 7 DAY; -- auto-delete raw data after 7 days
    
    -- Kafka ingestion funnel (no dead columns)
    CREATE TABLE netflow.kafka_flows (
        sampler_address String,
        src_addr String,
        dst_addr String,
        src_port UInt32,
        dst_port UInt32,
        tcp_flags UInt32,
        proto String,
        bytes UInt64,
        packets UInt64
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka:29092',
             kafka_topic_list = 'flows',
             kafka_group_name = 'clickhouse_reader',
             kafka_format = 'JSONEachRow';
    
    -- Connecting pipe (MV) that auto-multiplies by the sampling rate
    CREATE MATERIALIZED VIEW netflow.flows_mv TO netflow.flows AS
    SELECT
        toDate(now()) AS Date,
        now() AS TimeReceived,
        sampler_address,
        src_addr,
        dst_addr,
        src_port,
        dst_port,
        tcp_flags,
        bytes * dictGetOrDefault('netflow.router_dict', 'sampling', tuple(sampler_address), toUInt32(1)) AS bytes,
        packets * dictGetOrDefault('netflow.router_dict', 'sampling', tuple(sampler_address), toUInt32(1)) AS packets,
        proto
    FROM netflow.kafka_flows;
    
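For reference, each message on the flows topic is one JSON object per line (JSONEachRow). The record below is hypothetical, and the exact field set goflow2 emits varies by version; what matters is that the columns declared in netflow.kafka_flows are present (extra fields are skipped by default on recent ClickHouse versions):

```shell
# A hypothetical flow record in the shape the Kafka engine table consumes.
record='{"sampler_address":"103.170.100.226","src_addr":"10.0.0.5","dst_addr":"8.8.8.8","src_port":51514,"dst_port":53,"tcp_flags":0,"proto":"UDP","bytes":120,"packets":1}'

# Sanity-check that every column declared in netflow.kafka_flows is present.
missing=0
for key in sampler_address src_addr dst_addr src_port dst_port tcp_flags proto bytes packets; do
  case "$record" in
    *"\"$key\""*) ;;
    *) echo "missing: $key"; missing=$((missing + 1)) ;;
  esac
done
echo "missing columns: $missing"
```

If rows never reach netflow.flows, compare one raw Kafka message against this column list first; a single renamed field makes the consumer drop the row silently.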

    D. ASN Dictionary & Historical Table (1-Year Aggregation)

    SQL
    -- ASN staging table
    CREATE TABLE netflow.asn_data (
        ip_range String,
        asn UInt32,
        asn_name String
    ) ENGINE = MergeTree() ORDER BY ip_range;
    
    -- ASN lookup dictionary
    CREATE DICTIONARY netflow.asn_dict (
        ip_range String,
        asn UInt32,
        asn_name String
    ) PRIMARY KEY ip_range
    SOURCE(CLICKHOUSE(DB 'netflow' TABLE 'asn_data'))
    LIFETIME(MIN 3600 MAX 7200) LAYOUT(ip_trie());
    
    -- Lightweight historical table (1 year)
    CREATE TABLE netflow.flows_hourly (
        Date Date,
        Hour DateTime,
        sampler_address String,
        src_asn_name String,
        dst_asn_name String,
        proto String,
        total_bytes UInt64,
        total_packets UInt64
    ) ENGINE = SummingMergeTree()
    ORDER BY (Date, Hour, sampler_address, src_asn_name, dst_asn_name, proto)
    TTL Date + INTERVAL 1 YEAR;
    
    -- MV that resolves IPs to ASN names before storing
    CREATE MATERIALIZED VIEW netflow.flows_hourly_mv TO netflow.flows_hourly AS
    SELECT
        toDate(TimeReceived) AS Date,
        toStartOfHour(TimeReceived) AS Hour,
        sampler_address,
        dictGetOrDefault('netflow.asn_dict', 'asn_name', tuple(toIPv4OrDefault(src_addr)), 'Unknown ASN') AS src_asn_name,
        dictGetOrDefault('netflow.asn_dict', 'asn_name', tuple(toIPv4OrDefault(dst_addr)), 'Unknown ASN') AS dst_asn_name,
        proto,
        sum(bytes) AS total_bytes,
        sum(packets) AS total_packets
    FROM netflow.flows
    GROUP BY Date, Hour, sampler_address, src_asn_name, dst_asn_name, proto;
    
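The hourly rollup is what makes one-year retention affordable: instead of one row per flow, the SummingMergeTree keeps one row per (hour, router, ASN pair, protocol) group. A rough illustration of the reduction, with assumed (not measured) numbers:

```shell
# Rough illustration of raw vs. rolled-up row counts per day.
# Assumed: 5,000 flows/second raw, ~2,000 distinct
# (router, src ASN, dst ASN, proto) groups per hour.
raw_rows_per_day=$((5000 * 86400))
distinct_groups_per_hour=2000
rollup_rows_per_day=$((distinct_groups_per_hour * 24))

echo "raw rows/day:    $raw_rows_per_day"
echo "rollup rows/day: $rollup_rows_per_day"
echo "reduction:       $((raw_rows_per_day / rollup_rows_per_day))x"
```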

    Type exit to leave the ClickHouse console.


Stage 3: Automating ASN Updates (Cron Job)

To keep the ASN table valid, we create a script that downloads the global BGP database and loads it into ClickHouse every night.

  1. Create the bash script:

    Bash
    nano ~/flow-monitor/update_asn.sh
    
  2. Fill it with the following code (make sure the update_asn.py file already exists in the same folder):

    Bash
    #!/bin/bash
    # Resolve the project directory from the script's own location, so the
    # job also works under cron, where $USER may not be set.
    cd "$(dirname "$0")"
    echo "ASN update started: $(date)"
    
    # 1. Run the Python script to generate asn_data.csv
    /usr/bin/python3 update_asn.py
    
    # 2. Load it into ClickHouse without sudo
    docker exec -i flow-monitor-clickhouse-1 clickhouse-client --query="TRUNCATE TABLE netflow.asn_data"
    cat asn_data.csv | docker exec -i flow-monitor-clickhouse-1 clickhouse-client --query="INSERT INTO netflow.asn_data FORMAT CSV"
    
    # 3. Reload the in-memory dictionary
    docker exec -i flow-monitor-clickhouse-1 clickhouse-client --query="SYSTEM RELOAD DICTIONARY netflow.asn_dict"
    
    echo "ASN update finished: $(date)"
    echo "----------------------------------------"
    
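update_asn.py itself is out of scope here, but the loader above expects it to emit asn_data.csv in the column order of netflow.asn_data: ip_range, asn, asn_name, with ip_range as a CIDR prefix string (required by the ip_trie layout). A minimal sketch of that shape, with made-up rows:

```shell
# Sketch of the CSV shape the loader expects; the rows are illustrative only.
cat > /tmp/asn_data_sample.csv <<'EOF'
"8.8.8.0/24",15169,"GOOGLE"
"1.1.1.0/24",13335,"CLOUDFLARENET"
EOF

# Every row must have exactly three comma-separated fields.
bad_rows=$(awk -F',' 'NF != 3' /tmp/asn_data_sample.csv | wc -l)
echo "malformed rows: $bad_rows"
```

If a load fails, run the INSERT by hand with one of these sample rows piped in; a malformed row (an AS name containing an unquoted comma, for instance) is the most common cause.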
  3. Make it executable:

    Bash
    chmod +x ~/flow-monitor/update_asn.sh
    
  4. Schedule it in cron (runs every day at 02:00):

    Bash
    crontab -e
    

    Add this line at the bottom (replace user with your actual username):

    Plaintext
    0 2 * * * /home/user/flow-monitor/update_asn.sh >> /home/user/flow-monitor/asn_update.log 2>&1
    

Stage 4: Grafana Visualization

  1. Open Grafana at http://<SERVER_IP>:3000 (default user/password: admin / admin).

  2. Go to Connections > Data sources > Add data source.

  3. Search for ClickHouse (if it is not listed, install the Grafana ClickHouse plugin via the CLI: docker exec -it flow-monitor-grafana-1 grafana-cli plugins install grafana-clickhouse-datasource, then restart the grafana container).

  4. Configure the data source:

    • Server address: clickhouse

    • Server port: 9000

    • Username: grafana

    • Default database: netflow

    • Click Save & Test.

  5. Import the final dashboard JSON, which already includes a Router dropdown variable and Inbound/Outbound traffic separation (based on your internal IP prefixes).
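The inbound/outbound split in that dashboard hinges on one rule: a flow whose source address falls inside your internal prefix is outbound, and a flow whose destination does is inbound. A sketch of that check, using 103.170.100.0/24 as a placeholder for your own allocation:

```shell
# Classify an IPv4 address against a placeholder internal prefix
# (103.170.100.0/24 -- substitute your own allocation).
classify() {
  awk -v ip="$1" -v net="103.170.100.0" -v bits=24 '
    function ip2int(a, p) {
      split(a, p, ".")
      return ((p[1] * 256 + p[2]) * 256 + p[3]) * 256 + p[4]
    }
    BEGIN {
      # Two addresses share a /bits prefix when their integer forms agree
      # after dropping the host bits.
      shift = 2 ^ (32 - bits)
      if (int(ip2int(ip) / shift) == int(ip2int(net) / shift))
        print "internal"
      else
        print "external"
    }'
}

src_side=$(classify "103.170.100.226")  # source inside the prefix
dst_side=$(classify "8.8.8.8")          # destination outside the prefix
echo "src=$src_side dst=$dst_side => this flow counts as outbound"
```

In Grafana the same rule becomes a WHERE clause on src_addr / dst_addr against your prefixes; the shell version above is only there to make the direction logic explicit.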