19 #if defined(__GNUC__) || defined(__clang__)
20 #pragma GCC diagnostic ignored "-Wsign-conversion"
26 #include <boost/asio.hpp>
35 #include <type_traits>
39 #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
40 #include <sys/socket.h>
41 #include <sys/types.h>
63 using namespace common;
// --- Execution context ---
// Non-owning pointer to the io_context in use; the constructor points it either at the
// caller-supplied context or at owned_ioc_.
68 net::io_context* ioc_ =
nullptr;
// Strand built from *ioc_; the public API dispatches/posts its work onto it.
69 net::strand<net::io_context::executor_type>
strand_;
// Created in start() when owns_ioc_ is set.
70 std::unique_ptr<net::executor_work_guard<net::io_context::executor_type>>
work_guard_;
// --- Lifecycle sequencing ---
// start() increments lifecycle_seq_ and publishes the new value in current_seq_; stop() copies
// current_seq_ into stop_seq_. Async handlers capture a seq and compare it with current_seq_ to
// recognise completions that belong to an earlier start().
72 std::atomic<uint64_t> lifecycle_seq_{0};
73 std::atomic<uint64_t> stop_seq_{0};
74 std::atomic<uint64_t> current_seq_{0};
// Consulted by start() (spawn I/O thread) and join_ioc_thread() (skip when not owning).
80 bool owns_ioc_ =
true;
// --- Stop / terminal-state flags (atomics: read from both API callers and strand handlers) ---
81 std::atomic<bool> stop_requested_{
false};
82 std::atomic<bool> stopping_{
false};
// Set by transition_to() the first time Closed/Error is entered.
83 std::atomic<bool> terminal_state_notified_{
false};
// Set while a retry timer is outstanding (see schedule_retry()).
84 std::atomic<bool> reconnect_pending_{
false};
// --- I/O buffers ---
// Receive buffer handed to async_read_some() in start_read().
86 std::array<uint8_t, common::constants::DEFAULT_READ_BUFFER_SIZE> rx_{};
// Pending outbound buffers, consumed front-first by do_write().
87 std::deque<BufferVariant>
tx_;
// True while an async_write chain is running.
89 bool writing_ =
false;
// Total bytes currently accounted for in the transmit queue.
90 size_t queue_bytes_ = 0;
// Hysteresis state toggled by report_backpressure().
94 bool backpressure_active_ =
false;
// Delay used for the first retry (retry_attempts_ == 1); clamped to cfg_.retry_interval_ms in the ctor.
95 unsigned first_retry_interval_ms_ = 100;
101 std::atomic<bool> connected_{
false};
// Attempt counters: reconnect_attempt_count_ is the one reported when reconnect_policy_ is set,
// retry_attempts_ otherwise.
103 int retry_attempts_ = 0;
104 uint32_t reconnect_attempt_count_{0};
// Constructor initializer list and body (the signature line lies outside this excerpt).
// An io_context is allocated and owned only when the caller passed a null ioc_ptr; ioc_ then
// refers to whichever context is in effect and the strand/timers are bound to it.
111 : owned_ioc_(ioc_ptr ? nullptr : std::make_unique<
net::io_context>()),
112 ioc_(ioc_ptr ? ioc_ptr : owned_ioc_.get()),
113 strand_(
net::make_strand(*ioc_)),
117 retry_timer_(strand_),
118 connect_timer_(strand_),
120 bp_high_(cfg.backpressure_threshold) {
129 recalculate_backpressure_bounds();
// The first retry delay never exceeds the configured retry interval.
130 first_retry_interval_ms_ = std::min(first_retry_interval_ms_, cfg_.
retry_interval_ms);
// Private helpers. The ones taking (self, seq) receive a shared_ptr to the owning TcpClient, which
// their completion handlers capture, plus the lifecycle sequence number they compare against
// current_seq_ to detect stale completions.
133 void do_resolve_connect(std::shared_ptr<TcpClient>
self, uint64_t seq);
134 void schedule_retry(std::shared_ptr<TcpClient>
self, uint64_t seq);
135 void start_read(std::shared_ptr<TcpClient>
self, uint64_t seq);
136 void do_write(std::shared_ptr<TcpClient>
self, uint64_t seq);
137 void handle_close(std::shared_ptr<TcpClient>
self, uint64_t seq,
const boost::system::error_code& ec = {});
// State-machine step; ec defaults to "no error".
138 void transition_to(
LinkState next,
const boost::system::error_code& ec = {});
139 void perform_stop_cleanup();
140 void reset_start_state();
141 void join_ioc_thread(
bool allow_detach);
143 void recalculate_backpressure_bounds();
144 void report_backpressure(
size_t queued_bytes);
146 void reset_io_objects();
// Tail of the record_error(...) declaration; its leading parameters are not shown in this excerpt.
148 const boost::system::error_code& ec, std::string_view msg,
bool retryable, uint32_t retry_count);
// Factory overload bodies (signatures not shown): each constructs a TcpClient with `new` and
// hands ownership to a shared_ptr.
// NOTE(review): presumably `new` is used instead of make_shared because the constructors are not
// publicly accessible — confirm against the class declaration.
152 return std::shared_ptr<TcpClient>(
new TcpClient(cfg));
// Overload that forwards a caller-supplied io_context.
156 return std::shared_ptr<TcpClient>(
new TcpClient(cfg, ioc));
// Constructor initializer (signature not shown): builds the Impl around the caller's io_context.
161 : impl_(std::make_unique<Impl>(cfg, &ioc)) {}
// Teardown fragment (presumably the destructor — the signature is outside this excerpt).
// Joins the I/O thread, allowing a detach, then clears the user callbacks.
165 impl_->join_ioc_thread(
true);
// Drop all user callbacks.
167 impl_->on_bytes_ =
nullptr;
168 impl_->on_state_ =
nullptr;
169 impl_->on_bp_ =
nullptr;
// Returns a copy of the most recently recorded error, or an empty optional if none is stored.
// The copy is taken under last_err_mtx_, the same mutex record_error() holds while writing.
175 std::optional<diagnostics::ErrorInfo>
TcpClient::last_error_info()
const {
176 std::lock_guard<std::mutex> lock(impl_->last_err_mtx_);
177 return impl_->last_error_info_;
// Body of start() (signature line not shown; name taken from the log tag).
// Sequence visible here: ignore the call when already Connecting/Connected, refresh the
// backpressure bounds, restart a stopped io_context, join a previous I/O thread, allocate a new
// lifecycle sequence, optionally spawn the I/O thread, then queue the connect sequence on the strand.
181 auto current_state = impl_->state_.get_state();
182 if (current_state == LinkState::Connecting || current_state == LinkState::Connected) {
183 UNILINK_LOG_DEBUG(
"tcp_client",
"start",
"Start called while already active, ignoring");
191 impl_->recalculate_backpressure_bounds();
// A stopped io_context must be restarted before it can run handlers again.
193 if (impl_->ioc_ && impl_->ioc_->stopped()) {
194 UNILINK_LOG_DEBUG(
"tcp_client",
"start",
"io_context stopped; restarting before start");
195 impl_->ioc_->restart();
// A still-joinable thread from an earlier run is joined (no detach) before a new one is assigned.
198 if (impl_->ioc_thread_.joinable()) {
199 impl_->join_ioc_thread(
false);
// New lifecycle generation: handlers that captured an older seq will no longer match current_seq_.
202 const auto seq = impl_->lifecycle_seq_.fetch_add(1) + 1;
203 impl_->current_seq_.store(seq);
// Only when this client owns its io_context does it create a work guard and a thread for it.
205 if (impl_->owns_ioc_ && impl_->ioc_) {
207 std::make_unique<net::executor_work_guard<net::io_context::executor_type>>(impl_->ioc_->get_executor());
// NOTE(review): the thread lambda captures raw `this`; correctness relies on the thread being
// joined or detached (join_ioc_thread) before the object is destroyed — confirm.
208 impl_->ioc_thread_ = std::thread([
this]() {
211 }
catch (
const std::exception& e) {
212 UNILINK_LOG_ERROR(
"tcp_client",
"io_context",
"IO context error: " + std::string(e.what()));
214 "Exception in IO context: " + std::string(e.what()));
// weak_from_this(): the queued task does not by itself keep the client alive.
219 auto weak_self = weak_from_this();
221 net::dispatch(impl_->strand_, [weak_self, seq] {
222 if (auto self = weak_self.lock()) {
// Compares this start's seq with the value recorded by stop(); the branch body is not visible here.
223 if (seq <= self->impl_->stop_seq_.load()) {
// Reset flags and I/O objects, announce Connecting, then begin resolve + connect.
226 self->impl_->reset_start_state();
227 self->impl_->connected_.store(false);
228 self->impl_->reset_io_objects();
229 self->impl_->transition_to(LinkState::Connecting);
230 self->impl_->do_resolve_connect(self, seq);
// Body of stop() (signature line not shown — presumably TcpClient::stop; confirm).
// exchange(true) yields the previous value, so this branch is taken when a stop was already requested.
239 if (impl_->stop_requested_.exchange(
true)) {
243 impl_->stopping_.store(
true);
// Record which lifecycle generation is being stopped; start()'s queued task checks this value.
244 impl_->stop_seq_.store(impl_->current_seq_.load());
245 auto weak_self = weak_from_this();
// Cleanup runs on the strand, with the lambda holding a shared_ptr to the client.
250 if (
auto self = weak_self.lock()) {
251 net::post(impl_->strand_, [
self]() { self->impl_->perform_stop_cleanup(); });
// Join the I/O thread without allowing a detach.
254 impl_->join_ioc_thread(
false);
// Body of async_write_copy() (signature line not shown; name taken from the log/error tags).
// Visible flow: reject when stopping or in a terminal state, check the payload size, copy the
// payload into a pooled buffer when one is available (falling back to a std::vector copy), then
// enqueue on the strand. The strand task re-checks state, enforces bp_limit_, and starts do_write()
// if no write is in flight.
// NOTE(review): the enqueue lambda below is repeated almost verbatim for the pooled path, the
// fallback path, async_write_move() and async_write_shared(); a shared helper would remove the
// duplication.
260 if (impl_->stop_requested_.load() || impl_->state_.is_state(LinkState::Closed) ||
261 impl_->state_.is_state(LinkState::Error) || !impl_->ioc_) {
265 size_t size = data.
size();
273 "Write size exceeds maximum allowed (" +
std::to_string(size) +
" bytes)");
// Pooled-buffer path.
280 if (pooled_buffer.
valid()) {
282 const auto added = pooled_buffer.
size();
283 net::dispatch(impl_->strand_, [
self = shared_from_this(), buf = std::move(pooled_buffer), added]()
mutable {
// State may have changed between the caller's check and execution on the strand.
284 if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
285 self->impl_->state_.is_state(LinkState::Error)) {
// Hard queue limit: exceeding it records an error, closes the socket, discards everything
// queued and moves the link to Error.
289 if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
290 UNILINK_LOG_ERROR(
"tcp_client",
"async_write_copy",
291 "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) +
" bytes)");
292 self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
293 "async_write_copy", boost::asio::error::no_buffer_space,
"Queue limit exceeded",
295 self->impl_->connected_.store(false);
296 self->impl_->close_socket();
297 self->impl_->tx_.clear();
298 self->impl_->queue_bytes_ = 0;
299 self->impl_->writing_ = false;
300 self->impl_->backpressure_active_ = false;
301 self->impl_->transition_to(LinkState::Error);
// Normal path: account for the bytes, queue the buffer, report backpressure, start writing if idle.
305 self->impl_->queue_bytes_ += added;
306 self->impl_->tx_.emplace_back(std::move(buf));
307 self->impl_->report_backpressure(self->impl_->queue_bytes_);
308 if (!self->impl_->writing_) self->impl_->do_write(
self, self->impl_->current_seq_.load());
312 }
catch (
const std::exception& e) {
313 UNILINK_LOG_ERROR(
"tcp_client",
"async_write_copy",
"Failed to acquire pooled buffer: " + std::string(e.what()));
// Fallback path: plain heap copy of the payload.
317 std::vector<uint8_t> fallback(data.
begin(), data.
end());
318 const auto added = fallback.size();
320 net::dispatch(impl_->strand_, [
self = shared_from_this(), buf = std::move(fallback), added]()
mutable {
321 if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
322 self->impl_->state_.is_state(LinkState::Error)) {
// Same hard-limit handling as the pooled path.
326 if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
327 UNILINK_LOG_ERROR(
"tcp_client",
"async_write_copy",
328 "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) +
" bytes)");
329 self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
330 "async_write_copy", boost::asio::error::no_buffer_space,
"Queue limit exceeded", false,
332 self->impl_->connected_.store(false);
333 self->impl_->close_socket();
334 self->impl_->tx_.clear();
335 self->impl_->queue_bytes_ = 0;
336 self->impl_->writing_ = false;
337 self->impl_->backpressure_active_ = false;
338 self->impl_->transition_to(LinkState::Error);
342 self->impl_->queue_bytes_ += added;
343 self->impl_->tx_.emplace_back(std::move(buf));
344 self->impl_->report_backpressure(self->impl_->queue_bytes_);
345 if (!self->impl_->writing_) self->impl_->do_write(
self, self->impl_->current_seq_.load());
// Queues `data` for transmission, taking ownership of the vector (it is moved into the strand
// task, so no copy of the payload is made here).
// Rejected when a stop was requested, the link is Closed/Error, or there is no io_context.
// Exceeding bp_limit_ on the strand records an error, closes the socket, drops the queue and
// moves the link to Error.
349 void TcpClient::async_write_move(std::vector<uint8_t>&& data) {
350 if (impl_->stop_requested_.load() || impl_->state_.is_state(LinkState::Closed) ||
351 impl_->state_.is_state(LinkState::Error) || !impl_->ioc_) {
// Size is captured before `data` is moved from.
354 const auto size = data.size();
361 "Write size exceeds maximum allowed (" +
std::to_string(size) +
" bytes)");
365 const auto added = size;
366 net::dispatch(impl_->strand_, [
self = shared_from_this(), buf = std::move(data), added]()
mutable {
// Re-check on the strand: state may have changed since the caller-side check.
367 if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
368 self->impl_->state_.is_state(LinkState::Error)) {
// Hard queue limit handling.
372 if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
373 UNILINK_LOG_ERROR(
"tcp_client",
"async_write_move",
374 "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) +
" bytes)");
375 self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
376 "async_write_move", boost::asio::error::no_buffer_space,
"Queue limit exceeded", false,
378 self->impl_->connected_.store(false);
379 self->impl_->close_socket();
380 self->impl_->tx_.clear();
381 self->impl_->queue_bytes_ = 0;
382 self->impl_->writing_ = false;
383 self->impl_->backpressure_active_ = false;
384 self->impl_->transition_to(LinkState::Error);
// Normal path: account, enqueue, report backpressure, start the write chain if idle.
388 self->impl_->queue_bytes_ += added;
389 self->impl_->tx_.emplace_back(std::move(buf));
390 self->impl_->report_backpressure(self->impl_->queue_bytes_);
391 if (!self->impl_->writing_) self->impl_->do_write(
self, self->impl_->current_seq_.load());
// Queues a shared, immutable payload for transmission; only the shared_ptr is moved into the
// queue, the bytes themselves are not copied here.
// Rejected when a stop was requested, the link is Closed/Error, or there is no io_context.
// Null or empty payloads are checked for explicitly before any work is queued.
395 void TcpClient::async_write_shared(std::shared_ptr<
const std::vector<uint8_t>> data) {
396 if (impl_->stop_requested_.load() || impl_->state_.is_state(LinkState::Closed) ||
397 impl_->state_.is_state(LinkState::Error) || !impl_->ioc_) {
400 if (!data || data->empty()) {
// Size is read before `data` is moved into the lambda.
404 const auto size = data->size();
407 "Write size exceeds maximum allowed (" +
std::to_string(size) +
" bytes)");
411 const auto added = size;
412 net::dispatch(impl_->strand_, [
self = shared_from_this(), buf = std::move(data), added]()
mutable {
// Re-check on the strand: state may have changed since the caller-side check.
413 if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
414 self->impl_->state_.is_state(LinkState::Error)) {
// Hard queue limit: record error, close socket, drop queue, go to Error.
418 if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
419 UNILINK_LOG_ERROR(
"tcp_client",
"async_write_shared",
420 "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) +
" bytes)");
421 self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
422 "async_write_shared", boost::asio::error::no_buffer_space,
"Queue limit exceeded",
424 self->impl_->connected_.store(false);
425 self->impl_->close_socket();
426 self->impl_->tx_.clear();
427 self->impl_->queue_bytes_ = 0;
428 self->impl_->writing_ = false;
429 self->impl_->backpressure_active_ = false;
430 self->impl_->transition_to(LinkState::Error);
// Normal path: account, enqueue, report backpressure, start the write chain if idle.
434 self->impl_->queue_bytes_ += added;
435 self->impl_->tx_.emplace_back(std::move(buf));
436 self->impl_->report_backpressure(self->impl_->queue_bytes_);
437 if (!self->impl_->writing_) self->impl_->do_write(
self, self->impl_->current_seq_.load());
// Callback setter bodies (signatures not shown). Each replaces one user callback while holding
// callback_mtx_, the same mutex the handlers lock before copying a callback out.
// Receive-data callback.
442 std::lock_guard<std::mutex> lock(impl_->callback_mtx_);
443 impl_->on_bytes_ = std::move(cb);
// State-change callback.
446 std::lock_guard<std::mutex> lock(impl_->callback_mtx_);
447 impl_->on_state_ = std::move(cb);
// Backpressure callback.
450 std::lock_guard<std::mutex> lock(impl_->callback_mtx_);
451 impl_->on_bp_ = std::move(cb);
// Reconnect-policy setter fragment (signature and branch condition not shown): either installs
// the supplied policy or clears it back to "no policy".
456 impl_->reconnect_policy_ = std::move(policy);
458 impl_->reconnect_policy_ = std::nullopt;
// Resolves the remote endpoint and attempts to connect, guarded by a connection timeout.
//   self - shared_ptr to the owning client, captured by every handler below.
//   seq  - lifecycle sequence of the start() that initiated this attempt; each handler compares
//          it against current_seq_ before acting.
// On resolve or connect failure the error is recorded and schedule_retry() is called; on success
// the attempt counters are reset, the link moves to Connected, and reading and writing begin.
464 void TcpClient::Impl::do_resolve_connect(std::shared_ptr<TcpClient>
self, uint64_t seq) {
465 resolver_.async_resolve(
// Cancelled operation or handler from an older lifecycle.
467 if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
470 if (self->impl_->stop_requested_.load() || self->impl_->stopping_.load()) {
// Attempt count reported with the error: the policy counter when a reconnect policy is set,
// otherwise the legacy retry counter.
474 uint32_t current_attempts =
self->impl_->reconnect_policy_
475 ?
self->impl_->reconnect_attempt_count_
476 :
static_cast<uint32_t
>(
self->impl_->retry_attempts_);
478 ec,
"Resolution failed: " + ec.message(),
480 self->impl_->schedule_retry(
self, seq);
// Arm the connect timeout before starting the connect itself.
483 self->impl_->connect_timer_.expires_after(std::chrono::milliseconds(self->impl_->cfg_.connection_timeout_ms));
484 self->impl_->connect_timer_.async_wait([
self, seq](
const boost::system::error_code& timer_ec) {
485 if (timer_ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
// Timer expired (not cancelled) while still running: record a timeout and go through handle_close().
488 if (!timer_ec && !self->impl_->stop_requested_.load() && !self->impl_->stopping_.load()) {
490 "tcp_client",
"connect_timeout",
491 "Connection timed out after " + std::to_string(self->impl_->cfg_.connection_timeout_ms) +
"ms");
492 uint32_t current_attempts = self->impl_->reconnect_policy_
493 ? self->impl_->reconnect_attempt_count_
494 : static_cast<uint32_t>(self->impl_->retry_attempts_);
495 self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::CONNECTION,
"connect",
496 boost::asio::error::timed_out,
"Connection timed out",
497 diagnostics::is_retryable_tcp_connect_error(boost::asio::error::timed_out),
499 self->impl_->handle_close(self, seq, boost::asio::error::timed_out);
503 net::async_connect(self->impl_->socket_, results, [
self, seq](
auto ec2,
const auto&) {
504 if (ec2 == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
// A stop arrived while connecting: close the socket and cancel the timeout.
507 if (self->impl_->stop_requested_.load() || self->impl_->stopping_.load()) {
508 self->impl_->close_socket();
509 self->impl_->connect_timer_.cancel();
// Connect failure path (enclosing condition not visible in this excerpt).
513 self->impl_->connect_timer_.cancel();
514 uint32_t current_attempts =
self->impl_->reconnect_policy_
515 ?
self->impl_->reconnect_attempt_count_
516 :
static_cast<uint32_t
>(
self->impl_->retry_attempts_);
518 ec2,
"Connection failed: " + ec2.message(),
520 self->impl_->schedule_retry(
self, seq);
// Success path: stop the timeout and reset both attempt counters.
523 self->impl_->connect_timer_.cancel();
524 self->impl_->retry_attempts_ = 0;
525 self->impl_->reconnect_attempt_count_ = 0;
526 self->impl_->connected_.store(
true);
// BSD-family only: set SO_NOSIGPIPE on the socket; the setsockopt result is deliberately ignored.
528 #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
530 (void)::setsockopt(self->impl_->socket_.native_handle(), SOL_SOCKET, SO_NOSIGPIPE, &yes,
sizeof(yes));
533 self->impl_->transition_to(LinkState::Connected);
// Non-throwing remote_endpoint overload; the result feeds the "Connected to" log text.
534 boost::system::error_code ep_ec;
535 auto rep =
self->impl_->socket_.remote_endpoint(ep_ec);
538 "Connected to " + rep.address().to_string() +
":" +
std::to_string(rep.port()));
// Start the read loop and flush anything queued before the connection came up.
540 self->impl_->start_read(
self, seq);
541 self->impl_->do_write(
self, seq);
// Decides whether to reconnect and, if so, arms retry_timer_ to call do_resolve_connect() again.
//   self - shared_ptr to the owning client, captured by the timer handler.
//   seq  - lifecycle sequence; the timer handler compares it against current_seq_.
// When the reconnect decision says not to retry, the link transitions to Error instead.
546 void TcpClient::Impl::schedule_retry(std::shared_ptr<TcpClient>
self, uint64_t seq) {
547 connected_.store(
false);
548 if (stop_requested_.load() || stopping_.load()) {
// exchange(true) yields the previous value: this branch is taken when a retry is already pending.
553 if (reconnect_pending_.exchange(
true)) {
// Snapshot the last recorded error under its mutex.
557 std::optional<diagnostics::ErrorInfo> last_err;
559 std::lock_guard<std::mutex> lock(last_err_mtx_);
560 last_err = last_error_info_;
// Arguments of a placeholder "Unknown error" ErrorInfo (presumably used when nothing was
// recorded — the surrounding condition is not shown).
565 "tcp_client",
"schedule_retry",
"Unknown error",
566 make_error_code(boost::asio::error::not_connected),
true);
570 uint32_t current_attempts = reconnect_policy_ ? reconnect_attempt_count_ :
static_cast<uint32_t
>(retry_attempts_);
574 if (!decision.should_retry) {
575 UNILINK_LOG_INFO(
"tcp_client",
"retry",
"Reconnect stopped by policy/config");
576 transition_to(LinkState::Error);
577 reconnect_pending_.store(
false);
// Use the delay from the decision, falling back to the configured retry interval.
582 std::chrono::milliseconds delay = decision.delay.value_or(std::chrono::milliseconds(cfg_.retry_interval_ms));
583 if (reconnect_policy_) {
584 reconnect_attempt_count_++;
// Without a policy, the very first retry uses the (shorter) first_retry_interval_ms_.
588 if (retry_attempts_ == 1) {
589 delay = std::chrono::milliseconds(first_retry_interval_ms_);
593 transition_to(LinkState::Connecting);
596 "Scheduling retry in " +
std::to_string(
static_cast<double>(delay.count()) / 1000.0) +
"s");
598 retry_timer_.expires_after(delay);
599 retry_timer_.async_wait([
self, seq](
const boost::system::error_code& ec) {
// Clear the pending flag first so a later failure can schedule again.
601 self->impl_->reconnect_pending_.store(
false);
603 if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
606 if (!ec && !self->impl_->stop_requested_.load() && !self->impl_->stopping_.load())
607 self->impl_->do_resolve_connect(
self, seq);
// Issues one async_read_some() into rx_ and re-arms itself from the completion handler.
//   self - shared_ptr to the owning client, captured by the handler.
//   seq  - lifecycle sequence; completions from an older lifecycle are recognised by comparing
//          it against current_seq_.
// The on_bytes_ callback is copied out under callback_mtx_; exceptions escaping the callback are
// logged and the connection is closed with connection_aborted.
611 void TcpClient::Impl::start_read(std::shared_ptr<TcpClient>
self, uint64_t seq) {
612 socket_.async_read_some(net::buffer(rx_.data(), rx_.size()), [
self, seq](
auto ec, std::size_t n) {
613 if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
616 if (self->impl_->stop_requested_.load()) {
// Read error path (enclosing condition not visible in this excerpt).
620 self->impl_->handle_close(
self, seq, ec);
625 std::lock_guard<std::mutex> lock(self->impl_->callback_mtx_);
626 on_bytes =
self->impl_->on_bytes_;
632 }
catch (
const std::exception& e) {
633 UNILINK_LOG_ERROR(
"tcp_client",
"on_bytes",
"Exception in on_bytes callback: " + std::string(e.what()));
635 boost::asio::error::connection_aborted,
636 "Exception in on_bytes: " + std::string(e.what()),
false, 0);
637 self->impl_->handle_close(
self, seq, make_error_code(boost::asio::error::connection_aborted));
640 UNILINK_LOG_ERROR(
"tcp_client",
"on_bytes",
"Unknown exception in on_bytes callback");
641 self->impl_->handle_close(
self, seq, make_error_code(boost::asio::error::connection_aborted));
// Re-arm for the next chunk.
645 self->impl_->start_read(
self, seq);
// Sends the buffer at the front of tx_ and chains itself from the completion handler.
//   self - shared_ptr to the owning client, captured by the completion handler.
//   seq  - lifecycle sequence checked by the completion handler.
// The in-flight buffer lives in current_write_buffer_ for the duration of the async_write.
649 void TcpClient::Impl::do_write(std::shared_ptr<TcpClient>
self, uint64_t seq) {
650 if (stop_requested_.load()) {
654 report_backpressure(queue_bytes_);
658 if (!connected_.load()) {
663 if (tx_.empty() || state_.is_state(LinkState::Closed) || state_.is_state(LinkState::Error)) {
// Take ownership of the next buffer so it outlives the asynchronous write.
669 current_write_buffer_ = std::move(tx_.front());
672 auto& current = *current_write_buffer_;
// Byte count of the in-flight buffer, used to adjust queue_bytes_ on completion.
674 auto queued_bytes = std::visit(
675 [](
auto&& buf) ->
size_t {
676 using Buffer = std::decay_t<decltype(buf)>;
677 if constexpr (std::is_same_v<Buffer, std::shared_ptr<
const std::vector<uint8_t>>>) {
678 return buf ? buf->size() : 0;
685 auto on_write = [
self, queued_bytes, seq](
auto ec, std::size_t) {
// Cancelled or stale completion: drop the buffer and subtract its bytes (floored at 0).
686 if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
687 self->impl_->current_write_buffer_.reset();
688 self->impl_->queue_bytes_ =
689 (
self->impl_->queue_bytes_ > queued_bytes) ? (self->impl_->queue_bytes_ - queued_bytes) : 0;
690 self->impl_->report_backpressure(self->impl_->queue_bytes_);
691 self->impl_->writing_ =
false;
// Write-failure path (enclosing condition not visible in this excerpt): the unsent buffer is
// put back at the front of the queue before the connection is closed.
696 if (self->impl_->current_write_buffer_) {
697 self->impl_->tx_.push_front(std::move(*self->impl_->current_write_buffer_));
699 self->impl_->current_write_buffer_.reset();
703 "Write failed: " + ec.message(),
false, 0);
704 self->impl_->writing_ =
false;
705 self->impl_->handle_close(
self, seq, ec);
// Success path: release the buffer and update the queue accounting.
709 self->impl_->current_write_buffer_.reset();
710 self->impl_->queue_bytes_ =
711 (
self->impl_->queue_bytes_ > queued_bytes) ? (self->impl_->queue_bytes_ - queued_bytes) : 0;
712 self->impl_->report_backpressure(self->impl_->queue_bytes_);
714 if (self->impl_->stop_requested_.load() ||
self->impl_->state_.is_state(LinkState::Closed) ||
715 self->impl_->state_.is_state(LinkState::Error)) {
716 self->impl_->writing_ =
false;
// Continue with the next queued buffer.
720 self->impl_->do_write(
self, seq);
// Dispatch on the variant alternative: shared_ptr payloads are dereferenced, others used directly.
724 [&](
const auto& buf) {
725 using T = std::decay_t<decltype(buf)>;
726 if constexpr (std::is_same_v<T, std::shared_ptr<
const std::vector<uint8_t>>>) {
727 net::async_write(socket_, net::buffer(buf->data(), buf->size()), on_write);
729 net::async_write(socket_, net::buffer(buf.data(), buf.size()), on_write);
// Handles loss of the connection for lifecycle `seq`.
//   self - shared_ptr to the owning client, forwarded to schedule_retry().
//   seq  - lifecycle sequence; checked against current_seq_ together with operation_aborted.
//   ec   - error that caused the close.
// Marks the link disconnected and cancels the connect timer, then either finishes in Closed
// (stop in progress or already closed) or moves to Connecting and schedules a retry.
735 void TcpClient::Impl::handle_close(std::shared_ptr<TcpClient>
self, uint64_t seq,
const boost::system::error_code& ec) {
736 if (ec == net::error::operation_aborted || seq != current_seq_.load()) {
739 UNILINK_LOG_INFO(
"tcp_client",
"handle_close",
"Closing connection. Error: " + ec.message());
// Attempt count reported with the error: policy counter when a policy is set, legacy counter otherwise.
742 const uint32_t current_attempts =
743 reconnect_policy_ ? reconnect_attempt_count_ :
static_cast<uint32_t
>(retry_attempts_);
746 "Connection closed with error: " + ec.message(), retryable, current_attempts);
748 connected_.store(
false);
750 connect_timer_.cancel();
752 if (stop_requested_.load() || stopping_.load() || state_.is_state(LinkState::Closed)) {
753 transition_to(LinkState::Closed, ec);
// Otherwise go back to Connecting and let schedule_retry() decide about reconnecting.
756 transition_to(LinkState::Connecting, ec);
757 schedule_retry(
self, seq);
// Shuts down both directions of the socket using the error_code overload, so a failure is
// returned in `ec` rather than thrown. The remainder of the body is outside this excerpt.
760 void TcpClient::Impl::close_socket() {
761 boost::system::error_code ec;
762 socket_.shutdown(tcp::socket::shutdown_both, ec);
// Recomputes the backpressure thresholds from cfg_.backpressure_threshold and clears the
// active flag.
//   bp_high_  - the configured threshold.
//   bp_low_   - half of bp_high_, or bp_high_ itself when bp_high_ <= 1.
//   bp_limit_ - hard queue limit (its base computation is not shown); never below bp_high_.
766 void TcpClient::Impl::recalculate_backpressure_bounds() {
767 bp_high_ = cfg_.backpressure_threshold;
768 bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
// Keep the hard limit at least as large as the high-water mark.
774 if (bp_limit_ < bp_high_) {
775 bp_limit_ = bp_high_;
777 backpressure_active_ =
false;
// Applies high/low-water hysteresis to `queued_bytes` and flips backpressure_active_ on a
// threshold crossing: on at >= bp_high_, off at <= bp_low_.
// Does nothing once a stop has been requested. Exceptions from the user callback are caught
// and logged.
780 void TcpClient::Impl::report_backpressure(
size_t queued_bytes) {
781 if (stop_requested_.load() || stopping_.load())
return;
// callback_mtx_ is the mutex the callback setters hold while replacing callbacks.
785 std::lock_guard<std::mutex> lock(callback_mtx_);
// Rising edge: queue reached the high-water mark.
790 if (!backpressure_active_ && queued_bytes >= bp_high_) {
791 backpressure_active_ =
true;
794 }
catch (
const std::exception& e) {
796 "Exception in backpressure callback: " + std::string(e.what()));
798 UNILINK_LOG_ERROR(
"tcp_client",
"on_backpressure",
"Unknown exception in backpressure callback");
800 }
// Falling edge: queue drained to the low-water mark.
else if (backpressure_active_ && queued_bytes <= bp_low_) {
801 backpressure_active_ =
false;
804 }
catch (
const std::exception& e) {
806 "Exception in backpressure callback: " + std::string(e.what()));
808 UNILINK_LOG_ERROR(
"tcp_client",
"on_backpressure",
"Unknown exception in backpressure callback");
// Moves the link state machine to `next`.
//   next - target state.
//   ec   - error associated with the transition; operation_aborted is checked for up front.
// Special cases visible here: terminal-to-terminal transitions (Closed/Error -> Closed/Error),
// a terminal state that was already flagged via terminal_state_notified_, and same-state
// transitions other than Connecting -> Connecting.
813 void TcpClient::Impl::transition_to(
LinkState next,
const boost::system::error_code& ec) {
814 if (ec == net::error::operation_aborted) {
818 const auto current = state_.get_state();
// Connecting -> Connecting is treated as a distinct case (a retry), not as a no-op.
819 const bool retrying_same_state = (next == LinkState::Connecting && current == LinkState::Connecting);
820 if ((current == LinkState::Closed || current == LinkState::Error) &&
821 (next == LinkState::Closed || next == LinkState::Error)) {
825 if (next == LinkState::Closed || next == LinkState::Error) {
// exchange(true) yields the previous value: true means a terminal state was already flagged.
826 if (terminal_state_notified_.exchange(
true)) {
829 }
else if (current == next && !retrying_same_state) {
833 state_.set_state(next);
// Cleanup performed for stop(), which posts this function onto the strand.
// Cancels both timers and outstanding socket operations, clears the connection and backpressure
// flags, and transitions to Closed. Exceptions raised during cleanup are caught and logged.
837 void TcpClient::Impl::perform_stop_cleanup() {
839 retry_timer_.cancel();
840 connect_timer_.cancel();
// error_code overload: a cancel failure is returned in ec_cancel instead of thrown.
842 boost::system::error_code ec_cancel;
843 socket_.cancel(ec_cancel);
848 connected_.store(
false);
849 backpressure_active_ =
false;
854 transition_to(LinkState::Closed);
855 }
catch (
const std::exception& e) {
856 UNILINK_LOG_ERROR(
"tcp_client",
"stop_cleanup",
"Cleanup error: " + std::string(e.what()));
858 "Cleanup error: " + std::string(e.what()),
false, 0);
860 "Exception in stop cleanup: " + std::string(e.what()));
862 UNILINK_LOG_ERROR(
"tcp_client",
"stop_cleanup",
"Unknown error in stop cleanup");
// Returns the per-run flags and counters to their initial values and sets the state to Idle.
// Called from the strand task queued by start(), before the new connection attempt begins.
867 void TcpClient::Impl::reset_start_state() {
868 stop_requested_.store(
false);
869 stopping_.store(
false);
870 terminal_state_notified_.store(
false);
871 reconnect_pending_.store(
false);
873 reconnect_attempt_count_ = 0;
874 connected_.store(
false);
877 backpressure_active_ =
false;
// Written directly via set_state() rather than through transition_to().
878 state_.set_state(LinkState::Idle);
// Waits for the internally owned I/O thread, if there is one.
//   allow_detach - presumably permits detaching instead of joining when invoked from the I/O
//                  thread itself; the code that consults it is not visible here — confirm.
// No-op when the io_context is not owned or the thread is not joinable.
881 void TcpClient::Impl::join_ioc_thread(
bool allow_detach) {
882 if (!owns_ioc_ || !ioc_thread_.joinable()) {
// Self-join guard: a thread cannot join itself, so this case is handled by detaching.
886 if (std::this_thread::get_id() == ioc_thread_.get_id()) {
888 ioc_thread_.detach();
895 }
catch (
const std::exception& e) {
896 UNILINK_LOG_ERROR(
"tcp_client",
"join",
"Join failed: " + std::string(e.what()));
// Invokes the user's state callback with the current link state.
// Skipped entirely once a stop has been requested. The callback is copied out under
// callback_mtx_ before being called; exceptions it throws are caught and logged.
902 void TcpClient::Impl::notify_state() {
903 if (stop_requested_.load() || stopping_.load())
return;
907 std::lock_guard<std::mutex> lock(callback_mtx_);
908 on_state = on_state_;
// No callback registered.
910 if (!on_state)
return;
913 on_state(state_.get_state());
914 }
catch (
const std::exception& e) {
915 UNILINK_LOG_ERROR(
"tcp_client",
"on_state",
"Exception in state callback: " + std::string(e.what()));
917 UNILINK_LOG_ERROR(
"tcp_client",
"on_state",
"Unknown exception in state callback");
// Tail of record_error(...) (leading parameters are outside this excerpt).
// Stores the assembled ErrorInfo as last_error_info_ under last_err_mtx_, the mutex that
// last_error_info() takes when reading it.
922 std::string_view operation,
const boost::system::error_code& ec,
923 std::string_view msg,
bool retryable, uint32_t retry_count) {
924 std::lock_guard<std::mutex> lock(last_err_mtx_);
927 last_error_info_ = info;
// Replaces the socket, resolver and both timers with fresh objects bound to strand_.
// Called from the strand task queued by start(). Exceptions raised while resetting are caught
// and logged.
930 void TcpClient::Impl::reset_io_objects() {
// Cancel outstanding socket operations before replacing the socket; a cancel failure is
// returned in ec_cancel instead of thrown.
932 boost::system::error_code ec_cancel;
933 socket_.cancel(ec_cancel);
935 socket_ = tcp::socket(strand_);
937 resolver_ = tcp::resolver(strand_);
938 retry_timer_ = net::steady_timer(strand_);
939 connect_timer_ = net::steady_timer(strand_);
943 backpressure_active_ =
false;
944 }
catch (
const std::exception& e) {
945 UNILINK_LOG_ERROR(
"tcp_client",
"reset_io_objects",
"Reset error: " + std::string(e.what()));
947 "Reset error: " + std::string(e.what()),
false, 0);
949 "Exception while resetting io objects: " + std::string(e.what()));
953 "Unknown error while resetting io objects");
std::function< void(memory::ConstByteSpan)> OnBytes
std::function< void(base::LinkState)> OnState
std::function< void(size_t)> OnBackpressure
RAII wrapper for memory pool buffers with enhanced safety.
A C++17 compatible span-like class for safe array access.
constexpr size_type size() const noexcept
constexpr pointer data() const noexcept
constexpr iterator begin() const noexcept
constexpr iterator end() const noexcept
static std::shared_ptr< TcpClient > create(const TcpClientConfig &cfg)
TcpClient(TcpClient &&) noexcept
TcpClient & set_retry_interval(std::chrono::milliseconds interval)
bool is_connected() const override
#define UNILINK_LOG_WARNING(component, operation, message)
#define UNILINK_LOG_INFO(component, operation, message)
#define UNILINK_LOG_ERROR(component, operation, message)
#define UNILINK_LOG_DEBUG(component, operation, message)
Convenience macros for logging.
constexpr size_t MAX_BUFFER_SIZE
constexpr size_t DEFAULT_BACKPRESSURE_THRESHOLD
void safe_memcpy(uint8_t *dest, const uint8_t *src, size_t size)
Safely copy memory with bounds checking.
ThreadSafeState< base::LinkState > ThreadSafeLinkState
void report_system_error(std::string_view component, std::string_view operation, std::string_view message, const boost::system::error_code &ec)
Report system-level error.
ErrorCategory
Error categories for classification.
ErrorLevel
Error severity levels.
bool is_retryable_tcp_connect_error(const boost::system::error_code &ec)
Determines if a TCP connection error is retryable.
ReconnectLogicDecision decide_reconnect(const config::TcpClientConfig &cfg, const diagnostics::ErrorInfo &error_info, uint32_t attempt_count, const std::optional< ReconnectPolicy > &policy)
Determines whether a reconnection attempt should be made and the base delay to use.
std::function< ReconnectDecision(const diagnostics::ErrorInfo &, uint32_t)> ReconnectPolicy
Function type for determining reconnection policy.
wrapper::TcpClient TcpClient
std::string to_string(ErrorCode code)
Convert ErrorCode to human-readable string.
void validate_and_clamp()
unsigned retry_interval_ms
Comprehensive error information structure.
std::deque< BufferVariant > tx_
net::steady_timer connect_timer_
std::optional< ReconnectPolicy > reconnect_policy_
Impl(const TcpClientConfig &cfg, net::io_context *ioc_ptr)
net::steady_timer retry_timer_
std::optional< diagnostics::ErrorInfo > last_error_info_
net::strand< net::io_context::executor_type > strand_
std::optional< BufferVariant > current_write_buffer_
std::unique_ptr< net::executor_work_guard< net::io_context::executor_type > > work_guard_
std::unique_ptr< net::io_context > owned_ioc_
std::unique_ptr< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_guard_