unilink  0.4.3
A simple C++ library for unified async communication
tcp_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <atomic>
20 #include <boost/asio/executor_work_guard.hpp>
21 #include <boost/asio/io_context.hpp>
22 #include <chrono>
23 #include <mutex>
24 #include <stdexcept>
25 #include <thread>
26 #include <vector>
27 
28 #include "unilink/base/common.hpp"
32 
33 namespace unilink {
34 namespace wrapper {
35 
37  mutable std::mutex mutex_;
38  uint16_t port_;
39  std::shared_ptr<interface::Channel> channel_;
40  std::shared_ptr<boost::asio::io_context> external_ioc_;
41  bool use_external_context_{false};
43  std::thread external_thread_;
44  std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard_;
45 
46  std::vector<std::promise<bool>> pending_promises_;
47  bool started_{false};
48  std::atomic<bool> is_listening_{false};
49 
50  // Configuration
51  bool auto_manage_{false};
52  bool port_retry_enabled_{false};
56  bool client_limit_enabled_{false};
57  size_t max_clients_{0};
58  bool notify_send_failure_{false};
59 
64 
65  explicit Impl(uint16_t port)
66  : port_(port),
67  started_(false),
68  is_listening_(false),
69  auto_manage_(false),
70  port_retry_enabled_(false),
74  client_limit_enabled_(false),
75  max_clients_(0),
76  notify_send_failure_(false) {}
77 
78  Impl(uint16_t port, std::shared_ptr<boost::asio::io_context> external_ioc)
79  : port_(port),
80  external_ioc_(std::move(external_ioc)),
83  started_(false),
84  is_listening_(false),
85  auto_manage_(false),
86  port_retry_enabled_(false),
90  client_limit_enabled_(false),
91  max_clients_(0),
92  notify_send_failure_(false) {}
93 
94  explicit Impl(std::shared_ptr<interface::Channel> channel)
95  : port_(0),
96  channel_(std::move(channel)),
97  started_(false),
98  is_listening_(false),
99  auto_manage_(false),
100  port_retry_enabled_(false),
103  idle_timeout_ms_(0),
104  client_limit_enabled_(false),
105  max_clients_(0),
106  notify_send_failure_(false) {
108  }
109 
110  ~Impl() {
111  try {
112  stop();
113  } catch (...) {
114  }
115  }
116 
117  void fulfill_all(bool value) {
118  std::lock_guard<std::mutex> lock(mutex_);
119  for (auto& p : pending_promises_) {
120  try {
121  p.set_value(value);
122  } catch (...) {
123  }
124  }
125  pending_promises_.clear();
126  }
127 
128  std::future<bool> start() {
129  std::lock_guard<std::mutex> lock(mutex_);
130  if (is_listening_) {
131  std::promise<bool> p;
132  p.set_value(true);
133  return p.get_future();
134  }
135  std::promise<bool> p;
136  auto f = p.get_future();
137  pending_promises_.push_back(std::move(p));
138  if (started_) return f;
139 
140  if (!channel_) {
142  config.port = port_;
147 
150 
151  if (client_limit_enabled_) {
152  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(channel_);
153  if (transport_server) {
154  if (max_clients_ == 0)
155  transport_server->set_unlimited_clients();
156  else
157  transport_server->set_client_limit(max_clients_);
158  }
159  }
160  }
161  channel_->start();
163  work_guard_ = std::make_unique<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>(
164  external_ioc_->get_executor());
165  external_thread_ = std::thread([ioc = external_ioc_]() {
166  try {
167  ioc->run();
168  } catch (...) {
169  }
170  });
171  }
172  started_ = true;
173  return f;
174  }
175 
176  void stop() {
177  bool should_join = false;
178  {
179  std::unique_lock<std::mutex> lock(mutex_);
180  if (!started_) {
181  for (auto& p : pending_promises_) {
182  try {
183  p.set_value(false);
184  } catch (...) {
185  }
186  }
187  pending_promises_.clear();
188  return;
189  }
190  if (channel_) {
191  channel_->on_bytes(nullptr);
192  channel_->on_state(nullptr);
193  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(channel_);
194  if (transport_server) transport_server->request_stop();
195  channel_->stop();
196  }
198  if (work_guard_) work_guard_.reset();
199  if (external_ioc_) external_ioc_->stop();
200  should_join = true;
201  }
202  for (auto& p : pending_promises_) {
203  try {
204  p.set_value(false);
205  } catch (...) {
206  }
207  }
208  pending_promises_.clear();
209  started_ = false;
210  is_listening_ = false;
211  }
212  if (should_join && external_thread_.joinable()) {
213  try {
214  external_thread_.join();
215  } catch (...) {
216  }
217  }
218  std::lock_guard<std::mutex> lock(mutex_);
219  channel_.reset();
220  }
221 
223  if (!channel_) return;
224  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(channel_);
225  if (transport_server) {
226  transport_server->on_multi_connect([this](size_t id, const std::string& info) {
228  });
229  transport_server->on_multi_data([this](size_t id, const std::string& data) {
230  if (on_data_) on_data_(MessageContext(id, data));
231  });
232  transport_server->on_multi_disconnect([this](size_t id) {
234  });
235  }
236  channel_->on_state([this](base::LinkState state) {
237  if (state == base::LinkState::Listening) {
238  is_listening_ = true;
239  fulfill_all(true);
240  } else if (state == base::LinkState::Error || state == base::LinkState::Closed) {
241  is_listening_ = false;
242  fulfill_all(false);
243  if (state == base::LinkState::Error && on_error_) {
244  on_error_(ErrorContext(ErrorCode::IoError, "Server disconnected"));
245  }
246  }
247  });
248  }
249 };
250 
251 TcpServer::TcpServer(uint16_t port) : impl_(std::make_unique<Impl>(port)) {}
252 TcpServer::TcpServer(uint16_t port, std::shared_ptr<boost::asio::io_context> ioc)
253  : impl_(std::make_unique<Impl>(port, ioc)) {}
254 TcpServer::TcpServer(std::shared_ptr<interface::Channel> ch) : impl_(std::make_unique<Impl>(ch)) {}
255 TcpServer::~TcpServer() = default;
256 
257 TcpServer::TcpServer(TcpServer&&) noexcept = default;
258 TcpServer& TcpServer::operator=(TcpServer&&) noexcept = default;
259 
260 std::future<bool> TcpServer::start() { return impl_->start(); }
261 void TcpServer::stop() { impl_->stop(); }
262 bool TcpServer::is_listening() const { return get_impl()->is_listening_.load(); }
263 
264 bool TcpServer::broadcast(std::string_view data) {
265  if (impl_->channel_) {
266  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(impl_->channel_);
267  if (transport_server) return transport_server->broadcast(std::string(data));
268  }
269  return false;
270 }
271 
272 bool TcpServer::send_to(size_t client_id, std::string_view data) {
273  if (impl_->channel_) {
274  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(impl_->channel_);
275  if (transport_server) return transport_server->send_to_client(client_id, std::string(data));
276  }
277  return false;
278 }
279 
281  impl_->on_client_connect_ = std::move(h);
282  return *this;
283 }
285  impl_->on_client_disconnect_ = std::move(h);
286  return *this;
287 }
289  impl_->on_data_ = std::move(h);
290  return *this;
291 }
293  impl_->on_error_ = std::move(h);
294  return *this;
295 }
296 
298  if (!get_impl()->channel_) return 0;
299  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(get_impl()->channel_);
300  return transport_server ? transport_server->get_client_count() : 0;
301 }
302 
303 std::vector<size_t> TcpServer::get_connected_clients() const {
304  if (!get_impl()->channel_) return std::vector<size_t>();
305  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(get_impl()->channel_);
306  return transport_server ? transport_server->get_connected_clients() : std::vector<size_t>();
307 }
308 
310  impl_->auto_manage_ = m;
311  if (impl_->auto_manage_ && !impl_->started_) start();
312  return *this;
313 }
314 
315 TcpServer& TcpServer::enable_port_retry(bool e, int m, int i) {
316  impl_->port_retry_enabled_ = e;
317  impl_->max_port_retries_ = m;
318  impl_->port_retry_interval_ms_ = i;
319  return *this;
320 }
321 
323  impl_->idle_timeout_ms_ = ms;
324  // Note: Runtime change of idle_timeout is not supported by current transport.
325  // Must be set via Builder before start().
326  return *this;
327 }
328 
330  impl_->max_clients_ = max;
331  impl_->client_limit_enabled_ = true;
332  if (impl_->channel_) {
333  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(impl_->channel_);
334  if (transport_server) transport_server->set_client_limit(max);
335  }
336  return *this;
337 }
338 
340  impl_->client_limit_enabled_ = false;
341  if (impl_->channel_) {
342  auto transport_server = std::dynamic_pointer_cast<transport::TcpServer>(impl_->channel_);
343  if (transport_server) transport_server->set_unlimited_clients();
344  }
345  return *this;
346 }
347 
349  impl_->notify_send_failure_ = e;
350  return *this;
351 }
353  impl_->manage_external_context_ = m;
354  return *this;
355 }
356 
357 } // namespace wrapper
358 } // namespace unilink