以下由ds生成:
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
// Context 结构,用于管理连接
struct Context {
int fd; // 文件描述符
std::chrono::steady_clock::time_point timestamp; // 时间戳
bool authenticated; // 是否已认证
};
// 线程池类
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
// 服务器类
class Server {
public:
Server(int port, size_t poolSize) : pool(poolSize), epollFd(epoll_create1(0)) {
if (epollFd == -1) {
throw std::runtime_error("Failed to create epoll");
}
// 创建监听 socket
listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd == -1) {
throw std::runtime_error("Failed to create socket");
}
// 设置非阻塞
fcntl(listenFd, F_SETFL, O_NONBLOCK);
// 绑定端口
sockaddr_in serverAddr{};
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(port);
if (bind(listenFd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) {
throw std::runtime_error("Failed to bind socket");
}
// 监听
if (listen(listenFd, SOMAXCONN)) {
throw std::runtime_error("Failed to listen on socket");
}
// 将监听 socket 加入 epoll
epoll_event event{};
event.events = EPOLLIN;
event.data.fd = listenFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event)) {
throw std::runtime_error("Failed to add listenFd to epoll");
}
}
void run() {
while (true) {
// 使用 select 监听 listenFd
fd_set readFds;
FD_ZERO(&readFds);
FD_SET(listenFd, &readFds);
timeval timeout{};
timeout.tv_sec = 1; // 1 秒超时
int ret = select(listenFd + 1, &readFds, nullptr, nullptr, &timeout);
if (ret > 0 && FD_ISSET(listenFd, &readFds)) {
// 有新连接
Context* ctx = getContext();
if (ctx) {
sockaddr_in clientAddr{};
socklen_t clientAddrLen = sizeof(clientAddr);
ctx->fd = accept(listenFd, (struct sockaddr*)&clientAddr, &clientAddrLen);
if (ctx->fd == -1) {
returnContext(ctx);
continue;
}
// 设置非阻塞
fcntl(ctx->fd, F_SETFL, O_NONBLOCK);
// 记录时间戳
ctx->timestamp = std::chrono::steady_clock::now();
ctx->authenticated = false;
// 将新连接加入 epoll
epoll_event event{};
event.events = EPOLLIN | EPOLLET;
event.data.ptr = ctx;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, ctx->fd, &event)) {
close(ctx->fd);
returnContext(ctx);
continue;
}
// 将 context 投入线程池
pool.enqueue([this, ctx] { handleConnection(ctx); });
} else {
// context 耗尽,清理超时连接
cleanupTimeoutContexts();
}
}
}
}
private:
Context* getContext() {
std::unique_lock<std::mutex> lock(contextMutex);
if (!freeContexts.empty()) {
Context* ctx = freeContexts.front();
freeContexts.pop();
return ctx;
}
return nullptr;
}
void returnContext(Context* ctx) {
std::unique_lock<std::mutex> lock(contextMutex);
freeContexts.push(ctx);
}
void cleanupTimeoutContexts() {
std::unique_lock<std::mutex> lock(contextMutex);
auto now = std::chrono::steady_clock::now();
for (auto it = activeContexts.begin(); it != activeContexts.end();) {
if (now - (*it)->timestamp > std::chrono::seconds(5)) {
close((*it)->fd);
freeContexts.push(*it);
it = activeContexts.erase(it);
} else {
++it;
}
}
}
void handleConnection(Context* ctx) {
// 模拟认证过程
char buffer[1024];
ssize_t len = recv(ctx->fd, buffer, sizeof(buffer), 0);
if (len > 0 && std::string(buffer, len) == "password") {
ctx->authenticated = true;
// 处理业务逻辑
while (true) {
len = recv(ctx->fd, buffer, sizeof(buffer), 0);
if (len <= 0) break;
// 处理数据
}
}
// 关闭连接
close(ctx->fd);
returnContext(ctx);
}
int listenFd;
int epollFd;
ThreadPool pool;
std::queue<Context*> freeContexts;
std::vector<Context*> activeContexts;
std::mutex contextMutex;
};
int main() {
Server server(8080, 4); // 端口 8080,线程池大小 4
server.run();
return 0;
}
【 在 ylh1969 的大作中提到: 】
: 我的服务器方案,由主线程创建线程池,context等等,线程池由epoll调度。
: 主线程守护监听socket。由select监听。收到事件取一个context,然后用这个context 进行accept,并记录时间戳,,把这个context投入epoll,交给线程池处理。
: 在DDOS攻击期间,context可能被迅速耗尽,这时主线程取不出来context,就进入一个循环,不断进行timeout检测,检测发生timeout的contex,收拾出来垃圾再继续处理。
: ...................
--
FROM 14.127.25.*