使用 asio 之后代码清爽多了;Linux 裸代码里一堆 fd 操作,看着就头大。
#include <atomic>
#include <cstddef>
#include <iostream>
#include <istream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
using namespace boost::asio;
using namespace boost::system;
using namespace boost::posix_time;
class ConnectionManager; // 前向声明
// 连接类,管理单个 socket 的生命周期和超时
class Connection : public std::enable_shared_from_this<Connection> {
public:
Connection(io_context& io, ConnectionManager& manager)
: socket_(io),
timer_(io),
manager_(manager),
authenticated_(false),
timeout_(seconds(5)) {} // 默认认证超时 5 秒
ip::tcp::socket& socket() { return socket_; }
// 启动连接(开始接收数据并设置超时)
void start() {
start_read();
start_timeout();
}
// 关闭连接
void close() {
socket_.close();
timer_.cancel();
}
private:
// 开始异步读取数据
void start_read() {
auto self = shared_from_this();
async_read_until(
socket_,
buffer_,
'\n',
[this, self](const error_code& ec, size_t bytes) {
if (ec) {
if (ec == error::operation_aborted) return; // 超时触发
handle_error(ec);
return;
}
// 处理接收到的数据
std::istream is(&buffer_);
std::string message;
std::getline(is, message);
if (!authenticated_) {
handle_authentication(message);
} else {
handle_data(message);
}
// 继续读取下一轮数据
start_read();
}
);
}
// 处理认证
void handle_authentication(const std::string& message) {
if (message == "password\n") {
authenticated_ = true;
std::cout << "Connection authenticated." << std::endl;
timeout_ = seconds(10); // 认证后数据超时 10 秒
start_timeout(); // 重置超时定时器
} else {
std::cerr << "Authentication failed. Closing connection." << std::endl;
close();
}
}
// 处理业务数据
void handle_data(const std::string& message) {
std::cout << "Received data: " << message << std::endl;
start_timeout(); // 每次收到数据后重置超时
}
// 启动超时定时器
void start_timeout() {
timer_.expires_from_now(timeout_);
auto self = shared_from_this();
timer_.async_wait([this, self](const error_code& ec) {
if (!ec) {
std::cerr << "Connection timed out. Closing." << std::endl;
close();
}
});
}
// 处理错误
void handle_error(const error_code& ec) {
std::cerr << "Connection error: " << ec.message() << std::endl;
manager_.remove(shared_from_this());
}
ip::tcp::socket socket_;
deadline_timer timer_;
ConnectionManager& manager_;
streambuf buffer_;
bool authenticated_;
seconds timeout_;
};
// 连接管理类(全局管理所有连接)
class ConnectionManager {
public:
void add(std::shared_ptr<Connection> conn) {
std::lock_guard<std::mutex> lock(mutex_);
connections_.insert(conn);
}
void remove(std::shared_ptr<Connection> conn) {
std::lock_guard<std::mutex> lock(mutex_);
connections_.erase(conn);
}
// 清理所有超时连接(DDOS 防护)
void cleanup() {
std::lock_guard<std::mutex> lock(mutex_);
auto it = connections_.begin();
while (it != connections_.end()) {
if ((*it)->socket().is_open()) {
++it;
} else {
it = connections_.erase(it);
}
}
}
size_t size() const {
return connections_.size();
}
private:
std::unordered_set<std::shared_ptr<Connection>> connections_;
mutable std::mutex mutex_;
};
// 服务器类
class Server {
public:
Server(io_context& io, unsigned short port, size_t max_connections = 100)
: acceptor_(io, ip::tcp::endpoint(ip::tcp::v4(), port)),
connection_manager_(),
max_connections_(max_connections),
cleanup_timer_(io) {
start_accept();
start_cleanup();
}
private:
// 启动异步接受新连接
void start_accept() {
auto new_conn = std::make_shared<Connection>(acceptor_.get_executor().context(), connection_manager_);
acceptor_.async_accept(
new_conn->socket(),
[this, new_conn](const error_code& ec) {
if (!ec) {
if (connection_manager_.size() < max_connections_) {
connection_manager_.add(new_conn);
new_conn->start();
} else {
std::cerr << "Connection limit reached. Delaying accept." << std::endl;
// DDOS 防护:延迟接受新连接
start_cleanup();
}
start_accept();
} else {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
}
);
}
// 定时清理无效连接(DDOS 防护)
void start_cleanup() {
cleanup_timer_.expires_from_now(seconds(5));
cleanup_timer_.async_wait([this](const error_code& ec) {
if (!ec) {
connection_manager_.cleanup();
start_cleanup();
}
});
}
ip::tcp::acceptor acceptor_;
ConnectionManager connection_manager_;
size_t max_connections_;
deadline_timer cleanup_timer_;
};
/// Entry point: listens on port 8080 with at most 100 connections and runs
/// the io_context on a pool of 4 threads.
/// @return 0 on normal shutdown, 1 if start-up or the thread pool fails.
int main() {
    try {
        io_context io;
        Server server(io, 8080, 100);

        const int kWorkerCount = 4;
        std::vector<std::thread> threads;
        threads.reserve(kWorkerCount);
        for (int i = 0; i < kWorkerCount; ++i) {
            threads.emplace_back([&io] {
                // An exception thrown by a handler propagates out of run().
                // Letting it leave the thread function would call
                // std::terminate, so log it and resume running instead.
                for (;;) {
                    try {
                        io.run();
                        break; // run() returned normally: no more work.
                    } catch (const std::exception& e) {
                        std::cerr << "Exception: " << e.what() << std::endl;
                    }
                }
            });
        }

        // Wait for every worker to finish.
        for (auto& t : threads) {
            t.join();
        }
    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
【 在 Algoquant 的大作中提到: 】
: ds 深度思考:
: 将上述的几个关于高性能服务器的几个对话整合,并改为使用 boost asio 网络库实现 其描述的功能
:
--
FROM 14.127.25.*