Notifications

Notifications

JEE, Kafka und Zookeeper sind großartige Technologien, benötigen aber viele Ressourcen. Unser Ziel ist es, einen Message Broker mit minimalem Ressourcenverbrauch zu entwickeln. Wir gehen von 4 GB RAM, 2 Kerneln und einem monatlichen Budget von 7 Euro aus. Es klingt unmöglich, aber wir versuchen es.

1. Investitionen

Wir kaufen 1 vServer. Als Image-Installationsserver verwenden wir Debian (Debian-1200-*-minimal - Debian Bookworm oder Debian-1300-*-minimal - Trixie).

2. Installation von Abhängigkeiten unter Linux

Zum Kompilieren von C++ benötigen wir: einen Compiler mit C++17-Unterstützung, das cmake-Dienstprogramm, die hiredis-Bibliothek (die Grundlage für Redis in C++) und den vcpkg-Paketmanager zur einfachen Installation von uWebSockets und redis-plus-plus.


# Wir aktualisieren Paketlisten und installieren grundlegende Build-Dienstprogramme.
sudo apt update
sudo apt install -y build-essential cmake git curl zip unzip tar pkg-config libssl-dev

# Installieren und starten Sie Redis (falls noch nicht installiert).
sudo apt install -y redis-server
sudo systemctl start redis-server
              

3. Installation von C++-Bibliotheken über vcpkg

Ich empfehle, Bibliotheken über vcpkg zu installieren, um das System nicht mit manueller Assemblierung zu überladen.


# Klonen Sie vcpkg in Ihr Home-Verzeichnis
cd ~
git clone https://github.com/microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh

# Installation der erforderlichen Bibliotheken
./vcpkg install uwebsockets redis-plus-plus
              

4. Erstellung einer Projektstruktur

Erstellen Sie ein Arbeitsverzeichnis für das Projekt und die zuvor besprochenen Dateien.


mkdir -p ~/cpp_push_server
cd ~/cpp_push_server
              

Erstellen Sie eine Datei CMakeLists.txt


