20 #include <boost/asio.hpp>
25 #include <unordered_map>
50 std::unique_ptr<interface::TcpAcceptorInterface>
acceptor_;
62 std::unordered_map<size_t, std::shared_ptr<TcpServerSession>>
sessions_;
72 uses_global_ioc_(true),
75 max_clients_(cfg.max_connections > 0 ? static_cast<size_t>(cfg.max_connections) : 0),
78 acceptor_ = std::make_unique<BoostTcpAcceptor>(ioc_);
79 }
catch (
const std::exception& e) {
80 throw std::runtime_error(
"Failed to create TCP acceptor: " + std::string(e.what()));
87 uses_global_ioc_(false),
89 acceptor_(std::move(acceptor)),
91 max_clients_(cfg.max_connections > 0 ? static_cast<size_t>(cfg.max_connections) : 0),
94 throw std::runtime_error(
"Failed to create TCP acceptor");
104 if (ioc_thread_.joinable()) {
105 if (std::this_thread::get_id() != ioc_thread_.get_id()) {
108 ioc_thread_.detach();
121 std::lock_guard<std::mutex> lock(sessions_mutex_);
133 boost::system::error_code ec;
135 auto address = net::ip::make_address(cfg_.
bind_address, ec);
143 if (!acceptor_->is_open()) {
144 acceptor_->open(address.is_v6() ? tcp::v6() : tcp::v4(), ec);
146 UNILINK_LOG_ERROR(
"tcp_server",
"open",
"Failed to open acceptor: " + ec.message());
153 acceptor_->bind(tcp::endpoint(address, cfg_.
port), ec);
156 auto timer = std::make_shared<net::steady_timer>(ioc_);
158 timer->async_wait([
self, retry_count, timer](
const boost::system::error_code& timer_ec) {
160 auto impl =
self->get_impl();
161 if (!impl->stopping_.load()) {
162 impl->attempt_port_binding(self, retry_count + 1);
176 acceptor_->listen(boost::asio::socket_base::max_listen_connections, ec);
191 if (
stopping_.load() || !acceptor_ || !acceptor_->is_open())
return;
193 acceptor_->async_accept([
self](
auto ec, tcp::socket sock) {
194 auto impl =
self->get_impl();
195 if (impl->stopping_.load()) {
199 if (ec != boost::asio::error::operation_aborted) {
201 impl->notify_state();
204 auto timer = std::make_shared<net::steady_timer>(impl->ioc_);
205 timer->expires_after(std::chrono::milliseconds(100));
206 timer->async_wait([self, timer](const boost::system::error_code&) {
207 auto impl = self->get_impl();
208 if (!impl->stopping_.load()) {
209 impl->do_accept(self);
216 boost::system::error_code ep_ec;
217 auto rep = sock.remote_endpoint(ep_ec);
218 std::string client_info =
"unknown";
220 client_info = rep.address().to_string() +
":" +
std::to_string(rep.port());
223 if (impl->client_limit_enabled_) {
224 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
225 if (impl->sessions_.size() >= impl->max_clients_) {
226 boost::system::error_code close_ec;
227 sock.close(close_ec);
228 impl->paused_accept_ =
true;
233 auto new_session = std::make_shared<TcpServerSession>(
234 impl->ioc_, std::move(sock), impl->cfg_.backpressure_threshold, impl->cfg_.idle_timeout_ms);
238 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
239 client_id = impl->next_client_id_.fetch_add(1);
240 impl->sessions_.emplace(client_id, new_session);
241 impl->current_session_ = new_session;
244 std::weak_ptr<TcpServer> weak_self =
self;
247 auto self = weak_self.lock();
249 auto impl =
self->get_impl();
254 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
255 cb = impl->on_bytes_;
256 multi_cb = impl->on_multi_data_;
261 multi_cb(client_id, str_data);
265 if (impl->on_bp_) new_session->on_backpressure(impl->on_bp_);
267 new_session->on_close([weak_self, client_id, new_session] {
268 auto self = weak_self.lock();
270 auto impl =
self->get_impl();
271 if (impl->stopping_.load())
return;
275 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
276 disconnect_cb = impl->on_multi_disconnect_;
278 if (disconnect_cb) disconnect_cb(client_id);
280 bool was_current =
false;
282 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
283 impl->sessions_.erase(client_id);
284 if (impl->paused_accept_ && (!impl->client_limit_enabled_ || impl->sessions_.size() < impl->max_clients_)) {
285 impl->paused_accept_ =
false;
286 net::post(impl->ioc_, [
self] { self->get_impl()->do_accept(self); });
288 was_current = (impl->current_session_ == new_session);
290 if (!impl->sessions_.empty())
291 impl->current_session_ = impl->sessions_.begin()->second;
293 impl->current_session_.reset();
298 impl->notify_state();
304 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
305 connect_cb = impl->on_multi_connect_;
307 if (connect_cb) connect_cb(client_id, client_info);
310 impl->notify_state();
311 new_session->start();
312 impl->do_accept(
self);
318 boost::system::error_code ec;
319 if (acceptor_ && acceptor_->is_open()) {
320 acceptor_->close(ec);
323 std::vector<std::shared_ptr<TcpServerSession>> sessions_copy;
325 std::lock_guard<std::mutex> lock(sessions_mutex_);
326 sessions_copy.reserve(sessions_.size());
327 for (
auto& kv : sessions_) {
328 sessions_copy.push_back(kv.second);
331 current_session_.reset();
334 for (
auto& session : sessions_copy) {
346 void stop(std::shared_ptr<TcpServer>
self) {
347 if (stopping_.exchange(
true)) {
352 std::lock_guard<std::mutex> lock(sessions_mutex_);
356 on_multi_connect_ =
nullptr;
357 on_multi_data_ =
nullptr;
358 on_multi_disconnect_ =
nullptr;
361 if (ioc_.get_executor().running_in_this_thread()) {
363 if (owns_ioc_) ioc_.stop();
369 if (has_active_ioc &&
self) {
370 auto cleanup_promise = std::make_shared<std::promise<void>>();
371 auto cleanup_future = cleanup_promise->get_future();
373 std::weak_ptr<TcpServer> weak_self =
self;
374 net::dispatch(ioc_, [weak_self, cleanup_promise]() {
375 if (
auto shared_self = weak_self.lock()) {
376 shared_self->get_impl()->perform_cleanup();
378 cleanup_promise->set_value();
381 if (cleanup_future.wait_for(std::chrono::seconds(2)) == std::future_status::timeout) {
388 if (owns_ioc_ && ioc_thread_.joinable()) {
396 return std::shared_ptr<TcpServer>(
new TcpServer(cfg));
400 std::unique_ptr<interface::TcpAcceptorInterface> acceptor,
401 net::io_context& ioc) {
402 return std::shared_ptr<TcpServer>(
new TcpServer(cfg, std::move(acceptor), ioc));
407 TcpServer::TcpServer(
const config::TcpServerConfig& cfg, std::unique_ptr<interface::TcpAcceptorInterface> acceptor,
408 net::io_context& ioc)
409 : impl_(std::make_unique<Impl>(cfg, std::move(acceptor), ioc)) {}
414 impl_->stop(
nullptr);
422 auto impl = get_impl();
423 auto current = impl->state_.get_state();
428 impl->stopping_.store(
false);
430 if (impl->uses_global_ioc_) {
432 if (!manager.is_running()) {
437 if (!impl->acceptor_) {
439 impl->notify_state();
443 if (impl->owns_ioc_) {
444 impl->ioc_thread_ = std::thread([impl] { impl->ioc_.run(); });
446 auto self = shared_from_this();
447 if (impl->ioc_.get_executor().running_in_this_thread()) {
448 if (!impl->stopping_.load()) {
449 impl->attempt_port_binding(
self, 0);
452 net::dispatch(impl->ioc_, [
self] {
453 auto impl = self->get_impl();
454 if (impl->stopping_.load()) return;
455 impl->attempt_port_binding(self, 0);
463 auto impl = get_impl();
464 if (impl->stopping_.load())
return;
465 auto self = shared_from_this();
466 net::post(impl->ioc_, [
self] { self->stop(); });
470 auto impl = get_impl();
471 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
472 return impl->current_session_ && impl->current_session_->alive();
476 auto impl = get_impl();
477 if (impl->stopping_.load())
return;
478 std::shared_ptr<TcpServerSession> session;
480 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
481 session = impl->current_session_;
484 if (session && session->alive()) {
485 session->async_write_copy(data);
490 auto impl = get_impl();
491 if (impl->stopping_.load())
return;
492 std::shared_ptr<TcpServerSession> session;
494 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
495 session = impl->current_session_;
498 if (session && session->alive()) {
499 session->async_write_move(std::move(data));
504 auto impl = get_impl();
505 if (impl->stopping_.load() || !data)
return;
506 std::shared_ptr<TcpServerSession> session;
508 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
509 session = impl->current_session_;
512 if (session && session->alive()) {
513 session->async_write_shared(std::move(data));
518 std::lock_guard<std::mutex> lock(impl_->sessions_mutex_);
519 impl_->on_bytes_ = std::move(cb);
522 std::lock_guard<std::mutex> lock(impl_->sessions_mutex_);
523 impl_->on_state_ = std::move(cb);
526 auto impl = get_impl();
528 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
529 impl->on_bp_ = std::move(cb);
531 std::shared_ptr<TcpServerSession> session;
533 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
534 session = impl->current_session_;
537 if (session) session->on_backpressure(impl->on_bp_);
541 auto impl = get_impl();
542 auto shared_data = std::make_shared<const std::vector<uint8_t>>(message.begin(), message.end());
543 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
545 for (
auto& entry : impl->sessions_) {
546 auto& session = entry.second;
547 if (session && session->alive()) {
548 session->async_write_shared(shared_data);
556 auto impl = get_impl();
557 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
558 auto it = impl->sessions_.find(client_id);
559 if (it != impl->sessions_.end() && it->second && it->second->alive()) {
568 auto impl = get_impl();
569 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
571 for (
const auto& entry : impl->sessions_)
572 if (entry.second && entry.second->alive()) ++alive;
577 auto impl = get_impl();
578 std::lock_guard<std::mutex> lock(impl->sessions_mutex_);
579 std::vector<size_t> connected_clients;
580 connected_clients.reserve(impl->sessions_.size());
581 for (
const auto& entry : impl->sessions_)
582 if (entry.second && entry.second->alive()) connected_clients.push_back(entry.first);
583 return connected_clients;
587 std::lock_guard<std::mutex> l(impl_->sessions_mutex_);
588 impl_->on_multi_connect_ = std::move(h);
591 std::lock_guard<std::mutex> l(impl_->sessions_mutex_);
592 impl_->on_multi_data_ = std::move(h);
595 std::lock_guard<std::mutex> l(impl_->sessions_mutex_);
596 impl_->on_multi_disconnect_ = std::move(h);
600 auto impl = get_impl();
601 std::lock_guard<std::mutex> l(impl->sessions_mutex_);
602 impl->max_clients_ = max;
603 impl->client_limit_enabled_ =
true;
604 if (impl->paused_accept_ && impl->sessions_.size() < impl->max_clients_) {
605 impl->paused_accept_ =
false;
606 net::post(impl->ioc_, [
self = shared_from_this()] {
self->get_impl()->do_accept(
self); });
611 auto impl = get_impl();
612 std::lock_guard<std::mutex> l(impl->sessions_mutex_);
613 impl->client_limit_enabled_ =
false;
614 impl->max_clients_ = 0;
615 if (impl->paused_accept_) {
616 impl->paused_accept_ =
false;
617 net::post(impl->ioc_, [
self = shared_from_this()] {
self->get_impl()->do_accept(
self); });
static IoContextManager & instance()
void set_state(const State &new_state)
std::function< void(memory::ConstByteSpan)> OnBytes
std::function< void(base::LinkState)> OnState
std::function< void(size_t)> OnBackpressure
A C++17 compatible span-like class for safe array access.
constexpr size_type size() const noexcept
constexpr pointer data() const noexcept
Thread-safe TCP Server implementation.
void set_unlimited_clients()
TcpServer(TcpServer &&) noexcept
void on_bytes(OnBytes cb) override
void async_write_copy(memory::ConstByteSpan data) override
std::function< void(size_t client_id, const std::string &data)> MultiClientDataHandler
void on_backpressure(OnBackpressure cb) override
std::vector< size_t > get_connected_clients() const
void on_multi_disconnect(MultiClientDisconnectHandler handler)
void on_multi_connect(MultiClientConnectHandler handler)
base::LinkState get_state() const
bool broadcast(const std::string &message)
void on_multi_data(MultiClientDataHandler handler)
size_t get_client_count() const
std::function< void(size_t client_id)> MultiClientDisconnectHandler
bool send_to_client(size_t client_id, const std::string &message)
void async_write_shared(std::shared_ptr< const std::vector< uint8_t >> data) override
void on_state(OnState cb) override
std::function< void(size_t client_id, const std::string &client_info)> MultiClientConnectHandler
void async_write_move(std::vector< uint8_t > &&data) override
void set_client_limit(size_t max_clients)
bool is_connected() const override
#define UNILINK_LOG_ERROR(component, operation, message)
std::string uint8_to_string(const uint8_t *data, size_t size)
Safely convert uint8_t* to const char* for string operations.
std::pair< const uint8_t *, size_t > string_to_bytes(std::string_view str)
Safely obtain a view of std::string as byte array without allocation.
std::string to_string(ErrorCode code)
Convert ErrorCode to human-readable string.
int port_retry_interval_ms
MultiClientConnectHandler on_multi_connect_
std::atomic< size_t > next_client_id_
MultiClientDataHandler on_multi_data_
std::unique_ptr< net::io_context > owned_ioc_
std::shared_ptr< TcpServerSession > current_session_
std::unique_ptr< interface::TcpAcceptorInterface > acceptor_
std::mutex sessions_mutex_
std::atomic< bool > stopping_
bool client_limit_enabled_
void stop(std::shared_ptr< TcpServer > self)
Impl(const config::TcpServerConfig &cfg)
Impl(const config::TcpServerConfig &cfg, std::unique_ptr< interface::TcpAcceptorInterface > acceptor, net::io_context &ioc)
std::unordered_map< size_t, std::shared_ptr< TcpServerSession > > sessions_
concurrency::ThreadSafeLinkState state_
void attempt_port_binding(std::shared_ptr< TcpServer > self, int retry_count)
void do_accept(std::shared_ptr< TcpServer > self)
MultiClientDisconnectHandler on_multi_disconnect_
config::TcpServerConfig cfg_
bool client_limit_enabled_