The following was generated by ds:
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional> // std::function, used by ThreadPool
#include <stdexcept> // std::runtime_error
#include <algorithm> // std::remove, for context bookkeeping
#include <chrono>
#include <sys/epoll.h>
#include <sys/select.h> // select() in the accept loop
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
// Context: per-connection state, pooled and reused across connections
struct Context {
int fd; // connection file descriptor
std::chrono::steady_clock::time_point timestamp; // accept/last-activity timestamp
bool authenticated; // whether the peer has authenticated
};
// Thread pool: fixed set of workers draining a task queue
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
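// Usage sketch (illustrative, not part of the server): any void() callable
// can be queued, e.g.
//   ThreadPool pool(4);
//   pool.enqueue([] { std::cout << "work\n"; });
// The destructor sets stop and joins, so tasks already queued still run.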
// Server class
class Server {
public:
Server(int port, size_t poolSize) : epollFd(epoll_create1(0)), pool(poolSize) {
if (epollFd == -1) {
throw std::runtime_error("Failed to create epoll");
}
// Create the listening socket
listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd == -1) {
throw std::runtime_error("Failed to create socket");
}
// Make it non-blocking
fcntl(listenFd, F_SETFL, O_NONBLOCK);
// Bind the port
sockaddr_in serverAddr{};
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(port);
if (bind(listenFd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == -1) {
throw std::runtime_error("Failed to bind socket");
}
// Listen
if (listen(listenFd, SOMAXCONN)) {
throw std::runtime_error("Failed to listen on socket");
}
// Register the listening socket with epoll
epoll_event event{};
event.events = EPOLLIN;
event.data.fd = listenFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event)) {
throw std::runtime_error("Failed to add listenFd to epoll");
}
// Pre-allocate a fixed pool of contexts; running out of them is the
// DDoS signal in the quoted scheme. 1024 is an arbitrary size for this sketch.
for (int i = 0; i < 1024; ++i) {
freeContexts.push(new Context{});
}
}
~Server() {
close(listenFd);
close(epollFd);
// Recycle pooled contexts; contexts still in flight leak in this sketch.
std::unique_lock<std::mutex> lock(contextMutex);
while (!freeContexts.empty()) {
delete freeContexts.front();
freeContexts.pop();
}
}
void run() {
while (true) {
// The main thread guards the listening socket with select()
fd_set readFds;
FD_ZERO(&readFds);
FD_SET(listenFd, &readFds);
timeval timeout{};
timeout.tv_sec = 1; // 1-second timeout
int ret = select(listenFd + 1, &readFds, nullptr, nullptr, &timeout);
if (ret > 0 && FD_ISSET(listenFd, &readFds)) {
// New connection pending
Context* ctx = getContext();
if (ctx) {
sockaddr_in clientAddr{};
socklen_t clientAddrLen = sizeof(clientAddr);
ctx->fd = accept(listenFd, (struct sockaddr*)&clientAddr, &clientAddrLen);
if (ctx->fd == -1) {
returnContext(ctx);
continue;
}
// Make the connection non-blocking
fcntl(ctx->fd, F_SETFL, O_NONBLOCK);
// Record the accept timestamp
ctx->timestamp = std::chrono::steady_clock::now();
ctx->authenticated = false;
// Register the new connection with epoll
epoll_event event{};
event.events = EPOLLIN | EPOLLET;
event.data.ptr = ctx;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, ctx->fd, &event)) {
close(ctx->fd);
returnContext(ctx);
continue;
}
// Track the context so timeout cleanup can find it
{
std::unique_lock<std::mutex> lock(contextMutex);
activeContexts.push_back(ctx);
}
// Hand the context to the thread pool. In this sketch the worker
// reads immediately rather than waiting on epoll_wait().
pool.enqueue([this, ctx] { handleConnection(ctx); });
} else {
// Context pool exhausted: reclaim timed-out connections
cleanupTimeoutContexts();
}
}
}
}
private:
Context* getContext() {
std::unique_lock<std::mutex> lock(contextMutex);
if (!freeContexts.empty()) {
Context* ctx = freeContexts.front();
freeContexts.pop();
return ctx;
}
return nullptr;
}
void returnContext(Context* ctx) {
std::unique_lock<std::mutex> lock(contextMutex);
activeContexts.erase(std::remove(activeContexts.begin(), activeContexts.end(), ctx), activeContexts.end());
freeContexts.push(ctx);
}
// Reclaim connections idle past the deadline. Caveat: a worker may still be
// using a context reclaimed here; a production version needs a handshake
// (e.g. a per-context state flag) before recycling.
void cleanupTimeoutContexts() {
std::unique_lock<std::mutex> lock(contextMutex);
auto now = std::chrono::steady_clock::now();
for (auto it = activeContexts.begin(); it != activeContexts.end();) {
if (now - (*it)->timestamp > std::chrono::seconds(5)) {
close((*it)->fd);
freeContexts.push(*it);
it = activeContexts.erase(it);
} else {
++it;
}
}
}
// Simulated authentication and data handling. Caveat: ctx->fd is
// non-blocking, so recv() may return -1/EAGAIN before data arrives; a full
// implementation would wait for EPOLLIN via epoll_wait() in the workers.
void handleConnection(Context* ctx) {
char buffer[1024];
ssize_t len = recv(ctx->fd, buffer, sizeof(buffer), 0);
if (len > 0 && std::string(buffer, len) == "password") {
ctx->authenticated = true;
// Business loop: read until the peer closes or an error occurs
while (true) {
len = recv(ctx->fd, buffer, sizeof(buffer), 0);
if (len <= 0) break;
// Process the data here
}
}
// Close the connection and recycle its context
close(ctx->fd);
returnContext(ctx);
}
int listenFd;
int epollFd;
ThreadPool pool;
std::queue<Context*> freeContexts;
std::vector<Context*> activeContexts;
std::mutex contextMutex;
};
int main() {
Server server(8080, 4); // port 8080, thread pool of 4
server.run();
return 0;
}
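For a quick smoke test of the raw-socket version, a minimal blocking client along these lines should do (a sketch: the "password" token and port 8080 simply mirror the values hard-coded in the server above):
// smoke_client.cpp - minimal blocking client for the sketch above
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

int main() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080); // same port the server binds
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (connect(fd, (sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }
    const char* auth = "password"; // the token handleConnection compares against
    send(fd, auth, strlen(auth), 0);
    send(fd, "hello", 5, 0); // follow-up data for the business loop
    close(fd);
    return 0;
}
Because the accepted fd is non-blocking, the worker's first recv() can race this client's send(); that is the EAGAIN caveat noted in handleConnection.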
【 In ylh1969's post: 】
: My server scheme: the main thread creates the thread pool, the contexts, and so on; the thread pool is driven by epoll.
: The main thread guards the listening socket, watching it with select. On an event it takes a context, accepts with it, records a timestamp, then hands the context to epoll for the thread pool to process.
: During a DDOS attack, contexts can be exhausted quickly. When the main thread cannot get a context, it enters a loop of repeated timeout checks, finds the timed-out contexts, clears out the garbage, and then continues.
: ...................
--
FROM 14.127.25.*
With asio the code is much cleaner; raw Linux code with its pile of fd operations is a headache just to look at.
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <vector>
#include <boost/asio.hpp>
using namespace boost::asio;
using namespace boost::system;
using namespace boost::posix_time;
class ConnectionManager; // forward declaration
// Connection: owns a single socket's lifetime and its timeout
class Connection : public std::enable_shared_from_this<Connection> {
public:
Connection(io_context& io, ConnectionManager& manager)
: socket_(io),
timer_(io),
manager_(manager),
authenticated_(false),
timeout_(seconds(5)) {} // default authentication timeout: 5 seconds
ip::tcp::socket& socket() { return socket_; }
// Start the connection (begin reading and arm the timeout)
void start() {
start_read();
start_timeout();
}
// Close the connection
void close() {
error_code ignored;
socket_.close(ignored);
timer_.cancel();
}
private:
// Begin an asynchronous line-delimited read
void start_read() {
auto self = shared_from_this();
async_read_until(
socket_,
buffer_,
'\n',
[this, self](const error_code& ec, size_t bytes) {
if (ec) {
if (ec == error::operation_aborted) return; // cancelled by timeout/close
handle_error(ec);
return;
}
// Process the received line
std::istream is(&buffer_);
std::string message;
std::getline(is, message);
if (!authenticated_) {
handle_authentication(message);
} else {
handle_data(message);
}
// Keep reading
start_read();
}
);
}
// Handle authentication (std::getline has already stripped the '\n')
void handle_authentication(const std::string& message) {
if (message == "password") {
authenticated_ = true;
std::cout << "Connection authenticated." << std::endl;
timeout_ = seconds(10); // after auth, data timeout is 10 seconds
start_timeout(); // re-arm the timer
} else {
std::cerr << "Authentication failed. Closing connection." << std::endl;
close();
}
}
// Handle business data
void handle_data(const std::string& message) {
std::cout << "Received data: " << message << std::endl;
start_timeout(); // re-arm the timeout on every message
}
// Arm (or re-arm) the inactivity timer
void start_timeout() {
timer_.expires_from_now(timeout_);
auto self = shared_from_this();
timer_.async_wait([this, self](const error_code& ec) {
if (!ec) {
std::cerr << "Connection timed out. Closing." << std::endl;
close();
}
});
}
// Handle errors. Defined after ConnectionManager below, since only a
// forward declaration is visible here.
void handle_error(const error_code& ec);
ip::tcp::socket socket_;
deadline_timer timer_;
ConnectionManager& manager_;
streambuf buffer_;
bool authenticated_;
seconds timeout_;
};
// Connection manager (tracks all live connections)
class ConnectionManager {
public:
void add(std::shared_ptr<Connection> conn) {
std::lock_guard<std::mutex> lock(mutex_);
connections_.insert(conn);
}
void remove(std::shared_ptr<Connection> conn) {
std::lock_guard<std::mutex> lock(mutex_);
connections_.erase(conn);
}
// Sweep out connections whose sockets have been closed (DDoS mitigation)
void cleanup() {
std::lock_guard<std::mutex> lock(mutex_);
auto it = connections_.begin();
while (it != connections_.end()) {
if ((*it)->socket().is_open()) {
++it;
} else {
it = connections_.erase(it);
}
}
}
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return connections_.size();
}
private:
std::unordered_set<std::shared_ptr<Connection>> connections_;
mutable std::mutex mutex_;
};
// Out-of-line now that ConnectionManager is complete
inline void Connection::handle_error(const error_code& ec) {
std::cerr << "Connection error: " << ec.message() << std::endl;
close();
manager_.remove(shared_from_this());
}
// Server class
class Server {
public:
Server(io_context& io, unsigned short port, size_t max_connections = 100)
: io_(io),
acceptor_(io, ip::tcp::endpoint(ip::tcp::v4(), port)),
connection_manager_(),
max_connections_(max_connections),
cleanup_timer_(io) {
start_accept();
start_cleanup();
}
private:
// Accept new connections asynchronously
void start_accept() {
auto new_conn = std::make_shared<Connection>(io_, connection_manager_);
acceptor_.async_accept(
new_conn->socket(),
[this, new_conn](const error_code& ec) {
if (!ec) {
if (connection_manager_.size() < max_connections_) {
connection_manager_.add(new_conn);
new_conn->start();
} else {
std::cerr << "Connection limit reached. Delaying accept." << std::endl;
// DDoS mitigation: drop this connection (it closes when
// new_conn goes out of scope) and sweep the dead ones
start_cleanup();
}
start_accept();
} else {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
}
);
}
// Periodically sweep dead connections (DDoS mitigation)
void start_cleanup() {
cleanup_timer_.expires_from_now(seconds(5));
cleanup_timer_.async_wait([this](const error_code& ec) {
if (!ec) {
connection_manager_.cleanup();
start_cleanup();
}
});
}
io_context& io_;
ip::tcp::acceptor acceptor_;
ConnectionManager connection_manager_;
size_t max_connections_;
deadline_timer cleanup_timer_;
};
int main() {
try {
io_context io;
Server server(io, 8080, 100); // listen on port 8080, at most 100 connections
// Run the io_context on a pool of 4 threads
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back([&io] { io.run(); });
}
// Wait for all threads to finish
for (auto& t : threads) {
t.join();
}
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
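To poke at the asio version: messages are framed by '\n' (async_read_until), so test input must be newline-terminated. Plain nc 127.0.0.1 8080 works for interactive testing, or a minimal synchronous client like this sketch (localhost, the hard-coded port and password assumed; ip::make_address needs a reasonably recent Boost):
// asio_client.cpp - minimal synchronous test client (sketch)
#include <iostream>
#include <string>
#include <boost/asio.hpp>

int main() {
    try {
        boost::asio::io_context io;
        boost::asio::ip::tcp::socket sock(io);
        boost::asio::ip::tcp::endpoint ep(
            boost::asio::ip::make_address("127.0.0.1"), 8080);
        sock.connect(ep);
        // First line authenticates, second line is ordinary data
        boost::asio::write(sock, boost::asio::buffer(std::string("password\n")));
        boost::asio::write(sock, boost::asio::buffer(std::string("hello asio\n")));
    } catch (const std::exception& e) {
        std::cerr << "client error: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}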
【 In Algoquant's post: 】
: ds deep thinking:
: Integrate the several conversations above about high-performance servers, and rewrite them to implement the described functionality using the boost asio network library
:
--
FROM 14.127.25.*