cmake_minimum_required(VERSION 3.15)
project(PushServerCPlusPlus LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# 1. Systemabhängigkeiten
find_package(Threads REQUIRED)
find_package(ZLIB REQUIRED)

# 2. Pfaten finden uWebSockets
find_path(UWEBSOCKETS_INCLUDE_DIR "uwebsockets/App.h" REQUIRED)

# Wir suchen die Low-Level-Bibliothek uSockets, ohne die der Linker Fehler erzeugt.
find_library(USOCKETS_LIBRARY NAMES uSockets REQUIRED)

# 3. Pfaten finden redis++
find_path(REDISPP_INCLUDE_DIR "sw/redis++/redis++.h" REQUIRED)
find_library(REDISPP_LIBRARY NAMES redis++ REQUIRED)
find_library(HIREDIS_LIBRARY NAMES hiredis REQUIRED)

add_executable(push_server main.cpp)

# Ordner mit Headerdateien verbinden
target_include_directories(push_server PRIVATE
    ${UWEBSOCKETS_INCLUDE_DIR}
    ${REDISPP_INCLUDE_DIR}
)

# Verknüpfe alle gefundenen Bibliotheken
target_link_libraries(push_server PRIVATE
    Threads::Threads
    ZLIB::ZLIB
    ${USOCKETS_LIBRARY}
    ${REDISPP_LIBRARY}
    ${HIREDIS_LIBRARY}
)
              

Datei main.cpp


#include <uwebsockets/App.h>
#include <sw/redis++/redis++.h>
#include <iostream>
#include <string>
#include <string_view>
#include <optional>
#include <thread>
#include <vector>
#include <algorithm>
#include <mutex>
#include <csignal>
#include <atomic>
#include <chrono> // Added for std::chrono

using namespace sw::redis;

struct UserData {
    std::string user_id;
};

struct PushMessage {
    std::string user_id;
    std::string stream_id;
    std::string text;
};

using DynamicWebSocket = uWS::WebSocket<false, true, UserData>;

static std::vector<DynamicWebSocket*> active_sockets;
static std::vector<PushMessage> internal_queue;
static std::vector<std::string> auth_signals;
static std::mutex queue_mutex;
static std::atomic<bool> server_running(true);
static uWS::Loop* main_network_loop = nullptr;
static struct us_listen_socket_t* global_listen_socket = nullptr;

std::pair<std::string, std::string> split_by_first_colon(const std::string& raw) {
    size_t pos = raw.find(':');
    if (pos == std::string::npos) return {raw, ""};
    return {raw.substr(0, pos), raw.substr(pos + 1)};
}

void signal_handler(int signum) {
    std::cout << "[System] Shutdown signal received: " << signum << std::endl;
    server_running = false;

    if (main_network_loop) {
        main_network_loop->defer([]() {
            std::cout << "[Net] Closing active sockets gracefully..." << std::endl;

            auto sockets_copy = active_sockets;
            for (auto* ws : sockets_copy) {
                if (ws) {
                    ws->end(1001, "Server shutting down");
                }
            }
            active_sockets.clear();

            if (global_listen_socket) {
                us_listen_socket_close(false, global_listen_socket);
                global_listen_socket = nullptr;
            }
        });
    }
}

int main() {
    std::signal(SIGINT, signal_handler);
    std::signal(SIGTERM, signal_handler);

    ConnectionOptions connection_options;
    connection_options.host = "127.0.0.1";
    connection_options.port = 6379;

    ConnectionPoolOptions pool_options;
    pool_options.size = 5;

    std::optional<Redis> redis;
    try {
        redis.emplace(connection_options, pool_options);
        std::cout << "[Redis] Connected via Pool" << std::endl;

        try {
            redis->xgroup_create("global_push_stream", "push_group", "$", true);
        } catch (...) {}
    } catch (const Error &err) {
        std::cerr << "[Redis] Error: " << err.what() << std::endl;
        return 1;
    }

    std::thread worker_thread([connection_options, pool_options]() {
        try {
            Redis r_worker(connection_options, pool_options);
            std::cout << "[Thread] Consumer Group Worker started" << std::endl;

            while (server_running) {
                bool worked = false; // Flag to track if we actually processed any data
                std::vector<std::string> local_signals;
                {
                    std::lock_guard<std::mutex> lock(queue_mutex);
                    if (!auth_signals.empty()) {
                        local_signals = std::move(auth_signals);
                        auth_signals.clear();
                    }
                }

                if (!local_signals.empty()) {
                    try {
                        using Item = std::pair<std::string, std::unordered_map<std::string, std::string>>;
                        using StreamResult = std::pair<std::string, std::vector<Item>>;
                        std::vector<StreamResult> pending_result;

                        r_worker.xreadgroup("push_group", "consumer_cxx",
                                            "global_push_stream", "0",
                                            std::back_inserter(pending_result));

                        for (const auto& stream_data : pending_result) {
                            for (const auto& [id, fields] : stream_data.second) {
                                auto it = fields.find("payload");
                                if (it != fields.end()) {
                                    auto [target_user, remain] = split_by_first_colon(it->second);

                                    if (std::find(local_signals.begin(), local_signals.end(), target_user) != local_signals.end()) {
                                        std::lock_guard<std::mutex> lock(queue_mutex);
                                        internal_queue.push_back({target_user, id, remain});
                                        std::cout << "[Thread] Triggered recovery from PEL for user " << target_user << " ID: " << id << std::endl;
                                        worked = true;
                                    }
                                }
                            }
                        }
                    } catch (...) {}
                }

                using Item = std::pair<std::string, std::unordered_map<std::string, std::string>>;
                using StreamResult = std::pair<std::string, std::vector<Item>>;
                std::vector<StreamResult> result;

                r_worker.xreadgroup("push_group", "consumer_cxx",
                                    "global_push_stream", ">",
                                    500,
                                    std::back_inserter(result));

                for (const auto& stream_data : result) {
                    for (const auto& [id, fields] : stream_data.second) {
                        auto it = fields.find("payload");
                        if (it != fields.end()) {
                            std::cout << "[Thread] New Message Received ID: " << id << std::endl;

                            auto [target_user, remain] = split_by_first_colon(it->second);
                            if (!target_user.empty() && !remain.empty()) {
                                std::lock_guard<std::mutex> lock(queue_mutex);
                                internal_queue.push_back({target_user, id, remain});
                                worked = true;
                            }
                        }
                    }
                }

                // OPTIMIZATION: If no new messages and no auth signals were processed,
                // force the thread to sleep for 10ms to completely free the CPU core.
                if (!worked && result.empty()) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                }
            }
            std::cout << "[Thread] Worker stopped safely" << std::endl;
        } catch (const Error &err) {
            std::cerr << "[Thread] Error: " << err.what() << std::endl;
        }
    });

    worker_thread.detach();

    uWS::App app;

    app.ws<UserData>("/*", {
        .compression = uWS::DISABLED,
        .maxPayloadLength = 16 * 1024,
        .idleTimeout = 120,

        .open = [](auto *ws) {
            std::cout << "[Net] Socket opened. Waiting for AUTH..." << std::endl;
            active_sockets.push_back(ws);
        },

        .message = [&redis](auto *ws, std::string_view message, uWS::OpCode opCode) {
            UserData *user_data = ws->getUserData();

            if (message.rfind("AUTH:", 0) == 0) {
                std::string uid = std::string(message.substr(5));
                user_data->user_id = uid;
                std::cout << "[Net] Client authenticated as: '" << uid << "'" << std::endl;

                std::lock_guard<std::mutex> lock(queue_mutex);
                auth_signals.push_back(uid);
                return;
            }

            if (message.rfind("ACK:", 0) == 0) {
                std::string ack_stream_id = std::string(message.substr(4));
                std::cout << "[Net] Received ACK for Stream ID: " << ack_stream_id << " from " << user_data->user_id << std::endl;

                try {
                    redis->xack("global_push_stream", "push_group", ack_stream_id);
                    std::cout << "[Redis] Message " << ack_stream_id << " confirmed via XACK" << std::endl;
                } catch (const Error &e) {
                    std::cerr << "[Redis] XACK Failed: " << e.what() << std::endl;
                }
            }
        },

        .close = [](auto *ws, int code, std::string_view message) {
            UserData *user_data = ws->getUserData();
            if (user_data && !user_data->user_id.empty()) {
                std::cout << "[Net] User " << user_data->user_id << " disconnected" << std::endl;
            }
            active_sockets.erase(std::remove(active_sockets.begin(), active_sockets.end(), ws), active_sockets.end());
        }
    });

    app.listen(9001, [](auto *listen_socket) {
        if (listen_socket) {
            main_network_loop = uWS::Loop::get();
            global_listen_socket = listen_socket;
            std::cout << "=== PRODUCTION GRADED SERVER STARTED ON PORT 9001 ===" << std::endl;

            struct us_loop_t* loop = (struct us_loop_t*) uWS::Loop::get();
            struct us_timer_t* timer = us_create_timer(loop, 0, 0);

            us_timer_set(timer, [](struct us_timer_t* t) {
                if (active_sockets.empty() && internal_queue.empty()) return;

                std::vector<PushMessage> local_copy;
                {
                    std::lock_guard<std::mutex> lock(queue_mutex);
                    if (internal_queue.empty()) return;

                    local_copy = std::move(internal_queue);
                    internal_queue.clear();
                }

                for (const auto& msg : local_copy) {
                    for (auto* ws : active_sockets) {
                        UserData* user_data = ws->getUserData();
                        if (user_data && user_data->user_id == msg.user_id) {
                            std::cout << "[Net] Routing push to socket: " << msg.stream_id << std::endl;
                            std::string forward_payload = msg.stream_id + ":" + msg.text;
                            ws->send(forward_payload, uWS::OpCode::TEXT);
                        }
                    }
                }
            }, 10, 10);
        }
    });
    app.run();
    std::cout << "[System] Server event loop stopped cleanly. Exiting..." << std::endl;
    return 0;
}
              

