unilink  0.4.3
A simple C++ library for unified async communication
tcp_client.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #if defined(__GNUC__) || defined(__clang__)
20 #pragma GCC diagnostic ignored "-Wsign-conversion"
21 #endif
22 
23 #include <algorithm>
24 #include <array>
25 #include <atomic>
26 #include <boost/asio.hpp>
27 #include <cstdint>
28 #include <cstring>
29 #include <deque>
30 #include <iostream>
31 #include <memory>
32 #include <optional>
33 #include <string>
34 #include <thread>
35 #include <type_traits>
36 #include <variant>
37 #include <vector>
38 
39 #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
40 #include <sys/socket.h>
41 #include <sys/types.h>
42 #endif
43 
52 
53 namespace unilink {
54 namespace transport {
55 
56 namespace net = boost::asio;
57 using tcp = net::ip::tcp;
58 
59 using base::LinkState;
62 using interface::Channel;
63 using namespace common; // For error_reporting namespace
64 
// TcpClient::Impl state. All fields below are members of the pimpl struct.
66  // Members moved from TcpClient
// owned_ioc_ is only allocated when no external io_context is supplied (see the
// constructor); ioc_ points at whichever io_context is actually in use.
67  std::unique_ptr<net::io_context> owned_ioc_;
68  net::io_context* ioc_ = nullptr;
// The resolver, socket and timers are all constructed on this strand, so their
// completion handlers run serialized through it.
69  net::strand<net::io_context::executor_type> strand_;
// Keeps an owned io_context's run() from returning while idle; created in start(),
// released in perform_stop_cleanup().
70  std::unique_ptr<net::executor_work_guard<net::io_context::executor_type>> work_guard_;
// Thread running an owned io_context (only launched when owns_ioc_ is true).
71  std::thread ioc_thread_;
// Start/stop generation counters: each start() takes lifecycle_seq_ + 1 and publishes it in
// current_seq_; stop() copies current_seq_ into stop_seq_. Async handlers capture the seq
// they were started with and ignore completions whose seq no longer matches current_seq_.
72  std::atomic<uint64_t> lifecycle_seq_{0};
73  std::atomic<uint64_t> stop_seq_{0};
74  std::atomic<uint64_t> current_seq_{0};
75  tcp::resolver resolver_;
76  tcp::socket socket_;
// retry_timer_ delays reconnect attempts; connect_timer_ enforces cfg_.connection_timeout_ms.
78  net::steady_timer retry_timer_;
79  net::steady_timer connect_timer_;
80  bool owns_ioc_ = true;
// Lifecycle flags. They are atomics because stop() and async_write_* read them outside the
// strand. stop() sets stop_requested_/stopping_; reset_start_state() clears all four.
81  std::atomic<bool> stop_requested_{false};
82  std::atomic<bool> stopping_{false};
// Set once a Closed/Error transition has been published for the current cycle.
83  std::atomic<bool> terminal_state_notified_{false};
// True while a retry_timer_ wait is outstanding; prevents double scheduling.
84  std::atomic<bool> reconnect_pending_{false};
85 
// Receive buffer reused by every async_read_some.
86  std::array<uint8_t, common::constants::DEFAULT_READ_BUFFER_SIZE> rx_{};
// Outbound queue. current_write_buffer_ owns the buffer handed to async_write until its
// completion handler runs.
87  std::deque<BufferVariant> tx_;
88  std::optional<BufferVariant> current_write_buffer_;
// Write bookkeeping. writing_ marks an async_write in flight; queue_bytes_ counts bytes
// queued or in flight. bp_high_/bp_low_ are the backpressure on/off thresholds and
// bp_limit_ is the hard cap that fails the link. All three are set by
// recalculate_backpressure_bounds().
89  bool writing_ = false;
90  size_t queue_bytes_ = 0;
91  size_t bp_high_;
92  size_t bp_low_;
93  size_t bp_limit_;
94  bool backpressure_active_ = false;
// Delay used for the first retry in non-policy mode; init() clamps it to the retry interval.
95  unsigned first_retry_interval_ms_ = 100;
96 
// Guards the user callbacks (on_bytes_ / on_state_ / on_bp_).
100  mutable std::mutex callback_mtx_;
101  std::atomic<bool> connected_{false};
102  ThreadSafeLinkState state_{LinkState::Idle};
// Attempt counters: retry_attempts_ is used without a ReconnectPolicy,
// reconnect_attempt_count_ when a policy is installed.
103  int retry_attempts_ = 0;
104  uint32_t reconnect_attempt_count_{0};
105  std::optional<ReconnectPolicy> reconnect_policy_;
106 
// Guards last_error_info_ (written by record_error(), read by TcpClient::last_error_info()).
107  mutable std::mutex last_err_mtx_;
108  std::optional<diagnostics::ErrorInfo> last_error_info_;
109 
// Builds the I/O objects for one client. If ioc_ptr is null, a private io_context is
// created and owned (owns_ioc_ == true). Otherwise the caller's io_context is borrowed;
// only a raw pointer is kept, so it must outlive this object.
// The initializer list relies on member declaration order: ioc_ reads owned_ioc_, and
// strand_ is built from *ioc_ before any socket/timer is bound to it.
110  Impl(const TcpClientConfig& cfg, net::io_context* ioc_ptr)
111  : owned_ioc_(ioc_ptr ? nullptr : std::make_unique<net::io_context>()),
112  ioc_(ioc_ptr ? ioc_ptr : owned_ioc_.get()),
113  strand_(net::make_strand(*ioc_)),
114  resolver_(strand_),
115  socket_(strand_),
116  cfg_(cfg),
117  retry_timer_(strand_),
118  connect_timer_(strand_),
119  owns_ioc_(!ioc_ptr),
// bp_low_ and bp_limit_ are not initialized here; init() fills them in via
// recalculate_backpressure_bounds().
120  bp_high_(cfg.backpressure_threshold) {
121  init();
122  }
123 
124  void init() {
125  connected_ = false;
126  writing_ = false;
127  queue_bytes_ = 0;
128  cfg_.validate_and_clamp();
129  recalculate_backpressure_bounds();
130  first_retry_interval_ms_ = std::min(first_retry_interval_ms_, cfg_.retry_interval_ms);
131  }
132 
// Async pipeline. Each step receives the owning TcpClient, which handlers capture to keep
// the client alive, and the start() sequence number the operation belongs to.
133  void do_resolve_connect(std::shared_ptr<TcpClient> self, uint64_t seq);
134  void schedule_retry(std::shared_ptr<TcpClient> self, uint64_t seq);
135  void start_read(std::shared_ptr<TcpClient> self, uint64_t seq);
136  void do_write(std::shared_ptr<TcpClient> self, uint64_t seq);
137  void handle_close(std::shared_ptr<TcpClient> self, uint64_t seq, const boost::system::error_code& ec = {});
// State machine and lifecycle helpers.
138  void transition_to(LinkState next, const boost::system::error_code& ec = {});
139  void perform_stop_cleanup();
140  void reset_start_state();
141  void join_ioc_thread(bool allow_detach);
// Socket and backpressure helpers.
142  void close_socket();
143  void recalculate_backpressure_bounds();
144  void report_backpressure(size_t queued_bytes);
// State-callback dispatch, I/O object reset, and last-error bookkeeping.
145  void notify_state();
146  void reset_io_objects();
147  void record_error(diagnostics::ErrorLevel lvl, diagnostics::ErrorCategory cat, std::string_view operation,
148  const boost::system::error_code& ec, std::string_view msg, bool retryable, uint32_t retry_count);
149 };
150 
151 std::shared_ptr<TcpClient> TcpClient::create(const TcpClientConfig& cfg) {
152  return std::shared_ptr<TcpClient>(new TcpClient(cfg));
153 }
154 
155 std::shared_ptr<TcpClient> TcpClient::create(const TcpClientConfig& cfg, boost::asio::io_context& ioc) {
156  return std::shared_ptr<TcpClient>(new TcpClient(cfg, ioc));
157 }
158 
159 TcpClient::TcpClient(const TcpClientConfig& cfg) : impl_(std::make_unique<Impl>(cfg, nullptr)) {}
160 TcpClient::TcpClient(const TcpClientConfig& cfg, boost::asio::io_context& ioc)
161  : impl_(std::make_unique<Impl>(cfg, &ioc)) {}
162 
// Presumably the body of TcpClient::~TcpClient(): stops the client, joins the owned io
// thread (or detaches it when running on that thread), then drops the user callbacks.
// NOTE(review): during destruction weak_from_this() is already expired, so stop() cannot
// post perform_stop_cleanup(). The work_guard_ created by start() is then never reset, and
// if the client was started but never explicitly stopped, the join inside stop() may block
// on an io_context::run() that never runs out of work. Consider resetting work_guard_ or
// calling ioc_->stop() directly on this path. Confirm.
164  stop();
165  impl_->join_ioc_thread(true);
166 
// Callbacks are cleared without taking callback_mtx_.
167  impl_->on_bytes_ = nullptr;
168  impl_->on_state_ = nullptr;
169  impl_->on_bp_ = nullptr;
170 }
171 
172 TcpClient::TcpClient(TcpClient&&) noexcept = default;
173 TcpClient& TcpClient::operator=(TcpClient&&) noexcept = default;
174 
175 std::optional<diagnostics::ErrorInfo> TcpClient::last_error_info() const {
176  std::lock_guard<std::mutex> lock(impl_->last_err_mtx_);
177  return impl_->last_error_info_;
178 }
179 
// Body of TcpClient::start() (per its "start" log tags): begins a new connect cycle.
// It is ignored while Connecting/Connected. Otherwise it restarts a stopped io_context,
// takes a fresh sequence number, launches the io thread for an owned context, and hands
// the actual (re)initialisation to the strand.
181  auto current_state = impl_->state_.get_state();
182  if (current_state == LinkState::Connecting || current_state == LinkState::Connected) {
183  UNILINK_LOG_DEBUG("tcp_client", "start", "Start called while already active, ignoring");
184  return;
185  }
186 
// NOTE(review): a null io_context is only logged here; execution continues and is handled
// by the later ioc_ checks.
187  if (!impl_->ioc_) {
188  UNILINK_LOG_ERROR("tcp_client", "start", "io_context is null");
189  }
190 
191  impl_->recalculate_backpressure_bounds();
192 
193  if (impl_->ioc_ && impl_->ioc_->stopped()) {
194  UNILINK_LOG_DEBUG("tcp_client", "start", "io_context stopped; restarting before start");
195  impl_->ioc_->restart();
196  }
197 
// Join a previous io thread before launching a new one.
// NOTE(review): on the io thread itself (e.g. start() called from a callback),
// join_ioc_thread(false) returns without joining. Assigning a new std::thread to the
// still-joinable ioc_thread_ below would then call std::terminate. Confirm callers never
// do this.
198  if (impl_->ioc_thread_.joinable()) {
199  impl_->join_ioc_thread(false);
200  }
201 
// New cycle id. Handlers from older cycles compare against current_seq_ and bail out.
202  const auto seq = impl_->lifecycle_seq_.fetch_add(1) + 1;
203  impl_->current_seq_.store(seq);
204 
// Owned context: a work guard keeps run() alive between operations, on a dedicated thread.
205  if (impl_->owns_ioc_ && impl_->ioc_) {
206  impl_->work_guard_ =
207  std::make_unique<net::executor_work_guard<net::io_context::executor_type>>(impl_->ioc_->get_executor());
208  impl_->ioc_thread_ = std::thread([this]() {
209  try {
210  impl_->ioc_->run();
211  } catch (const std::exception& e) {
212  UNILINK_LOG_ERROR("tcp_client", "io_context", "IO context error: " + std::string(e.what()));
213  diagnostics::error_reporting::report_system_error("tcp_client", "io_context",
214  "Exception in IO context: " + std::string(e.what()));
215  }
216  });
217  }
218 
// Everything that touches I/O objects runs on the strand. The cycle is skipped if stop()
// has already recorded this (or a later) sequence number in stop_seq_.
219  auto weak_self = weak_from_this();
220  if (impl_->ioc_) {
221  net::dispatch(impl_->strand_, [weak_self, seq] {
222  if (auto self = weak_self.lock()) {
223  if (seq <= self->impl_->stop_seq_.load()) {
224  return;
225  }
226  self->impl_->reset_start_state();
227  self->impl_->connected_.store(false);
228  self->impl_->reset_io_objects();
229  self->impl_->transition_to(LinkState::Connecting);
230  self->impl_->do_resolve_connect(self, seq);
231  }
232  });
233  } else {
234  UNILINK_LOG_ERROR("tcp_client", "start", "io_context is null");
235  }
236 }
237 
// Presumably the body of TcpClient::stop(): an idempotent shutdown request.
// It marks the client as stopping and records the current cycle in stop_seq_, so a start()
// handler that has not run yet is skipped. It then posts perform_stop_cleanup() to the
// strand and joins the owned io thread. The join is a no-op for an external io_context
// or when called on the io thread.
// NOTE(review): stop_requested_ is cleared only by reset_start_state() on the strand.
// After a previous stop(), a stop() issued after start() but before that strand handler
// runs returns at the first check. The new cycle then keeps running. Confirm.
// NOTE(review): when reached from the destructor, weak_from_this() is expired, so the
// cleanup is not posted and work_guard_ is never reset; the join below may then block.
239  if (impl_->stop_requested_.exchange(true)) {
240  return;
241  }
242 
243  impl_->stopping_.store(true);
244  impl_->stop_seq_.store(impl_->current_seq_.load());
245  auto weak_self = weak_from_this();
246  if (!impl_->ioc_) {
247  return;
248  }
249 
250  if (auto self = weak_self.lock()) {
251  net::post(impl_->strand_, [self]() { self->impl_->perform_stop_cleanup(); });
252  }
253 
254  impl_->join_ioc_thread(false);
255 }
256 
257 bool TcpClient::is_connected() const { return get_impl()->connected_.load(); }
258 
// Queues a copy of `data` for transmission. The bytes are copied before this function
// returns, so the caller's buffer only needs to be valid for the duration of the call.
// Writes are silently dropped after stop() or once the link is Closed/Error.
259 void TcpClient::async_write_copy(memory::ConstByteSpan data) {
260  if (impl_->stop_requested_.load() || impl_->state_.is_state(LinkState::Closed) ||
261  impl_->state_.is_state(LinkState::Error) || !impl_->ioc_) {
262  return;
263  }
264 
265  size_t size = data.size();
266  if (size == 0) {
267  UNILINK_LOG_WARNING("tcp_client", "async_write_copy", "Ignoring zero-length write");
268  return;
269  }
270 
272  UNILINK_LOG_ERROR("tcp_client", "async_write_copy",
273  "Write size exceeds maximum allowed (" + std::to_string(size) + " bytes)");
274  return;
275  }
276 
// Payloads up to 64 KiB try a pooled buffer first. An invalid pooled buffer, or an
// exception while acquiring it, falls through to the std::vector copy further down.
277  if (size <= 65536) {
278  try {
279  memory::PooledBuffer pooled_buffer(size);
280  if (pooled_buffer.valid()) {
281  common::safe_memory::safe_memcpy(pooled_buffer.data(), data.data(), size);
// NOTE(review): `added` (and later the write length in do_write) is taken from
// pooled_buffer.size(), which is assumed to equal `size`. If the pool rounds sizes up,
// the extra bytes would be sent. Confirm.
282  const auto added = pooled_buffer.size();
283  net::dispatch(impl_->strand_, [self = shared_from_this(), buf = std::move(pooled_buffer), added]() mutable {
// On the strand: re-check state, enforce the hard queue limit, then kick the writer.
284  if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
285  self->impl_->state_.is_state(LinkState::Error)) {
286  return;
287  }
288 
// Exceeding bp_limit_ is fatal: the socket is closed, the queue dropped and the link
// moved to Error. No reconnect is scheduled from here.
289  if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
290  UNILINK_LOG_ERROR("tcp_client", "async_write_copy",
291  "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) + " bytes)");
292  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
293  "async_write_copy", boost::asio::error::no_buffer_space, "Queue limit exceeded",
294  false, 0);
295  self->impl_->connected_.store(false);
296  self->impl_->close_socket();
297  self->impl_->tx_.clear();
298  self->impl_->queue_bytes_ = 0;
299  self->impl_->writing_ = false;
300  self->impl_->backpressure_active_ = false;
301  self->impl_->transition_to(LinkState::Error);
302  return;
303  }
304 
305  self->impl_->queue_bytes_ += added;
306  self->impl_->tx_.emplace_back(std::move(buf));
307  self->impl_->report_backpressure(self->impl_->queue_bytes_);
// Start the writer only if no async_write is already in flight.
308  if (!self->impl_->writing_) self->impl_->do_write(self, self->impl_->current_seq_.load());
309  });
310  return;
311  }
312  } catch (const std::exception& e) {
313  UNILINK_LOG_ERROR("tcp_client", "async_write_copy", "Failed to acquire pooled buffer: " + std::string(e.what()));
314  }
315  }
316 
// Large payloads, or the pooled path was not taken: copy into an owned vector.
// NOTE(review): the enqueue lambda below duplicates the pooled-path lambda above and the
// ones in async_write_move/async_write_shared; a shared helper would keep them in sync.
317  std::vector<uint8_t> fallback(data.begin(), data.end());
318  const auto added = fallback.size();
319 
320  net::dispatch(impl_->strand_, [self = shared_from_this(), buf = std::move(fallback), added]() mutable {
321  if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
322  self->impl_->state_.is_state(LinkState::Error)) {
323  return;
324  }
325 
326  if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
327  UNILINK_LOG_ERROR("tcp_client", "async_write_copy",
328  "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) + " bytes)");
329  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
330  "async_write_copy", boost::asio::error::no_buffer_space, "Queue limit exceeded", false,
331  0);
332  self->impl_->connected_.store(false);
333  self->impl_->close_socket();
334  self->impl_->tx_.clear();
335  self->impl_->queue_bytes_ = 0;
336  self->impl_->writing_ = false;
337  self->impl_->backpressure_active_ = false;
338  self->impl_->transition_to(LinkState::Error);
339  return;
340  }
341 
342  self->impl_->queue_bytes_ += added;
343  self->impl_->tx_.emplace_back(std::move(buf));
344  self->impl_->report_backpressure(self->impl_->queue_bytes_);
345  if (!self->impl_->writing_) self->impl_->do_write(self, self->impl_->current_seq_.load());
346  });
347 }
348 
// Queues `data` for transmission without copying: the vector is moved into the strand
// handler. On every early-return path below the caller's vector is left untouched.
349 void TcpClient::async_write_move(std::vector<uint8_t>&& data) {
350  if (impl_->stop_requested_.load() || impl_->state_.is_state(LinkState::Closed) ||
351  impl_->state_.is_state(LinkState::Error) || !impl_->ioc_) {
352  return;
353  }
354  const auto size = data.size();
355  if (size == 0) {
356  UNILINK_LOG_WARNING("tcp_client", "async_write_move", "Ignoring zero-length write");
357  return;
358  }
360  UNILINK_LOG_ERROR("tcp_client", "async_write_move",
361  "Write size exceeds maximum allowed (" + std::to_string(size) + " bytes)");
362  return;
363  }
364 
// Same enqueue sequence as async_write_copy: re-check state on the strand, fail the link
// with Error if bp_limit_ would be exceeded, else queue and start the writer if idle.
365  const auto added = size;
366  net::dispatch(impl_->strand_, [self = shared_from_this(), buf = std::move(data), added]() mutable {
367  if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
368  self->impl_->state_.is_state(LinkState::Error)) {
369  return;
370  }
371 
372  if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
373  UNILINK_LOG_ERROR("tcp_client", "async_write_move",
374  "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) + " bytes)");
375  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
376  "async_write_move", boost::asio::error::no_buffer_space, "Queue limit exceeded", false,
377  0);
378  self->impl_->connected_.store(false);
379  self->impl_->close_socket();
380  self->impl_->tx_.clear();
381  self->impl_->queue_bytes_ = 0;
382  self->impl_->writing_ = false;
383  self->impl_->backpressure_active_ = false;
384  self->impl_->transition_to(LinkState::Error);
385  return;
386  }
387 
388  self->impl_->queue_bytes_ += added;
389  self->impl_->tx_.emplace_back(std::move(buf));
390  self->impl_->report_backpressure(self->impl_->queue_bytes_);
391  if (!self->impl_->writing_) self->impl_->do_write(self, self->impl_->current_seq_.load());
392  });
393 }
394 
// Queues a shared, immutable buffer without copying its bytes. The shared_ptr stored in
// tx_ (and later in current_write_buffer_) keeps the data alive until the write completes.
// Null or empty buffers are ignored with a warning.
395 void TcpClient::async_write_shared(std::shared_ptr<const std::vector<uint8_t>> data) {
396  if (impl_->stop_requested_.load() || impl_->state_.is_state(LinkState::Closed) ||
397  impl_->state_.is_state(LinkState::Error) || !impl_->ioc_) {
398  return;
399  }
400  if (!data || data->empty()) {
401  UNILINK_LOG_WARNING("tcp_client", "async_write_shared", "Ignoring empty shared buffer");
402  return;
403  }
404  const auto size = data->size();
406  UNILINK_LOG_ERROR("tcp_client", "async_write_shared",
407  "Write size exceeds maximum allowed (" + std::to_string(size) + " bytes)");
408  return;
409  }
410 
// Same enqueue sequence as async_write_copy: re-check state on the strand, fail the link
// with Error if bp_limit_ would be exceeded, else queue and start the writer if idle.
411  const auto added = size;
412  net::dispatch(impl_->strand_, [self = shared_from_this(), buf = std::move(data), added]() mutable {
413  if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
414  self->impl_->state_.is_state(LinkState::Error)) {
415  return;
416  }
417 
418  if (self->impl_->queue_bytes_ + added > self->impl_->bp_limit_) {
419  UNILINK_LOG_ERROR("tcp_client", "async_write_shared",
420  "Queue limit exceeded (" + std::to_string(self->impl_->queue_bytes_ + added) + " bytes)");
421  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION,
422  "async_write_shared", boost::asio::error::no_buffer_space, "Queue limit exceeded",
423  false, 0);
424  self->impl_->connected_.store(false);
425  self->impl_->close_socket();
426  self->impl_->tx_.clear();
427  self->impl_->queue_bytes_ = 0;
428  self->impl_->writing_ = false;
429  self->impl_->backpressure_active_ = false;
430  self->impl_->transition_to(LinkState::Error);
431  return;
432  }
433 
434  self->impl_->queue_bytes_ += added;
435  self->impl_->tx_.emplace_back(std::move(buf));
436  self->impl_->report_backpressure(self->impl_->queue_bytes_);
437  if (!self->impl_->writing_) self->impl_->do_write(self, self->impl_->current_seq_.load());
438  });
439 }
440 
441 void TcpClient::on_bytes(OnBytes cb) {
442  std::lock_guard<std::mutex> lock(impl_->callback_mtx_);
443  impl_->on_bytes_ = std::move(cb);
444 }
445 void TcpClient::on_state(OnState cb) {
446  std::lock_guard<std::mutex> lock(impl_->callback_mtx_);
447  impl_->on_state_ = std::move(cb);
448 }
449 void TcpClient::on_backpressure(OnBackpressure cb) {
450  std::lock_guard<std::mutex> lock(impl_->callback_mtx_);
451  impl_->on_bp_ = std::move(cb);
452 }
453 void TcpClient::set_retry_interval(unsigned interval_ms) { impl_->cfg_.retry_interval_ms = interval_ms; }
454 void TcpClient::set_reconnect_policy(ReconnectPolicy policy) {
455  if (policy) {
456  impl_->reconnect_policy_ = std::move(policy);
457  } else {
458  impl_->reconnect_policy_ = std::nullopt;
459  }
460 }
461 
462 // Impl methods implementation
463 
// Resolves cfg_.host:cfg_.port and connects for cycle `seq`, guarded by connect_timer_.
// On a resolve or connect failure, the error is recorded and schedule_retry() is called.
// On timeout, handle_close() is called with timed_out. On success, the retry counters are
// reset, the state becomes Connected, and reading plus flushing of queued writes begins.
// Every handler first drops cancelled completions and completions from older cycles.
464 void TcpClient::Impl::do_resolve_connect(std::shared_ptr<TcpClient> self, uint64_t seq) {
465  resolver_.async_resolve(
466  cfg_.host, std::to_string(cfg_.port), [self, seq](auto ec, tcp::resolver::results_type results) {
467  if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
468  return;
469  }
470  if (self->impl_->stop_requested_.load() || self->impl_->stopping_.load()) {
471  return;
472  }
473  if (ec) {
474  uint32_t current_attempts = self->impl_->reconnect_policy_
475  ? self->impl_->reconnect_attempt_count_
476  : static_cast<uint32_t>(self->impl_->retry_attempts_);
477  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::CONNECTION, "resolve",
478  ec, "Resolution failed: " + ec.message(),
479  diagnostics::is_retryable_tcp_connect_error(ec), current_attempts);
480  self->impl_->schedule_retry(self, seq);
481  return;
482  }
// Arm the connection timeout. On expiry, handle_close() closes the socket, so the pending
// async_connect below completes with operation_aborted and returns early.
483  self->impl_->connect_timer_.expires_after(std::chrono::milliseconds(self->impl_->cfg_.connection_timeout_ms));
484  self->impl_->connect_timer_.async_wait([self, seq](const boost::system::error_code& timer_ec) {
485  if (timer_ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
486  return;
487  }
488  if (!timer_ec && !self->impl_->stop_requested_.load() && !self->impl_->stopping_.load()) {
489  UNILINK_LOG_ERROR(
490  "tcp_client", "connect_timeout",
491  "Connection timed out after " + std::to_string(self->impl_->cfg_.connection_timeout_ms) + "ms");
492  uint32_t current_attempts = self->impl_->reconnect_policy_
493  ? self->impl_->reconnect_attempt_count_
494  : static_cast<uint32_t>(self->impl_->retry_attempts_);
495  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::CONNECTION, "connect",
496  boost::asio::error::timed_out, "Connection timed out",
497  diagnostics::is_retryable_tcp_connect_error(boost::asio::error::timed_out),
498  current_attempts);
499  self->impl_->handle_close(self, seq, boost::asio::error::timed_out);
500  }
501  });
502 
// Range async_connect tries each resolved endpoint in turn. Every exit path cancels
// connect_timer_ so the timeout cannot fire after the outcome is known.
503  net::async_connect(self->impl_->socket_, results, [self, seq](auto ec2, const auto&) {
504  if (ec2 == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
505  return;
506  }
507  if (self->impl_->stop_requested_.load() || self->impl_->stopping_.load()) {
508  self->impl_->close_socket();
509  self->impl_->connect_timer_.cancel();
510  return;
511  }
512  if (ec2) {
513  self->impl_->connect_timer_.cancel();
514  uint32_t current_attempts = self->impl_->reconnect_policy_
515  ? self->impl_->reconnect_attempt_count_
516  : static_cast<uint32_t>(self->impl_->retry_attempts_);
517  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::CONNECTION, "connect",
518  ec2, "Connection failed: " + ec2.message(),
519  diagnostics::is_retryable_tcp_connect_error(ec2), current_attempts);
520  self->impl_->schedule_retry(self, seq);
521  return;
522  }
// Connected: reset both retry counters before publishing the new state.
523  self->impl_->connect_timer_.cancel();
524  self->impl_->retry_attempts_ = 0;
525  self->impl_->reconnect_attempt_count_ = 0;
526  self->impl_->connected_.store(true);
527 
528 #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
529  int yes = 1;
530  (void)::setsockopt(self->impl_->socket_.native_handle(), SOL_SOCKET, SO_NOSIGPIPE, &yes, sizeof(yes));
531 #endif
532 
533  self->impl_->transition_to(LinkState::Connected);
534  boost::system::error_code ep_ec;
535  auto rep = self->impl_->socket_.remote_endpoint(ep_ec);
536  if (!ep_ec) {
537  UNILINK_LOG_INFO("tcp_client", "connect",
538  "Connected to " + rep.address().to_string() + ":" + std::to_string(rep.port()));
539  }
// Begin receiving, and flush anything queued while disconnected.
540  self->impl_->start_read(self, seq);
541  self->impl_->do_write(self, seq);
542  });
543  });
544 }
545 
// Decides whether to reconnect after a failure in cycle `seq`, using
// detail::decide_reconnect() with the last recorded error and the attempt counter of the
// active mode. When retrying, it arms retry_timer_ to call do_resolve_connect() after the
// delay. Otherwise it moves the link to Error. No-op while stopping or if a retry is
// already pending.
546 void TcpClient::Impl::schedule_retry(std::shared_ptr<TcpClient> self, uint64_t seq) {
547  connected_.store(false);
548  if (stop_requested_.load() || stopping_.load()) {
549  return;
550  }
551 
552  // Prevent double scheduling of reconnect
553  if (reconnect_pending_.exchange(true)) {
554  return;
555  }
556 
// Snapshot the last error under its mutex.
557  std::optional<diagnostics::ErrorInfo> last_err;
558  {
559  std::lock_guard<std::mutex> lock(last_err_mtx_);
560  last_err = last_error_info_;
561  }
562 
// Presumably substitutes a generic, retryable not_connected error when none was recorded.
563  if (!last_err) {
565  "tcp_client", "schedule_retry", "Unknown error",
566  make_error_code(boost::asio::error::not_connected), true);
567  }
568 
569  // Determine current attempt count based on active mode
570  uint32_t current_attempts = reconnect_policy_ ? reconnect_attempt_count_ : static_cast<uint32_t>(retry_attempts_);
571 
572  auto decision = detail::decide_reconnect(cfg_, *last_err, current_attempts, reconnect_policy_);
573 
574  if (!decision.should_retry) {
575  UNILINK_LOG_INFO("tcp_client", "retry", "Reconnect stopped by policy/config");
576  transition_to(LinkState::Error);
577  reconnect_pending_.store(false);
578  return;
579  }
580 
581  // The decider returns a base delay for both policy and fallback paths.
582  std::chrono::milliseconds delay = decision.delay.value_or(std::chrono::milliseconds(cfg_.retry_interval_ms));
583  if (reconnect_policy_) {
584  reconnect_attempt_count_++;
585  } else {
586  // Preserve existing "fast first retry" behavior for non-policy mode.
587  ++retry_attempts_;
588  if (retry_attempts_ == 1) {
589  delay = std::chrono::milliseconds(first_retry_interval_ms_);
590  }
591  }
592 
// NOTE(review): when called from handle_close(), which has already transitioned to
// Connecting, transition_to() deliberately allows a repeated Connecting, so on_state may
// observe Connecting twice.
593  transition_to(LinkState::Connecting);
594 
595  UNILINK_LOG_INFO("tcp_client", "retry",
596  "Scheduling retry in " + std::to_string(static_cast<double>(delay.count()) / 1000.0) + "s");
597 
598  retry_timer_.expires_after(delay);
599  retry_timer_.async_wait([self, seq](const boost::system::error_code& ec) {
600  // Clear pending flag regardless of result (fired or aborted)
601  self->impl_->reconnect_pending_.store(false);
602 
603  if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
604  return;
605  }
606  if (!ec && !self->impl_->stop_requested_.load() && !self->impl_->stopping_.load())
607  self->impl_->do_resolve_connect(self, seq);
608  });
609 }
610 
611 void TcpClient::Impl::start_read(std::shared_ptr<TcpClient> self, uint64_t seq) {
612  socket_.async_read_some(net::buffer(rx_.data(), rx_.size()), [self, seq](auto ec, std::size_t n) {
613  if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
614  return;
615  }
616  if (self->impl_->stop_requested_.load()) {
617  return;
618  }
619  if (ec) {
620  self->impl_->handle_close(self, seq, ec);
621  return;
622  }
623  OnBytes on_bytes;
624  {
625  std::lock_guard<std::mutex> lock(self->impl_->callback_mtx_);
626  on_bytes = self->impl_->on_bytes_;
627  }
628 
629  if (on_bytes) {
630  try {
631  on_bytes(memory::ConstByteSpan(self->impl_->rx_.data(), n));
632  } catch (const std::exception& e) {
633  UNILINK_LOG_ERROR("tcp_client", "on_bytes", "Exception in on_bytes callback: " + std::string(e.what()));
634  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION, "on_bytes",
635  boost::asio::error::connection_aborted,
636  "Exception in on_bytes: " + std::string(e.what()), false, 0);
637  self->impl_->handle_close(self, seq, make_error_code(boost::asio::error::connection_aborted));
638  return;
639  } catch (...) {
640  UNILINK_LOG_ERROR("tcp_client", "on_bytes", "Unknown exception in on_bytes callback");
641  self->impl_->handle_close(self, seq, make_error_code(boost::asio::error::connection_aborted));
642  return;
643  }
644  }
645  self->impl_->start_read(self, seq);
646  });
647 }
648 
649 void TcpClient::Impl::do_write(std::shared_ptr<TcpClient> self, uint64_t seq) {
650  if (stop_requested_.load()) {
651  tx_.clear();
652  queue_bytes_ = 0;
653  writing_ = false;
654  report_backpressure(queue_bytes_);
655  return;
656  }
657 
658  if (!connected_.load()) {
659  writing_ = false;
660  return;
661  }
662 
663  if (tx_.empty() || state_.is_state(LinkState::Closed) || state_.is_state(LinkState::Error)) {
664  writing_ = false;
665  return;
666  }
667  writing_ = true;
668 
669  current_write_buffer_ = std::move(tx_.front());
670  tx_.pop_front();
671 
672  auto& current = *current_write_buffer_;
673 
674  auto queued_bytes = std::visit(
675  [](auto&& buf) -> size_t {
676  using Buffer = std::decay_t<decltype(buf)>;
677  if constexpr (std::is_same_v<Buffer, std::shared_ptr<const std::vector<uint8_t>>>) {
678  return buf ? buf->size() : 0;
679  } else {
680  return buf.size();
681  }
682  },
683  current);
684 
685  auto on_write = [self, queued_bytes, seq](auto ec, std::size_t) {
686  if (ec == net::error::operation_aborted || seq != self->impl_->current_seq_.load()) {
687  self->impl_->current_write_buffer_.reset();
688  self->impl_->queue_bytes_ =
689  (self->impl_->queue_bytes_ > queued_bytes) ? (self->impl_->queue_bytes_ - queued_bytes) : 0;
690  self->impl_->report_backpressure(self->impl_->queue_bytes_);
691  self->impl_->writing_ = false;
692  return;
693  }
694 
695  if (ec) {
696  if (self->impl_->current_write_buffer_) {
697  self->impl_->tx_.push_front(std::move(*self->impl_->current_write_buffer_));
698  }
699  self->impl_->current_write_buffer_.reset();
700 
701  UNILINK_LOG_ERROR("tcp_client", "do_write", "Write failed: " + ec.message());
702  self->impl_->record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::COMMUNICATION, "write", ec,
703  "Write failed: " + ec.message(), false, 0);
704  self->impl_->writing_ = false;
705  self->impl_->handle_close(self, seq, ec);
706  return;
707  }
708 
709  self->impl_->current_write_buffer_.reset();
710  self->impl_->queue_bytes_ =
711  (self->impl_->queue_bytes_ > queued_bytes) ? (self->impl_->queue_bytes_ - queued_bytes) : 0;
712  self->impl_->report_backpressure(self->impl_->queue_bytes_);
713 
714  if (self->impl_->stop_requested_.load() || self->impl_->state_.is_state(LinkState::Closed) ||
715  self->impl_->state_.is_state(LinkState::Error)) {
716  self->impl_->writing_ = false;
717  return;
718  }
719 
720  self->impl_->do_write(self, seq);
721  };
722 
723  std::visit(
724  [&](const auto& buf) {
725  using T = std::decay_t<decltype(buf)>;
726  if constexpr (std::is_same_v<T, std::shared_ptr<const std::vector<uint8_t>>>) {
727  net::async_write(socket_, net::buffer(buf->data(), buf->size()), on_write);
728  } else {
729  net::async_write(socket_, net::buffer(buf.data(), buf.size()), on_write);
730  }
731  },
732  current);
733 }
734 
// Common teardown after a read/write failure or a connect timeout in cycle `seq`.
// Records the error when `ec` is set, resets connection bookkeeping and closes the socket.
// It then either finishes in Closed (stop in progress, or already Closed) or moves to
// Connecting and schedules a retry. Cancellations and completions from older cycles
// are ignored.
735 void TcpClient::Impl::handle_close(std::shared_ptr<TcpClient> self, uint64_t seq, const boost::system::error_code& ec) {
736  if (ec == net::error::operation_aborted || seq != current_seq_.load()) {
737  return;
738  }
739  UNILINK_LOG_INFO("tcp_client", "handle_close", "Closing connection. Error: " + ec.message());
740  if (ec) {
741  const bool retryable = diagnostics::is_retryable_tcp_connect_error(ec);
742  const uint32_t current_attempts =
743  reconnect_policy_ ? reconnect_attempt_count_ : static_cast<uint32_t>(retry_attempts_);
744 
746  "Connection closed with error: " + ec.message(), retryable, current_attempts);
747  }
// Closing the socket makes still-pending socket operations complete with operation_aborted,
// which their handlers ignore.
748  connected_.store(false);
749  writing_ = false;
750  connect_timer_.cancel();
751  close_socket();
752  if (stop_requested_.load() || stopping_.load() || state_.is_state(LinkState::Closed)) {
753  transition_to(LinkState::Closed, ec);
754  return;
755  }
756  transition_to(LinkState::Connecting, ec);
757  schedule_retry(self, seq);
758 }
759 
760 void TcpClient::Impl::close_socket() {
761  boost::system::error_code ec;
762  socket_.shutdown(tcp::socket::shutdown_both, ec);
763  socket_.close(ec);
764 }
765 
// Derives the backpressure thresholds from cfg_.backpressure_threshold:
//   bp_high_  = the threshold; report_backpressure() fires once the queue reaches it.
//   bp_low_   = half of bp_high_, at least 1; the "relieved" notification fires at or below it.
//   bp_limit_ = max(4 * bp_high_, DEFAULT_BACKPRESSURE_THRESHOLD), capped by the second
//               std::min argument, then raised to at least bp_high_. Exceeding it fails the link.
// It also clears backpressure_active_.
// NOTE(review): bp_high_ * 4 could overflow size_t for a huge threshold; presumably
// validate_and_clamp() bounds it. Confirm.
766 void TcpClient::Impl::recalculate_backpressure_bounds() {
767  bp_high_ = cfg_.backpressure_threshold;
768  bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
769  if (bp_low_ == 0) {
770  bp_low_ = 1;
771  }
772  bp_limit_ = std::min(std::max(bp_high_ * 4, common::constants::DEFAULT_BACKPRESSURE_THRESHOLD),
774  if (bp_limit_ < bp_high_) {
775  bp_limit_ = bp_high_;
776  }
777  backpressure_active_ = false;
778 }
779 
780 void TcpClient::Impl::report_backpressure(size_t queued_bytes) {
781  if (stop_requested_.load() || stopping_.load()) return;
782 
783  OnBackpressure on_bp;
784  {
785  std::lock_guard<std::mutex> lock(callback_mtx_);
786  on_bp = on_bp_;
787  }
788  if (!on_bp) return;
789 
790  if (!backpressure_active_ && queued_bytes >= bp_high_) {
791  backpressure_active_ = true;
792  try {
793  on_bp(queued_bytes);
794  } catch (const std::exception& e) {
795  UNILINK_LOG_ERROR("tcp_client", "on_backpressure",
796  "Exception in backpressure callback: " + std::string(e.what()));
797  } catch (...) {
798  UNILINK_LOG_ERROR("tcp_client", "on_backpressure", "Unknown exception in backpressure callback");
799  }
800  } else if (backpressure_active_ && queued_bytes <= bp_low_) {
801  backpressure_active_ = false;
802  try {
803  on_bp(queued_bytes);
804  } catch (const std::exception& e) {
805  UNILINK_LOG_ERROR("tcp_client", "on_backpressure",
806  "Exception in backpressure callback: " + std::string(e.what()));
807  } catch (...) {
808  UNILINK_LOG_ERROR("tcp_client", "on_backpressure", "Unknown exception in backpressure callback");
809  }
810  }
811 }
812 
813 void TcpClient::Impl::transition_to(LinkState next, const boost::system::error_code& ec) {
814  if (ec == net::error::operation_aborted) {
815  return;
816  }
817 
818  const auto current = state_.get_state();
819  const bool retrying_same_state = (next == LinkState::Connecting && current == LinkState::Connecting);
820  if ((current == LinkState::Closed || current == LinkState::Error) &&
821  (next == LinkState::Closed || next == LinkState::Error)) {
822  return;
823  }
824 
825  if (next == LinkState::Closed || next == LinkState::Error) {
826  if (terminal_state_notified_.exchange(true)) {
827  return;
828  }
829  } else if (current == next && !retrying_same_state) {
830  return;
831  }
832 
833  state_.set_state(next);
834  notify_state();
835 }
836 
837 void TcpClient::Impl::perform_stop_cleanup() {
838  try {
839  retry_timer_.cancel();
840  connect_timer_.cancel();
841  resolver_.cancel();
842  boost::system::error_code ec_cancel;
843  socket_.cancel(ec_cancel);
844  close_socket();
845  tx_.clear();
846  queue_bytes_ = 0;
847  writing_ = false;
848  connected_.store(false);
849  backpressure_active_ = false;
850 
851  if (owns_ioc_ && work_guard_) {
852  work_guard_->reset();
853  }
854  transition_to(LinkState::Closed);
855  } catch (const std::exception& e) {
856  UNILINK_LOG_ERROR("tcp_client", "stop_cleanup", "Cleanup error: " + std::string(e.what()));
857  record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::SYSTEM, "stop_cleanup", {},
858  "Cleanup error: " + std::string(e.what()), false, 0);
859  diagnostics::error_reporting::report_system_error("tcp_client", "stop_cleanup",
860  "Exception in stop cleanup: " + std::string(e.what()));
861  } catch (...) {
862  UNILINK_LOG_ERROR("tcp_client", "stop_cleanup", "Unknown error in stop cleanup");
863  diagnostics::error_reporting::report_system_error("tcp_client", "stop_cleanup", "Unknown error in stop cleanup");
864  }
865 }
866 
867 void TcpClient::Impl::reset_start_state() {
868  stop_requested_.store(false);
869  stopping_.store(false);
870  terminal_state_notified_.store(false);
871  reconnect_pending_.store(false);
872  retry_attempts_ = 0;
873  reconnect_attempt_count_ = 0;
874  connected_.store(false);
875  writing_ = false;
876  queue_bytes_ = 0;
877  backpressure_active_ = false;
878  state_.set_state(LinkState::Idle);
879 }
880 
881 void TcpClient::Impl::join_ioc_thread(bool allow_detach) {
882  if (!owns_ioc_ || !ioc_thread_.joinable()) {
883  return;
884  }
885 
886  if (std::this_thread::get_id() == ioc_thread_.get_id()) {
887  if (allow_detach) {
888  ioc_thread_.detach();
889  }
890  return;
891  }
892 
893  try {
894  ioc_thread_.join();
895  } catch (const std::exception& e) {
896  UNILINK_LOG_ERROR("tcp_client", "join", "Join failed: " + std::string(e.what()));
897  } catch (...) {
898  UNILINK_LOG_ERROR("tcp_client", "join", "Join failed with unknown error");
899  }
900 }
901 
902 void TcpClient::Impl::notify_state() {
903  if (stop_requested_.load() || stopping_.load()) return;
904 
905  OnState on_state;
906  {
907  std::lock_guard<std::mutex> lock(callback_mtx_);
908  on_state = on_state_;
909  }
910  if (!on_state) return;
911 
912  try {
913  on_state(state_.get_state());
914  } catch (const std::exception& e) {
915  UNILINK_LOG_ERROR("tcp_client", "on_state", "Exception in state callback: " + std::string(e.what()));
916  } catch (...) {
917  UNILINK_LOG_ERROR("tcp_client", "on_state", "Unknown exception in state callback");
918  }
919 }
920 
921 void TcpClient::Impl::record_error(diagnostics::ErrorLevel lvl, diagnostics::ErrorCategory cat,
922  std::string_view operation, const boost::system::error_code& ec,
923  std::string_view msg, bool retryable, uint32_t retry_count) {
924  std::lock_guard<std::mutex> lock(last_err_mtx_);
925  diagnostics::ErrorInfo info(lvl, cat, "tcp_client", operation, msg, ec, retryable);
926  info.retry_count = retry_count;
927  last_error_info_ = info;
928 }
929 
930 void TcpClient::Impl::reset_io_objects() {
931  try {
932  boost::system::error_code ec_cancel;
933  socket_.cancel(ec_cancel);
934  close_socket();
935  socket_ = tcp::socket(strand_);
936  resolver_.cancel();
937  resolver_ = tcp::resolver(strand_);
938  retry_timer_ = net::steady_timer(strand_);
939  connect_timer_ = net::steady_timer(strand_);
940  tx_.clear();
941  queue_bytes_ = 0;
942  writing_ = false;
943  backpressure_active_ = false;
944  } catch (const std::exception& e) {
945  UNILINK_LOG_ERROR("tcp_client", "reset_io_objects", "Reset error: " + std::string(e.what()));
946  record_error(diagnostics::ErrorLevel::ERROR, diagnostics::ErrorCategory::SYSTEM, "reset_io_objects", {},
947  "Reset error: " + std::string(e.what()), false, 0);
948  diagnostics::error_reporting::report_system_error("tcp_client", "reset_io_objects",
949  "Exception while resetting io objects: " + std::string(e.what()));
950  } catch (...) {
951  UNILINK_LOG_ERROR("tcp_client", "reset_io_objects", "Unknown reset error");
952  diagnostics::error_reporting::report_system_error("tcp_client", "reset_io_objects",
953  "Unknown error while resetting io objects");
954  }
955 }
956 
957 } // namespace transport
958 } // namespace unilink
#define UNILINK_LOG_WARNING(component, operation, message)
Definition: logger.hpp:272
#define UNILINK_LOG_INFO(component, operation, message)
Definition: logger.hpp:265
#define UNILINK_LOG_ERROR(component, operation, message)
Definition: logger.hpp:279
#define UNILINK_LOG_DEBUG(component, operation, message)
Convenience macros for logging.
Definition: logger.hpp:258