Notifications
Notifications
JEE, Kafka und Zookeeper sind großartige Technologien, benötigen aber viele Ressourcen. Unser Ziel ist es, einen Message Broker mit minimalem Ressourcenverbrauch zu entwickeln. Wir gehen von 4 GB RAM, 2 Kerneln und einem monatlichen Budget von 7 Euro aus. Es klingt unmöglich, aber wir versuchen es.
1. Investitionen
Wir kaufen 1 vServer. Als Image-Installationsserver verwenden wir Debian (Debian-1200-*-minimal - Debian Bookworm oder Debian-1300-*-minimal - Trixie).
2. Installation von Abhängigkeiten unter Linux
Zum Kompilieren von C++ benötigen wir: einen Compiler mit C++17-Unterstützung, das cmake-Dienstprogramm, die hiredis-Bibliothek (die Grundlage für Redis in C++) und den vcpkg-Paketmanager zur einfachen Installation von uWebSockets und redis-plus-plus.
# Wir aktualisieren Paketlisten und installieren grundlegende Build-Dienstprogramme.
sudo apt update
sudo apt install -y build-essential cmake git curl zip unzip tar pkg-config libssl-dev
# Installieren und starten Sie Redis (falls noch nicht installiert).
sudo apt install -y redis-server
sudo systemctl start redis-server
3. Installation von C++-Bibliotheken über vcpkg
Ich empfehle, Bibliotheken über vcpkg zu installieren, um das System nicht mit manueller Assemblierung zu überladen.
# Klonen Sie vcpkg in Ihr Home-Verzeichnis
cd ~
git clone https://github.com/microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
# Installation der erforderlichen Bibliotheken
./vcpkg install uwebsockets redis-plus-plus
4. Erstellung einer Projektstruktur
Erstellen Sie ein Arbeitsverzeichnis für das Projekt und die zuvor besprochenen Dateien.
mkdir -p ~/cpp_push_server
cd ~/cpp_push_server
Erstellen Sie eine Datei CMakeLists.txt
cmake_minimum_required(VERSION 3.15)
project(PushServerCPlusPlus LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# 1. Systemabhängigkeiten
find_package(Threads REQUIRED)
find_package(ZLIB REQUIRED)
# 2. Pfaten finden uWebSockets
find_path(UWEBSOCKETS_INCLUDE_DIR "uwebsockets/App.h" REQUIRED)
# Wir suchen die Low-Level-Bibliothek uSockets, ohne die der Linker Fehler erzeugt.
find_library(USOCKETS_LIBRARY NAMES uSockets REQUIRED)
# 3. Pfaten finden redis++
find_path(REDISPP_INCLUDE_DIR "sw/redis++/redis++.h" REQUIRED)
find_library(REDISPP_LIBRARY NAMES redis++ REQUIRED)
find_library(HIREDIS_LIBRARY NAMES hiredis REQUIRED)
add_executable(push_server main.cpp)
# Ordner mit Headerdateien verbinden
target_include_directories(push_server PRIVATE
${UWEBSOCKETS_INCLUDE_DIR}
${REDISPP_INCLUDE_DIR}
)
# Verknüpfe alle gefundenen Bibliotheken
target_link_libraries(push_server PRIVATE
Threads::Threads
ZLIB::ZLIB
${USOCKETS_LIBRARY}
${REDISPP_LIBRARY}
${HIREDIS_LIBRARY}
)
Datei main.cpp
#include <uwebsockets/App.h>
#include <sw/redis++/redis++.h>
#include <iostream>
#include <string>
#include <string_view>
#include <optional>
#include <thread>
#include <vector>
#include <algorithm>
#include <mutex>
#include <csignal>
#include <atomic>
#include <chrono> // Added for std::chrono
using namespace sw::redis;
struct UserData {
std::string user_id;
};
struct PushMessage {
std::string user_id;
std::string stream_id;
std::string text;
};
using DynamicWebSocket = uWS::WebSocket<false, true, UserData>;
static std::vector<DynamicWebSocket*> active_sockets;
static std::vector<PushMessage> internal_queue;
static std::vector<std::string> auth_signals;
static std::mutex queue_mutex;
static std::atomic<bool> server_running(true);
static uWS::Loop* main_network_loop = nullptr;
static struct us_listen_socket_t* global_listen_socket = nullptr;
std::pair<std::string, std::string> split_by_first_colon(const std::string& raw) {
size_t pos = raw.find(':');
if (pos == std::string::npos) return {raw, ""};
return {raw.substr(0, pos), raw.substr(pos + 1)};
}
void signal_handler(int signum) {
std::cout << "[System] Shutdown signal received: " << signum << std::endl;
server_running = false;
if (main_network_loop) {
main_network_loop->defer([]() {
std::cout << "[Net] Closing active sockets gracefully..." << std::endl;
auto sockets_copy = active_sockets;
for (auto* ws : sockets_copy) {
if (ws) {
ws->end(1001, "Server shutting down");
}
}
active_sockets.clear();
if (global_listen_socket) {
us_listen_socket_close(false, global_listen_socket);
global_listen_socket = nullptr;
}
});
}
}
int main() {
std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);
ConnectionOptions connection_options;
connection_options.host = "127.0.0.1";
connection_options.port = 6379;
ConnectionPoolOptions pool_options;
pool_options.size = 5;
std::optional<Redis> redis;
try {
redis.emplace(connection_options, pool_options);
std::cout << "[Redis] Connected via Pool" << std::endl;
try {
redis->xgroup_create("global_push_stream", "push_group", "$", true);
} catch (...) {}
} catch (const Error &err) {
std::cerr << "[Redis] Error: " << err.what() << std::endl;
return 1;
}
std::thread worker_thread([connection_options, pool_options]() {
try {
Redis r_worker(connection_options, pool_options);
std::cout << "[Thread] Consumer Group Worker started" << std::endl;
while (server_running) {
bool worked = false; // Flag to track if we actually processed any data
std::vector<std::string> local_signals;
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (!auth_signals.empty()) {
local_signals = std::move(auth_signals);
auth_signals.clear();
}
}
if (!local_signals.empty()) {
try {
using Item = std::pair<std::string, std::unordered_map<std::string, std::string>>;
using StreamResult = std::pair<std::string, std::vector<Item>>;
std::vector<StreamResult> pending_result;
r_worker.xreadgroup("push_group", "consumer_cxx",
"global_push_stream", "0",
std::back_inserter(pending_result));
for (const auto& stream_data : pending_result) {
for (const auto& [id, fields] : stream_data.second) {
auto it = fields.find("payload");
if (it != fields.end()) {
auto [target_user, remain] = split_by_first_colon(it->second);
if (std::find(local_signals.begin(), local_signals.end(), target_user) != local_signals.end()) {
std::lock_guard<std::mutex> lock(queue_mutex);
internal_queue.push_back({target_user, id, remain});
std::cout << "[Thread] Triggered recovery from PEL for user " << target_user << " ID: " << id << std::endl;
worked = true;
}
}
}
}
} catch (...) {}
}
using Item = std::pair<std::string, std::unordered_map<std::string, std::string>>;
using StreamResult = std::pair<std::string, std::vector<Item>>;
std::vector<StreamResult> result;
r_worker.xreadgroup("push_group", "consumer_cxx",
"global_push_stream", ">",
500,
std::back_inserter(result));
for (const auto& stream_data : result) {
for (const auto& [id, fields] : stream_data.second) {
auto it = fields.find("payload");
if (it != fields.end()) {
std::cout << "[Thread] New Message Received ID: " << id << std::endl;
auto [target_user, remain] = split_by_first_colon(it->second);
if (!target_user.empty() && !remain.empty()) {
std::lock_guard<std::mutex> lock(queue_mutex);
internal_queue.push_back({target_user, id, remain});
worked = true;
}
}
}
}
// OPTIMIZATION: If no new messages and no auth signals were processed,
// force the thread to sleep for 10ms to completely free the CPU core.
if (!worked && result.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
std::cout << "[Thread] Worker stopped safely" << std::endl;
} catch (const Error &err) {
std::cerr << "[Thread] Error: " << err.what() << std::endl;
}
});
worker_thread.detach();
uWS::App app;
app.ws<UserData>("/*", {
.compression = uWS::DISABLED,
.maxPayloadLength = 16 * 1024,
.idleTimeout = 120,
.open = [](auto *ws) {
std::cout << "[Net] Socket opened. Waiting for AUTH..." << std::endl;
active_sockets.push_back(ws);
},
.message = [&redis](auto *ws, std::string_view message, uWS::OpCode opCode) {
UserData *user_data = ws->getUserData();
if (message.rfind("AUTH:", 0) == 0) {
std::string uid = std::string(message.substr(5));
user_data->user_id = uid;
std::cout << "[Net] Client authenticated as: '" << uid << "'" << std::endl;
std::lock_guard<std::mutex> lock(queue_mutex);
auth_signals.push_back(uid);
return;
}
if (message.rfind("ACK:", 0) == 0) {
std::string ack_stream_id = std::string(message.substr(4));
std::cout << "[Net] Received ACK for Stream ID: " << ack_stream_id << " from " << user_data->user_id << std::endl;
try {
redis->xack("global_push_stream", "push_group", ack_stream_id);
std::cout << "[Redis] Message " << ack_stream_id << " confirmed via XACK" << std::endl;
} catch (const Error &e) {
std::cerr << "[Redis] XACK Failed: " << e.what() << std::endl;
}
}
},
.close = [](auto *ws, int code, std::string_view message) {
UserData *user_data = ws->getUserData();
if (user_data && !user_data->user_id.empty()) {
std::cout << "[Net] User " << user_data->user_id << " disconnected" << std::endl;
}
active_sockets.erase(std::remove(active_sockets.begin(), active_sockets.end(), ws), active_sockets.end());
}
});
app.listen(9001, [](auto *listen_socket) {
if (listen_socket) {
main_network_loop = uWS::Loop::get();
global_listen_socket = listen_socket;
std::cout << "=== PRODUCTION GRADED SERVER STARTED ON PORT 9001 ===" << std::endl;
struct us_loop_t* loop = (struct us_loop_t*) uWS::Loop::get();
struct us_timer_t* timer = us_create_timer(loop, 0, 0);
us_timer_set(timer, [](struct us_timer_t* t) {
if (active_sockets.empty() && internal_queue.empty()) return;
std::vector<PushMessage> local_copy;
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (internal_queue.empty()) return;
local_copy = std::move(internal_queue);
internal_queue.clear();
}
for (const auto& msg : local_copy) {
for (auto* ws : active_sockets) {
UserData* user_data = ws->getUserData();
if (user_data && user_data->user_id == msg.user_id) {
std::cout << "[Net] Routing push to socket: " << msg.stream_id << std::endl;
std::string forward_payload = msg.stream_id + ":" + msg.text;
ws->send(forward_payload, uWS::OpCode::TEXT);
}
}
}
}, 10, 10);
}
});
app.run();
std::cout << "[System] Server event loop stopped cleanly. Exiting..." << std::endl;
return 0;
}
5. Erstellen und Kompilieren eines C++-Servers
Wir erstellen ein Build-Verzeichnis und weisen CMake an, die vcpkg-Toolchain zu verwenden, damit es die Bibliotheken selbst finden kann.
mkdir build
cd build
cmake -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake ..
cmake --build . --config Release
./push_server
Info:
- Sie sollten in der Konsole Meldungen sehen, die darauf hinweisen, dass Sie erfolgreich eine Verbindung zu Redis hergestellt haben und dass der Server auf Port 9001 gestartet wurde.
6. Python-Testskript-Client
Ein Skript, das einen echten Client (Frontend) simuliert. Es kommuniziert über das ws://-Protokoll mit einem C++-Server.
Es empfiehlt sich, venv zu verwenden.
python3 -m venv .venv
source .venv/bin/activate
Installation erforderlich websockets: pip install websockets
import asyncio
import websockets
async def listen_server():
# Wir verwenden die feste IPv4-Adresse 127.0.0.1 anstelle des Wortes localhost. (FIX IPv4)
url = "ws://127.0.0.1:9001/"
print(f"[Client] Connecting to {url}...")
try:
async with websockets.connect(url) as websocket:
print("[Client] Connection established!")
# Authorisieren Sie sich auf dem C++-Server.
await websocket.send("AUTH:user_555")
print("[Client] Sent AUTH:user_555. Ready to receive notifications...")
async for message in websocket:
print(f"[Client] RECEIVED PAYLOAD: {message}")
if ":" in message:
msg_id = message.split(':', 1)[0]
await websocket.send(f"ACK:{msg_id}")
print(f"[Client] Sent response -> ACK:{msg_id}")
except Exception as e:
print(f"[Client] Error: {e}")
if __name__ == "__main__":
try:
asyncio.run(listen_server())
except KeyboardInterrupt:
print("\nClient stopped.")
7. Wie kann man das jetzt testen?
1: Starten Sie Ihren C++-Server im ersten Terminal.
2: Führen Sie diesen neuen reinen Python-Client in einem zweiten Terminal aus.
3: Öffnen Sie ein drittes Terminal und simulieren Sie das Senden einer Nachricht im Namen Ihres Haupt-Backends über die Standard-Redis-Konsole:
redis-cli XADD global_push_stream MAXLEN 100000 '*' payload "user_555:msg_streaming_999:Vollständiger Produktionstest!"
1: Wir stoppen test.py
2: Wir senden noch ein paar Nachrichten an Redis.
3: Wir starten test.py wieder
Die Nachrichten müssen den client erreichen.
Info:
- Das war's! Wir haben ein funktionierendes Kampfsystem – wir können es einsetzen!
8. Erstelle eine Dockerfile
Diese Datei verwendet das mehrstufige Build-Muster. Die erste Stufe (Builder) lädt die Abhängigkeiten herunter und kompiliert unseren C++-Code, während die zweite Stufe (Minimal) nur die fertige Binärdatei verwendet. Der resultierende Container ist sehr klein, nur etwa 30–40 MB groß.
Erstellen Sie eine Dockerfile im Stammverzeichnis Ihres Projekts.
FROM ubuntu:24.04 AS builder
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
git \
curl \
zip \
unzip \
tar \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /opt
RUN git clone https://github.com \
&& cd vcpkg \
&& ./bootstrap-vcpkg.sh
RUN /opt/vcpkg/vcpkg install uwebsockets redis-plus-plus
WORKDIR /app
COPY CMakeLists.txt main.cpp ./
WORKDIR /app/build
RUN cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_TOOLCHAIN_FILE=/opt/vcpkg/scripts/buildsystems/vcpkg.cmake .. \
&& cmake --build . --config Release
FROM ubuntu:24.04 AS minimal
RUN apt-get update && apt-get install -y \
libssl3 \
libstdc++6 \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY --from=builder /app/build/push_server .
EXPOSE 9001
CMD ["./push_server"]
9. Erstelle eine docker-compose.yml-Datei.
Diese Datei verbindet den C++-Server mit Redis. Hinweis: Da sich beide in einem isolierten Docker-Netzwerk befinden, ändert sich die Redis-Adresse im C++-Code von 127.0.0.1 zum Systemnamen redis_db. Docker leitet den Datenverkehr automatisch um.
Erstellen Sie dort eine docker-compose.yml-Datei.
version: '3.8'
services:
redis_db:
image: redis:7.2-alpine
container_name: push_redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: always
push_server:
build:
context: .
dockerfile: Dockerfile
container_name: push_cxx_server
ports:
- "9001:9001"
depends_on:
- redis_db
restart: always
volumes:
redis_data:
10. Änderung in main.cpp
Um dem C++-Server den Zugriff auf Redis innerhalb von Docker zu ermöglichen, muss die Verbindungszeichenfolge geändert werden. Ersetzen Sie 127.0.0.1 durch den in der docker-compose.yml-Datei angegebenen Namen des Redis-Datenbankdienstes.
Nehmen Sie diese Änderung an zwei Stellen vor, an denen die Verbindung hergestellt wird: im Hauptpool und innerhalb von r_worker im Hintergrundthread.
// Davor: connection_options.host = "127.0.0.1";
// Danach:
connection_options.host = "redis_db";
11. Anleitung zum Starten des gesamten Systems mit einem einzigen Befehl
Sie müssen Redis nun nicht mehr manuell auf Ihrem System starten und den Code mit CMake kompilieren. Stellen Sie sicher, dass Docker Compose installiert ist (sudo apt install docker-compose-v2), navigieren Sie zum Projektordner und geben Sie Folgendes ein:
# Lokalen Redis-Prozess unter Linux stoppen, damit er nicht extern Port 6379 belegt.
sudo systemctl stop redis-server 2>/dev/null
# Führen Sie den Build aus und starten Sie die gesamte Infrastruktur im Hintergrund.
docker compose up --build -d
12. Wie man überprüft, ob alles funktioniert
Überprüfen Sie den Containerstatus: docker compose ps. Sie werden sehen, dass beide Dienste den Status `Up` haben. Überprüfen Sie die C++-Server-Logs: `docker compose logs push_server`. Dort sollte Folgendes stehen: [Redis] Connected via Pool und === PRODUCTION GRADED SERVER STARTED ON PORT 9001 ===. Führen Sie Ihr Python-Skript test.py auf Ihrem lokalen Rechner aus. Es verbindet sich erfolgreich mit dem Docker-Container unter ws://127.0.0.1:9001/. Senden Sie den Befehl über die lokale redis-cli ... – Docker hat Port 6379 extern weitergeleitet, sodass Ihr Push den Redis-Container erreicht, vom C++-Server empfangen und an Python weitergeleitet wird! Wenn Sie das gesamte System vollständig anhalten möchten, genügt es, Folgendes einzugeben: docker compose down
Info:
- Falls Sie während der Einrichtung keine Meldungen erhalten, prüfen Sie, ob Port 9001 geöffnet ist. Falls ja, beenden Sie alle Vorgänge und starten Sie sie neu.
netstat -na | grep 9001
sudo kill -9 $(sudo lsof -t -i:9001)