5. Erstellen und Kompilieren eines C++-Servers

Wir erstellen ein Build-Verzeichnis und weisen CMake an, die vcpkg-Toolchain zu verwenden, damit es die Bibliotheken selbst finden kann.


mkdir build
cd build
cmake -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake ..
cmake --build . --config Release
./push_server

Info:

  • Sie sollten in der Konsole Meldungen sehen, die darauf hinweisen, dass Sie erfolgreich eine Verbindung zu Redis hergestellt haben und dass der Server auf Port 9001 gestartet wurde.

6. Python-Testskript-Client

Ein Skript, das einen echten Client (Frontend) simuliert. Es kommuniziert über das ws://-Protokoll mit einem C++-Server.

Es empfiehlt sich, venv zu verwenden.


python3 -m venv .venv
source .venv/bin/activate
              

Installation erforderlich websockets: pip install websockets


import asyncio
import websockets

async def listen_server():
    # Wir verwenden die feste IPv4-Adresse 127.0.0.1 anstelle des Wortes localhost. (FIX IPv4)
    url = "ws://127.0.0.1:9001/"
    print(f"[Client] Connecting to {url}...")

    try:
        async with websockets.connect(url) as websocket:
            print("[Client] Connection established!")

            # Authorisieren Sie sich auf dem C++-Server.
            await websocket.send("AUTH:user_555")
            print("[Client] Sent AUTH:user_555. Ready to receive notifications...")

            async for message in websocket:
                print(f"[Client] RECEIVED PAYLOAD: {message}")

                if ":" in message:
                    msg_id = message.split(':', 1)[0]
                    await websocket.send(f"ACK:{msg_id}")
                    print(f"[Client] Sent response -> ACK:{msg_id}")

    except Exception as e:
        print(f"[Client] Error: {e}")

