unilink  0.4.3
A simple C++ library for unified async communication
tcp_client.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/asio/executor_work_guard.hpp>
20 #include <boost/asio/io_context.hpp>
21 #include <chrono>
22 #include <mutex>
23 #include <optional>
24 #include <stdexcept>
25 #include <thread>
26 #include <vector>
27 
28 #include "unilink/base/common.hpp"
33 
34 namespace unilink {
35 namespace wrapper {
36 
38  mutable std::mutex mutex_;
39  std::string host_;
40  uint16_t port_;
41  std::shared_ptr<interface::Channel> channel_;
42  std::shared_ptr<boost::asio::io_context> external_ioc_;
43  bool use_external_context_{false};
45  std::thread external_thread_;
46  std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard_;
47 
48  std::vector<std::promise<bool>> pending_promises_;
49  std::atomic<bool> started_{false};
50  std::shared_ptr<bool> alive_marker_{std::make_shared<bool>(true)};
51 
56 
57  bool auto_manage_ = false;
58  std::chrono::milliseconds retry_interval_{3000};
59  int max_retries_ = -1;
60  std::chrono::milliseconds connection_timeout_{5000};
61 
62  Impl(const std::string& host, uint16_t port) : host_(host), port_(port), started_(false) {}
63 
64  Impl(const std::string& host, uint16_t port, std::shared_ptr<boost::asio::io_context> external_ioc)
65  : host_(host),
66  port_(port),
67  external_ioc_(std::move(external_ioc)),
70  started_(false) {}
71 
72  explicit Impl(std::shared_ptr<interface::Channel> channel)
73  : host_(""), port_(0), channel_(std::move(channel)), started_(false) {
75  }
76 
77  ~Impl() {
78  try {
79  stop();
80  } catch (...) {
81  }
82  }
83 
84  void fulfill_all(bool value) {
85  std::lock_guard<std::mutex> lock(mutex_);
86  for (auto& p : pending_promises_) {
87  try {
88  p.set_value(value);
89  } catch (...) {
90  }
91  }
92  pending_promises_.clear();
93  }
94 
95  std::future<bool> start() {
96  std::lock_guard<std::mutex> lock(mutex_);
97  if (channel_ && channel_->is_connected()) {
98  std::promise<bool> p;
99  p.set_value(true);
100  return p.get_future();
101  }
102  std::promise<bool> p;
103  auto f = p.get_future();
104  pending_promises_.push_back(std::move(p));
105  if (started_) return f;
106 
107  if (!channel_) {
109  config.host = host_;
110  config.port = port_;
111  config.retry_interval_ms = static_cast<unsigned int>(retry_interval_.count());
112  config.max_retries = max_retries_;
113  config.connection_timeout_ms = static_cast<unsigned>(connection_timeout_.count());
116  }
117  started_ = true;
118  channel_->start();
120  work_guard_ = std::make_unique<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>(
121  external_ioc_->get_executor());
122  external_thread_ = std::thread([this, ioc = external_ioc_]() {
123  try {
124  while (started_.load() && !ioc->stopped()) {
125  if (ioc->run_one_for(std::chrono::milliseconds(50)) == 0) {
126  std::this_thread::yield();
127  }
128  }
129  } catch (...) {
130  }
131  });
132  }
133  return f;
134  }
135 
136  void stop() {
137  bool should_join = false;
138  {
139  std::unique_lock<std::mutex> lock(mutex_);
140  if (!started_.load()) {
141  for (auto& p : pending_promises_) {
142  try {
143  p.set_value(false);
144  } catch (...) {
145  }
146  }
147  pending_promises_.clear();
148  return;
149  }
150  started_ = false;
151  if (channel_) {
152  channel_->on_bytes(nullptr);
153  channel_->on_state(nullptr);
154  channel_->stop();
155  }
156  if (use_external_context_ && manage_external_context_) {
157  if (work_guard_) work_guard_.reset();
158  if (external_ioc_) external_ioc_->stop();
159  should_join = true;
160  }
161  for (auto& p : pending_promises_) {
162  try {
163  p.set_value(false);
164  } catch (...) {
165  }
166  }
167  pending_promises_.clear();
168  }
169  if (should_join && external_thread_.joinable()) {
170  try {
171  external_thread_.join();
172  } catch (...) {
173  }
174  }
175  std::lock_guard<std::mutex> lock(mutex_);
176  channel_.reset();
177  }
178 
179  void send(std::string_view data) {
180  if (channel_ && channel_->is_connected()) {
181  auto binary_view = common::safe_convert::string_to_bytes(data);
182  channel_->async_write_copy(memory::ConstByteSpan(binary_view.first, binary_view.second));
183  }
184  }
185 
186  bool is_connected() const { return channel_ && channel_->is_connected(); }
187 
189  if (!channel_) return;
190 
191  std::weak_ptr<bool> weak_alive = alive_marker_;
192 
193  // Explicitly do not use try-catch here to allow exceptions from handlers
194  // to propagate to transport layer for error handling (e.g., auto-reconnect)
195  channel_->on_bytes([this, weak_alive](memory::ConstByteSpan data) {
196  if (weak_alive.expired()) return;
197  if (data_handler_) {
198  std::string str_data = common::safe_convert::uint8_to_string(data.data(), data.size());
199  data_handler_(MessageContext(0, str_data));
200  }
201  });
202 
203  channel_->on_state([this, weak_alive](base::LinkState state) {
204  if (weak_alive.expired()) return;
205  if (state == base::LinkState::Connected) {
206  fulfill_all(true);
207  if (connect_handler_) connect_handler_(ConnectionContext(0));
208  } else if (state == base::LinkState::Closed || state == base::LinkState::Error) {
209  fulfill_all(false);
210  if (state == base::LinkState::Closed && disconnect_handler_) {
211  disconnect_handler_(ConnectionContext(0));
212  } else if (state == base::LinkState::Error && error_handler_) {
213  bool handled = false;
214  if (auto transport = std::dynamic_pointer_cast<transport::TcpClient>(channel_)) {
215  if (auto info = transport->last_error_info()) {
216  error_handler_(diagnostics::to_error_context(*info));
217  handled = true;
218  }
219  }
220  if (!handled) {
221  error_handler_(ErrorContext(ErrorCode::IoError, "Connection state error"));
222  }
223  }
224  }
225  });
226  }
227 };
228 
229 TcpClient::TcpClient(const std::string& h, uint16_t p) : impl_(std::make_unique<Impl>(h, p)) {}
230 TcpClient::TcpClient(const std::string& h, uint16_t p, std::shared_ptr<boost::asio::io_context> ioc)
231  : impl_(std::make_unique<Impl>(h, p, ioc)) {}
232 TcpClient::TcpClient(std::shared_ptr<interface::Channel> ch) : impl_(std::make_unique<Impl>(ch)) {}
233 TcpClient::~TcpClient() = default;
234 
235 TcpClient::TcpClient(TcpClient&&) noexcept = default;
236 TcpClient& TcpClient::operator=(TcpClient&&) noexcept = default;
237 
238 std::future<bool> TcpClient::start() { return impl_->start(); }
239 void TcpClient::stop() { impl_->stop(); }
240 void TcpClient::send(std::string_view data) { impl_->send(data); }
241 void TcpClient::send_line(std::string_view line) { impl_->send(std::string(line) + "\n"); }
242 bool TcpClient::is_connected() const { return get_impl()->is_connected(); }
243 
245  impl_->data_handler_ = std::move(h);
246  return *this;
247 }
249  impl_->connect_handler_ = std::move(h);
250  return *this;
251 }
253  impl_->disconnect_handler_ = std::move(h);
254  return *this;
255 }
257  impl_->error_handler_ = std::move(h);
258  return *this;
259 }
260 
262  impl_->auto_manage_ = m;
263  if (impl_->auto_manage_ && !impl_->started_.load()) start();
264  return *this;
265 }
266 
267 TcpClient& TcpClient::set_retry_interval(std::chrono::milliseconds i) {
268  impl_->retry_interval_ = i;
269  if (impl_->channel_) {
270  auto transport_client = std::dynamic_pointer_cast<transport::TcpClient>(impl_->channel_);
271  if (transport_client) transport_client->set_retry_interval(static_cast<unsigned int>(i.count()));
272  }
273  return *this;
274 }
275 
277  impl_->max_retries_ = m;
278  return *this;
279 }
280 TcpClient& TcpClient::set_connection_timeout(std::chrono::milliseconds t) {
281  impl_->connection_timeout_ = t;
282  return *this;
283 }
285  impl_->manage_external_context_ = m;
286  return *this;
287 }
288 
289 } // namespace wrapper
290 } // namespace unilink