if __name__ == "__main__":
    try:
        asyncio.run(listen_server())
    except KeyboardInterrupt:
        print("\nClient stopped.")
              

7. Wie kann man das jetzt testen?

1: Starten Sie Ihren C++-Server im ersten Terminal.
2: Führen Sie diesen neuen reinen Python-Client in einem zweiten Terminal aus.
3: Öffnen Sie ein drittes Terminal und simulieren Sie das Senden einer Nachricht im Namen Ihres Haupt-Backends über die Standard-Redis-Konsole:


redis-cli XADD global_push_stream MAXLEN 100000 '*' payload "user_555:msg_streaming_999:Vollständiger Produktionstest!"
              

1: Wir stoppen test.py
2: Wir senden noch ein paar Nachrichten an Redis.
3: Wir starten test.py wieder

Die Nachrichten müssen den client erreichen.

Info:

  • Das war's! Wir haben ein funktionierendes Kampfsystem – wir können es einsetzen!

8. Erstelle eine Dockerfile

Diese Datei verwendet das mehrstufige Build-Muster. Die erste Stufe (Builder) lädt die Abhängigkeiten herunter und kompiliert unseren C++-Code, während die zweite Stufe (Minimal) nur die fertige Binärdatei verwendet. Der resultierende Container ist sehr klein, nur etwa 30–40 MB groß.

Erstellen Sie eine Dockerfile im Stammverzeichnis Ihres Projekts.


FROM ubuntu:24.04 AS builder

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    build-essential \
    cmake \
    git \
    curl \
    zip \
    unzip \
    tar \
    pkg-config \
    libssl-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /opt
RUN git clone https://github.com \
    && cd vcpkg \
    && ./bootstrap-vcpkg.sh

RUN /opt/vcpkg/vcpkg install uwebsockets redis-plus-plus

WORKDIR /app
COPY CMakeLists.txt main.cpp ./

WORKDIR /app/build
RUN cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_TOOLCHAIN_FILE=/opt/vcpkg/scripts/buildsystems/vcpkg.cmake .. \
    && cmake --build . --config Release

FROM ubuntu:24.04 AS minimal

RUN apt-get update && apt-get install -y \
    libssl3 \
    libstdc++6 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY --from=builder /app/build/push_server .

EXPOSE 9001

CMD ["./push_server"]
              

9. Erstelle eine docker-compose.yml-Datei.

Diese Datei verbindet den C++-Server mit Redis. Hinweis: Da sich beide in einem isolierten Docker-Netzwerk befinden, ändert sich die Redis-Adresse im C++-Code von 127.0.0.1 zum Systemnamen redis_db. Docker leitet den Datenverkehr automatisch um.

Erstellen Sie dort eine docker-compose.yml-Datei.


version: '3.8'

services:
  redis_db:
    image: redis:7.2-alpine
    container_name: push_redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: always

  push_server:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: push_cxx_server
    ports:
      - "9001:9001"
    depends_on:
      - redis_db
    restart: always

volumes:
  redis_data:
              

10. Änderung in main.cpp

Um dem C++-Server den Zugriff auf Redis innerhalb von Docker zu ermöglichen, muss die Verbindungszeichenfolge geändert werden. Ersetzen Sie 127.0.0.1 durch den in der docker-compose.yml-Datei angegebenen Namen des Redis-Datenbankdienstes.

Nehmen Sie diese Änderung an zwei Stellen vor, an denen die Verbindung hergestellt wird: im Hauptpool und innerhalb von r_worker im Hintergrundthread.


// Davor: connection_options.host = "127.0.0.1";
// Danach:
connection_options.host = "redis_db";
              

11. Anleitung zum Starten des gesamten Systems mit einem einzigen Befehl

Sie müssen Redis nun nicht mehr manuell auf Ihrem System starten und den Code mit CMake kompilieren. Stellen Sie sicher, dass Docker Compose installiert ist (sudo apt install docker-compose-v2), navigieren Sie zum Projektordner und geben Sie Folgendes ein:


# Lokalen Redis-Prozess unter Linux stoppen, damit er nicht extern Port 6379 belegt.
sudo systemctl stop redis-server 2>/dev/null
# Führen Sie den Build aus und starten Sie die gesamte Infrastruktur im Hintergrund.
docker compose up --build -d
              

12. Wie man überprüft, ob alles funktioniert

Überprüfen Sie den Containerstatus: docker compose ps. Sie werden sehen, dass beide Dienste den Status `Up` haben. Überprüfen Sie die C++-Server-Logs: `docker compose logs push_server`. Dort sollte Folgendes stehen: [Redis] Connected via Pool und === PRODUCTION GRADED SERVER STARTED ON PORT 9001 ===. Führen Sie Ihr Python-Skript test.py auf Ihrem lokalen Rechner aus. Es verbindet sich erfolgreich mit dem Docker-Container unter ws://127.0.0.1:9001/. Senden Sie den Befehl über die lokale redis-cli ... – Docker hat Port 6379 extern weitergeleitet, sodass Ihr Push den Redis-Container erreicht, vom C++-Server empfangen und an Python weitergeleitet wird! Wenn Sie das gesamte System vollständig anhalten möchten, genügt es, Folgendes einzugeben: docker compose down

Info:

  • Falls Sie während der Einrichtung keine Meldungen erhalten, prüfen Sie, ob Port 9001 geöffnet ist. Falls ja, beenden Sie alle Vorgänge und starten Sie sie neu.
    netstat -na | grep 9001
    sudo kill -9 $(sudo lsof -t -i:9001)
Diese Website verwendet den Browser-Cache für den Offline-Modus und verarbeitet Ihre Daten aus dem Kontaktformular gemäß unserer Datenschutzerklärung.