unilink  0.4.3
A simple C++ library for unified async communication
udp.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <array>
20 #include <atomic>
21 #include <boost/asio.hpp>
22 #include <cstddef>
23 #include <deque>
24 #include <memory>
25 #include <optional>
26 #include <stdexcept>
27 #include <thread>
28 #include <type_traits>
29 #include <variant>
30 #include <vector>
31 
32 #include "unilink/base/common.hpp"
38 
39 namespace unilink {
40 namespace transport {
41 
// Shorthand for the Boost.Asio namespace used throughout this file.
42 namespace net = boost::asio;
// UDP protocol type; udp::socket and udp::endpoint below are taken from it.
43 using udp = net::ip::udp;
// Link state enumeration; this file uses Idle, Connecting, Listening, Connected, Closed and Error.
44 using base::LinkState;
46 
    // NOTE(review): the line that opens this struct (listing line 47) and listing lines 64 and
    // 77-80 are absent from this extract. Members used elsewhere in the file but not declared
    // here (cfg_, on_bytes_, on_state_, on_bp_) are presumably declared on those lines.

    // --- I/O context plumbing ---
    // io_context created by the single-argument constructor; not set when an external one is supplied.
48  std::unique_ptr<net::io_context> owned_ioc_;
    // The io_context actually in use (either owned_ioc_.get() or the external context).
49  net::io_context* ioc_;
    // True when this object created the io_context; gates stop/restart/thread handling.
50  bool owns_ioc_{true};
    // Strand through which start(), stop cleanup and write requests are dispatched/posted.
51  net::strand<net::io_context::executor_type> strand_;
    // Created in start() when the context is owned; reset during stop cleanup.
52  std::unique_ptr<net::executor_work_guard<net::io_context::executor_type>> work_guard_;
    // Thread that calls ioc_->run() when the context is owned.
53  std::thread ioc_thread_;
54 
    // --- Socket and endpoints ---
55  udp::socket socket_;
    // Built from cfg_.local_address / cfg_.local_port in open_socket().
56  udp::endpoint local_endpoint_;
    // Endpoint argument handed to async_receive_from() in start_receive().
57  udp::endpoint recv_endpoint_;
    // Destination used by async_send_to(); writes are dropped while this is empty.
58  std::optional<udp::endpoint> remote_endpoint_;
59 
    // --- Receive buffer and transmit queue ---
60  std::array<uint8_t, common::constants::DEFAULT_READ_BUFFER_SIZE> rx_{};
    // Pending outgoing buffers; each entry is a pooled buffer, an owned vector, or a shared vector.
61  std::deque<std::variant<memory::PooledBuffer, std::vector<uint8_t>, std::shared_ptr<const std::vector<uint8_t>>>> tx_;
    // True while an async_send_to() started by do_write() has not completed.
62  bool writing_{false};
    // Bytes accounted for by enqueue_buffer() and released in the send completion handler.
63  size_t queue_bytes_{0};
    // Backpressure thresholds: on at >= bp_high_, off at <= bp_low_; bp_limit_ is the hard queue cap.
65  size_t bp_high_;
66  size_t bp_low_;
67  size_t bp_limit_;
68  bool backpressure_active_{false};
69 
    // --- Lifecycle flags and link state ---
70  std::atomic<bool> stop_requested_{false};
71  std::atomic<bool> stopping_{false};
72  std::atomic<bool> opened_{false};
73  std::atomic<bool> connected_{false};
    // Set at the end of start(); cleared at the end of the stop routine.
74  bool started_{false};
75  ThreadSafeLinkState state_{LinkState::Idle};
    // Ensures only the first transition to Closed/Error is applied and reported.
76  std::atomic<bool> terminal_state_notified_{false};
77 
81 
    // Constructs an Impl that creates and owns its own io_context.
    // bp_high_ is taken from config.backpressure_threshold; remaining setup happens in init().
    // NOTE(review): listing line 87 is absent from this extract; it presumably initializes
    // socket_, which has no initializer visible here - confirm against the real file.
82  explicit Impl(const config::UdpConfig& config)
83  : owned_ioc_(std::make_unique<net::io_context>()),
84  ioc_(owned_ioc_.get()),
85  owns_ioc_(true),
86  strand_(ioc_->get_executor()),
88  cfg_(config),
89  bp_high_(config.backpressure_threshold) {
90  init();
91  }
92 
    // Constructs an Impl that runs on a caller-supplied io_context (owns_ioc_ = false, so this
    // object never stops, restarts or runs that context itself).
    // NOTE(review): listing line 97 is absent from this extract; it presumably initializes
    // socket_ - confirm against the real file.
93  Impl(const config::UdpConfig& config, net::io_context& external_ioc)
94  : ioc_(&external_ioc),
95  owns_ioc_(false),
96  strand_(external_ioc.get_executor()),
98  cfg_(config),
99  bp_high_(config.backpressure_threshold) {
100  init();
101  }
102 
    // Shared constructor tail: derives the backpressure thresholds from bp_high_.
    // NOTE(review): listing lines 104-105, 108-109, 111 and 113 are absent from this extract, so
    // how bp_limit_ is first computed, what the `bp_limit_ < bp_high_` branch does, and any other
    // setup performed here cannot be read from this view.
103  void init() {
    // Low-water mark is half the high-water mark (or equal to it when bp_high_ <= 1) ...
106  bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
    // ... and is never allowed to be zero.
107  if (bp_low_ == 0) bp_low_ = 1;
110  if (bp_limit_ < bp_high_) {
112  }
114  }
115 
    // Destructor: flags shutdown, stops an owned io_context, and disposes of the worker thread.
    // All exceptions are swallowed so the destructor never throws.
    // NOTE(review): listing line 130 is absent from this extract; whatever it does is not shown.
116  ~Impl() {
117  try {
118  stop_requested_.store(true);
119  stopping_.store(true);
    // Only an io_context created by this object is stopped; an external one is left alone.
120  if (owns_ioc_ && ioc_) {
121  ioc_->stop();
122  }
123  if (ioc_thread_.joinable()) {
    // A thread cannot join itself: when destruction happens on the worker thread, detach instead.
124  if (std::this_thread::get_id() != ioc_thread_.get_id()) {
125  ioc_thread_.join();
126  } else {
127  ioc_thread_.detach();
128  }
129  }
131  } catch (...) {
132  }
133  }
134 
135  void open_socket(std::shared_ptr<UdpChannel> self) {
136  if (stopping_.load() || stop_requested_.load()) return;
137 
138  boost::system::error_code ec;
139  auto address = net::ip::make_address(cfg_.local_address, ec);
140  if (ec) {
141  UNILINK_LOG_ERROR("udp", "bind", "Invalid local address: " + cfg_.local_address);
142  transition_to(LinkState::Error, ec);
143  return;
144  }
145 
146  local_endpoint_ = udp::endpoint(address, cfg_.local_port);
147  socket_.open(local_endpoint_.protocol(), ec);
148  if (ec) {
149  UNILINK_LOG_ERROR("udp", "open", "Socket open failed: " + ec.message());
150  transition_to(LinkState::Error, ec);
151  return;
152  }
153 
154  socket_.bind(local_endpoint_, ec);
155  if (ec) {
156  UNILINK_LOG_ERROR("udp", "bind", "Bind failed: " + ec.message());
157  transition_to(LinkState::Error, ec);
158  return;
159  }
160 
161  opened_.store(true);
162  if (remote_endpoint_) {
163  connected_.store(true);
164  transition_to(LinkState::Connected);
165  } else {
166  transition_to(LinkState::Listening);
167  }
168  start_receive(self);
169  }
170 
171  void start_receive(std::shared_ptr<UdpChannel> self) {
172  if (stopping_.load() || stop_requested_.load() || state_.is_state(LinkState::Closed) ||
173  state_.is_state(LinkState::Error) || !socket_.is_open()) {
174  return;
175  }
176 
177  socket_.async_receive_from(net::buffer(rx_), recv_endpoint_,
178  [self](const boost::system::error_code& ec, std::size_t bytes) {
179  auto impl = self->get_impl();
180  impl->handle_receive(self, ec, bytes);
181  });
182  }
183 
    // Completion handler for the receive armed by start_receive().
    //   self  - owning channel, passed on when re-arming the receive
    //   ec    - result of the receive operation
    //   bytes - number of bytes written into rx_
    // NOTE(review): listing lines 207, 217 and 223 are absent from this extract, which is why the
    // braces below look unbalanced. Line 207 presumably records the sender as remote_endpoint_,
    // and lines 217/223 presumably open a conditional around the Error transitions - confirm
    // against the real file before editing this function.
184  void handle_receive(std::shared_ptr<UdpChannel> self, const boost::system::error_code& ec, std::size_t bytes) {
    // A cancelled receive is expected during shutdown; ignore it silently.
185  if (ec == boost::asio::error::operation_aborted) {
186  return;
187  }
188 
    // Drop the result (and do not re-arm) once a stop is pending or the link is terminal.
189  if (stopping_.load() || stop_requested_.load() || state_.is_state(LinkState::Closed) ||
190  state_.is_state(LinkState::Error)) {
191  return;
192  }
193 
    // A datagram that fills rx_ completely is treated the same as a reported truncation.
194  if (ec == boost::asio::error::message_size || bytes >= rx_.size()) {
195  UNILINK_LOG_ERROR("udp", "receive", "Datagram truncated (buffer too small)");
196  transition_to(LinkState::Error, ec);
197  return;
198  }
199 
200  if (ec) {
201  UNILINK_LOG_ERROR("udp", "receive", "Receive failed: " + ec.message());
202  transition_to(LinkState::Error, ec);
203  return;
204  }
205 
    // First datagram while no peer is known: the link becomes Connected.
206  if (!remote_endpoint_) {
208  connected_.store(true);
209  transition_to(LinkState::Connected);
210  }
211 
    // Hand the payload to the user callback; exceptions from it are logged, never propagated.
212  if (bytes > 0 && on_bytes_) {
213  try {
214  on_bytes_(memory::ConstByteSpan(rx_.data(), bytes));
215  } catch (const std::exception& e) {
216  UNILINK_LOG_ERROR("udp", "on_bytes", "Exception in bytes callback: " + std::string(e.what()));
218  transition_to(LinkState::Error);
219  return;
220  }
221  } catch (...) {
222  UNILINK_LOG_ERROR("udp", "on_bytes", "Unknown exception in bytes callback");
224  transition_to(LinkState::Error);
225  return;
226  }
227  }
228  }
229 
    // Re-arm for the next datagram.
230  start_receive(self);
231  }
232 
    // Sends the buffer at the front of tx_ with async_send_to(); the completion handler calls
    // do_write() again, so queued buffers go out one at a time.
    //   self - owning channel, captured by the completion handler
    // NOTE(review): listing line 241 is absent from this extract; what it does is not shown.
233  void do_write(std::shared_ptr<UdpChannel> self) {
    // Nothing to do while a send is in flight or the queue is empty.
234  if (writing_ || tx_.empty()) return;
    // Stop pending or terminal state: discard everything that is still queued.
235  if (stop_requested_.load() || stopping_.load() || state_.is_state(LinkState::Closed) ||
236  state_.is_state(LinkState::Error)) {
237  tx_.clear();
238  queue_bytes_ = 0;
239  writing_ = false;
240  backpressure_active_ = false;
242  return;
243  }
    // Without a destination nothing can be sent. Only a warning is logged here; the queued
    // buffers and queue_bytes_ are left untouched by this branch.
244  if (!remote_endpoint_) {
245  UNILINK_LOG_WARNING("udp", "write", "Remote endpoint not set; dropping write request");
246  writing_ = false;
247  return;
248  }
249 
250  writing_ = true;
251 
    // Take ownership of the front buffer and remember its size for the queue accounting.
252  auto current = std::move(tx_.front());
253  tx_.pop_front();
254  auto bytes_queued = std::visit(
255  [](auto&& buf) -> size_t {
256  using Buffer = std::decay_t<decltype(buf)>;
257  if constexpr (std::is_same_v<Buffer, std::shared_ptr<const std::vector<uint8_t>>>) {
    // A null shared buffer counts as zero bytes.
258  return buf ? buf->size() : 0;
259  } else {
260  return buf.size();
261  }
262  },
263  current);
264 
    // Completion logic, shared by all three buffer kinds.
265  auto on_write = [self, bytes_queued](const boost::system::error_code& ec, std::size_t) {
266  auto impl = self->get_impl();
    // Release this datagram's bytes from the accounting, clamping at zero.
267  impl->queue_bytes_ = (impl->queue_bytes_ > bytes_queued) ? (impl->queue_bytes_ - bytes_queued) : 0;
268  impl->report_backpressure(impl->queue_bytes_);
269 
    // Cancelled send: just clear the in-flight flag.
270  if (ec == boost::asio::error::operation_aborted) {
271  impl->writing_ = false;
272  return;
273  }
274 
    // Stop pending or terminal state: drop the rest of the queue.
275  if (impl->stop_requested_.load() || impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) ||
276  impl->state_.is_state(LinkState::Error)) {
277  impl->writing_ = false;
278  impl->tx_.clear();
279  impl->queue_bytes_ = 0;
280  impl->report_backpressure(impl->queue_bytes_);
281  return;
282  }
283 
284  if (ec) {
285  UNILINK_LOG_ERROR("udp", "write", "Send failed: " + ec.message());
286  impl->transition_to(LinkState::Error, ec);
287  impl->writing_ = false;
288  return;
289  }
290 
    // Success: continue with the next queued buffer, if any.
291  impl->writing_ = false;
292  impl->do_write(self);
293  };
294 
295  std::visit(
296  [&](auto&& buf) {
297  using T = std::decay_t<decltype(buf)>;
298 
    // Pointer and length are read before the buffer is moved into the handler below.
    // NOTE(review): unlike the size computation above, the shared_ptr case is dereferenced
    // here without a null check; async_write_shared() rejects null, confirm no other producer.
299  auto* data_ptr = [&]() {
300  if constexpr (std::is_same_v<T, std::shared_ptr<const std::vector<uint8_t>>>) {
301  return buf->data();
302  } else {
303  return buf.data();
304  }
305  }();
306 
307  auto size = [&]() {
308  if constexpr (std::is_same_v<T, std::shared_ptr<const std::vector<uint8_t>>>) {
309  return buf->size();
310  } else {
311  return buf.size();
312  }
313  }();
314 
    // The buffer is moved into the completion handler so it stays alive until the send finishes.
    // NOTE(review): this relies on the moved buffer keeping the same data address - true for
    // std::vector and shared_ptr; presumably also for memory::PooledBuffer, verify.
315  socket_.async_send_to(
316  net::buffer(data_ptr, size), *remote_endpoint_,
317  [buf_captured = std::move(buf), on_write = std::move(on_write)](
318  const boost::system::error_code& ec, std::size_t bytes) mutable { on_write(ec, bytes); });
319  },
320  std::move(current));
321  }
322 
323  void close_socket() {
324  boost::system::error_code ec;
325  socket_.cancel(ec);
326  socket_.close(ec);
327  }
328 
329  void notify_state() {
330  if (!on_state_) return;
331  try {
332  on_state_(state_.get_state());
333  } catch (const std::exception& e) {
334  UNILINK_LOG_ERROR("udp", "on_state", "Exception in state callback: " + std::string(e.what()));
335  } catch (...) {
336  UNILINK_LOG_ERROR("udp", "on_state", "Unknown exception in state callback");
337  }
338  }
339 
340  void report_backpressure(size_t queued_bytes) {
341  if (stop_requested_.load() || !on_bp_) return;
342 
343  if (!backpressure_active_ && queued_bytes >= bp_high_) {
344  backpressure_active_ = true;
345  try {
346  on_bp_(queued_bytes);
347  } catch (const std::exception& e) {
348  UNILINK_LOG_ERROR("udp", "on_backpressure", "Exception in backpressure callback: " + std::string(e.what()));
349  } catch (...) {
350  UNILINK_LOG_ERROR("udp", "on_backpressure", "Unknown exception in backpressure callback");
351  }
352  } else if (backpressure_active_ && queued_bytes <= bp_low_) {
353  backpressure_active_ = false;
354  try {
355  on_bp_(queued_bytes);
356  } catch (const std::exception& e) {
357  UNILINK_LOG_ERROR("udp", "on_backpressure", "Exception in backpressure callback: " + std::string(e.what()));
358  } catch (...) {
359  UNILINK_LOG_ERROR("udp", "on_backpressure", "Unknown exception in backpressure callback");
360  }
361  }
362  }
363 
    // NOTE(review): the line carrying this function's return type and name (listing line 364) is
    // absent from this extract; callers invoke it as enqueue_buffer(buffer, size) and test the
    // result as a bool. Listing line 379 is also absent - presumably it reports backpressure
    // after the push, confirm against the real file.
    //
    // Appends one outgoing buffer to tx_ and adds its size to queue_bytes_.
    //   buffer - buffer to queue (moved into tx_)
    //   size   - byte count charged against the queue limit
    // Returns false without queuing when a stop is pending, the link is Closed/Error, or the
    // queue would exceed bp_limit_ (the latter also moves the link to Error); true otherwise.
365  std::variant<memory::PooledBuffer, std::vector<uint8_t>, std::shared_ptr<const std::vector<uint8_t>>>&& buffer,
366  size_t size) {
367  if (stopping_.load() || stop_requested_.load() || state_.is_state(LinkState::Closed) ||
368  state_.is_state(LinkState::Error)) {
369  return false;
370  }
371 
    // Exceeding the hard cap is treated as a fatal link error, not as a dropped datagram.
372  if (queue_bytes_ + size > bp_limit_) {
373  UNILINK_LOG_ERROR("udp", "write", "Queue limit exceeded");
374  transition_to(LinkState::Error);
375  return false;
376  }
377  queue_bytes_ += size;
378  tx_.push_back(std::move(buffer));
380  return true;
381  }
382 
    // NOTE(review): the signature line of this function (listing line 383) is absent from this
    // extract, so its name and return type are not visible here.
    //
    // Fills remote_endpoint_ from the configuration. Does nothing unless both
    // cfg_.remote_address and cfg_.remote_port are set.
    // Throws std::runtime_error when the configured remote address cannot be parsed.
384  if (!cfg_.remote_address || !cfg_.remote_port) return;
385  boost::system::error_code ec;
386  auto addr = net::ip::make_address(*cfg_.remote_address, ec);
387  if (ec) {
388  throw std::runtime_error("Invalid remote address: " + *cfg_.remote_address);
389  }
390  remote_endpoint_ = udp::endpoint(addr, *cfg_.remote_port);
391  }
392 
393  void transition_to(LinkState target, const boost::system::error_code& ec = {}) {
394  if (ec == net::error::operation_aborted) {
395  return;
396  }
397 
398  const auto current = state_.get_state();
399  if ((current == LinkState::Closed || current == LinkState::Error) &&
400  (target == LinkState::Closed || target == LinkState::Error)) {
401  return;
402  }
403 
404  if (target == LinkState::Closed || target == LinkState::Error) {
405  if (terminal_state_notified_.exchange(true)) {
406  return;
407  }
408  } else if (current == target) {
409  return;
410  }
411 
412  state_.set_state(target);
413  notify_state();
414  }
415 
    // NOTE(review): the signature line of this function (listing line 416) is absent from this
    // extract; it is presumably perform_stop_cleanup(), which the destructor and the stop routine
    // call. Listing line 426 (the body of the inner try) is also absent - presumably it signals
    // the backpressure callback that the queue is empty; confirm against the real file.
    //
    // Tears the channel down: closes the socket, empties the transmit queue, resets flags, stops
    // an owned io_context, moves the link to Closed and finally drops all user callbacks.
    // Every exception is swallowed.
417  try {
418  close_socket();
419  tx_.clear();
420  queue_bytes_ = 0;
421  writing_ = false;
    // Remember whether backpressure was signalled so the callback can be told it has ended.
422  const bool had_backpressure = backpressure_active_;
423  backpressure_active_ = false;
424  if (had_backpressure && on_bp_) {
425  try {
427  } catch (...) {
428  }
429  }
430  connected_.store(false);
431  opened_.store(false);
    // Only an io_context / work guard created by this object is touched.
432  if (owns_ioc_ && ioc_) {
433  ioc_->stop();
434  }
435  if (owns_ioc_ && work_guard_) {
436  work_guard_->reset();
437  }
    // Report Closed while on_state_ is still registered, then release the callbacks.
438  transition_to(LinkState::Closed);
439  on_bytes_ = nullptr;
440  on_state_ = nullptr;
441  on_bp_ = nullptr;
442  } catch (...) {
443  }
444  }
445 
446  void join_ioc_thread(bool allow_detach) {
447  if (!owns_ioc_ || !ioc_thread_.joinable()) {
448  return;
449  }
450 
451  if (std::this_thread::get_id() == ioc_thread_.get_id()) {
452  if (allow_detach) {
453  ioc_thread_.detach();
454  }
455  return;
456  }
457 
458  try {
459  ioc_thread_.join();
460  } catch (...) {
461  }
462  }
463 };
464 
465 std::shared_ptr<UdpChannel> UdpChannel::create(const config::UdpConfig& cfg) {
466  return std::shared_ptr<UdpChannel>(new UdpChannel(cfg));
467 }
468 
469 std::shared_ptr<UdpChannel> UdpChannel::create(const config::UdpConfig& cfg, net::io_context& ioc) {
470  return std::shared_ptr<UdpChannel>(new UdpChannel(cfg, ioc));
471 }
472 
473 UdpChannel::UdpChannel(const config::UdpConfig& cfg) : impl_(std::make_unique<Impl>(cfg)) {}
474 
475 UdpChannel::UdpChannel(const config::UdpConfig& cfg, net::io_context& ioc) : impl_(std::make_unique<Impl>(cfg, ioc)) {}
476 
    // NOTE(review): the signature line of this function (listing line 477) is absent from this
    // extract; the comment below indicates it is the UdpChannel destructor.
    //
    // Flags shutdown, runs the stop cleanup directly on the calling thread, then joins the worker
    // thread (or detaches it when this code runs on that thread). impl_ may be null, hence the guard.
478  if (impl_) {
479  // Cannot use shared_from_this in destructor. Use internal cleanup directly.
480  impl_->stop_requested_.store(true);
481  impl_->stopping_.store(true);
482  impl_->perform_stop_cleanup();
483  impl_->join_ioc_thread(true);
484  }
485 }
486 
// Move construction and move assignment use the compiler-generated member-wise versions.
// NOTE(review): a moved-from channel presumably ends up with a null impl_, which the
// `if (impl_)` guard in the destructor body tolerates - confirm against the class declaration.
487 UdpChannel::UdpChannel(UdpChannel&&) noexcept = default;
488 UdpChannel& UdpChannel::operator=(UdpChannel&&) noexcept = default;
489 
490 void UdpChannel::start() {
491  auto impl = get_impl();
492  if (impl->started_) return;
493  if (!impl->cfg_.is_valid()) {
494  throw std::runtime_error("Invalid UDP configuration");
495  }
496 
497  if (impl->owns_ioc_ && impl->owned_ioc_ && impl->owned_ioc_->stopped()) {
498  impl->owned_ioc_->restart();
499  }
500 
501  if (impl->ioc_thread_.joinable()) {
502  impl->join_ioc_thread(false);
503  }
504 
505  if (impl->owns_ioc_) {
506  impl->work_guard_ =
507  std::make_unique<net::executor_work_guard<net::io_context::executor_type>>(impl->ioc_->get_executor());
508  }
509 
510  auto self = shared_from_this();
511  net::dispatch(impl->strand_, [self]() {
512  auto impl = self->get_impl();
513  impl->stop_requested_.store(false);
514  impl->stopping_.store(false);
515  impl->terminal_state_notified_.store(false);
516  impl->connected_.store(false);
517  impl->opened_.store(false);
518  impl->writing_ = false;
519  impl->queue_bytes_ = 0;
520  impl->backpressure_active_ = false;
521  impl->state_.set_state(LinkState::Idle);
522 
523  impl->transition_to(LinkState::Connecting);
524  impl->open_socket(self);
525  });
526 
527  if (impl->owns_ioc_) {
528  impl->ioc_thread_ = std::thread([impl]() {
529  try {
530  impl->ioc_->run();
531  } catch (...) {
532  }
533  });
534  }
535 
536  impl->started_ = true;
537 }
538 
    // NOTE(review): the signature line of this function (listing line 539) is absent from this
    // extract; presumably this is UdpChannel::stop() - confirm against the real file.
    //
    // Stops the channel. Only the first call after a start has any effect, because
    // stop_requested_ is tested and set in a single exchange().
540  auto impl = get_impl();
541  if (impl->stop_requested_.exchange(true)) return;
542 
    // Never started: nothing is running, so report Closed and drop the callbacks right here.
543  if (!impl->started_) {
544  impl->transition_to(LinkState::Closed);
545  impl->on_bytes_ = nullptr;
546  impl->on_state_ = nullptr;
547  impl->on_bp_ = nullptr;
548  return;
549  }
550 
551  impl->stopping_.store(true);
    // The cleanup is posted to the strand; `self` keeps the channel alive until it has run.
552  auto self = shared_from_this();
553  net::post(impl->strand_, [self]() { self->get_impl()->perform_stop_cleanup(); });
554 
    // Waits for the worker thread unless the context is external or this call is made on the
    // worker thread itself (with `false` the thread is then neither joined nor detached).
555  impl->join_ioc_thread(false);
556 
    // Leave an owned io_context ready to be run again.
557  if (impl->owns_ioc_ && impl->owned_ioc_) {
558  impl->owned_ioc_->restart();
559  }
560 
561  impl->started_ = false;
562 }
563 
564 bool UdpChannel::is_connected() const { return get_impl()->connected_.load(); }
565 
    // NOTE(review): the signature line of this function (listing line 566) is absent from this
    // extract, so its name and the exact type of `data` are not visible; the body copies `data`,
    // so it is presumably the copying counterpart of async_write_move()/async_write_shared().
    // Listing line 578 is also absent - it presumably opens the maximum-size check whose body
    // follows; confirm the compared constant against the real file.
    //
    // Copies `data` and posts the copy to the strand, where it is queued and sent.
    // The request is silently ignored when `data` is empty, a stop is pending or the link is
    // Closed/Error; it is dropped with a log message when no remote endpoint is set or the
    // size check fails.
567  auto impl = get_impl();
568  if (data.empty()) return;
569  if (impl->stop_requested_.load()) return;
570  if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
571  return;
572  if (!impl->remote_endpoint_) {
573  UNILINK_LOG_WARNING("udp", "write", "Remote endpoint not set; dropping write request");
574  return;
575  }
576 
577  size_t size = data.size();
579  UNILINK_LOG_ERROR("udp", "write", "Write size exceeds maximum allowed");
580  return;
581  }
582 
    // A single write larger than the whole queue limit can never be queued: fatal link error.
583  if (size > impl->bp_limit_) {
584  UNILINK_LOG_ERROR("udp", "write", "Queue limit exceeded by single write");
585  impl->transition_to(LinkState::Error);
586  return;
587  }
588 
    // Preferred path: copy into a pooled buffer when pooling is enabled and the payload is
    // at most 65536 bytes. Falls through to the vector copy if no pooled buffer is available.
589  if (impl->cfg_.enable_memory_pool && size <= 65536) {
590  memory::PooledBuffer pooled(size);
591  if (pooled.valid()) {
592  common::safe_memory::safe_memcpy(pooled.data(), data.data(), size);
593  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(pooled), size]() mutable {
594  auto impl = self->get_impl();
595  if (!impl->enqueue_buffer(std::move(buf), size)) return;
596  impl->do_write(self);
597  });
598  return;
599  }
600  }
601 
    // Fallback path: copy into a std::vector.
602  std::vector<uint8_t> copy(data.begin(), data.end());
603  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(copy), size]() mutable {
604  auto impl = self->get_impl();
605  if (!impl->enqueue_buffer(std::move(buf), size)) return;
606  impl->do_write(self);
607  });
608 }
609 
610 void UdpChannel::async_write_move(std::vector<uint8_t>&& data) {
611  auto impl = get_impl();
612  auto size = data.size();
613  if (size == 0) return;
614  if (impl->stop_requested_.load()) return;
615  if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
616  return;
617  if (!impl->remote_endpoint_) {
618  UNILINK_LOG_WARNING("udp", "write", "Remote endpoint not set; dropping write request");
619  return;
620  }
621 
622  if (size > impl->bp_limit_) {
623  UNILINK_LOG_ERROR("udp", "write", "Queue limit exceeded by single write");
624  impl->transition_to(LinkState::Error);
625  return;
626  }
627 
628  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(data), size]() mutable {
629  auto impl = self->get_impl();
630  if (!impl->enqueue_buffer(std::move(buf), size)) return;
631  impl->do_write(self);
632  });
633 }
634 
635 void UdpChannel::async_write_shared(std::shared_ptr<const std::vector<uint8_t>> data) {
636  auto impl = get_impl();
637  if (!data || data->empty()) return;
638  if (impl->stop_requested_.load()) return;
639  if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
640  return;
641  if (!impl->remote_endpoint_) {
642  UNILINK_LOG_WARNING("udp", "write", "Remote endpoint not set; dropping write request");
643  return;
644  }
645 
646  auto size = data->size();
647  if (size > impl->bp_limit_) {
648  UNILINK_LOG_ERROR("udp", "write", "Queue limit exceeded by single write");
649  impl->transition_to(LinkState::Error);
650  return;
651  }
652 
653  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(data), size]() mutable {
654  auto impl = self->get_impl();
655  if (!impl->enqueue_buffer(std::move(buf), size)) return;
656  impl->do_write(self);
657  });
658 }
659 
660 void UdpChannel::on_bytes(OnBytes cb) { impl_->on_bytes_ = std::move(cb); }
661 
662 void UdpChannel::on_state(OnState cb) { impl_->on_state_ = std::move(cb); }
663 
664 void UdpChannel::on_backpressure(OnBackpressure cb) { impl_->on_bp_ = std::move(cb); }
665 
666 } // namespace transport
667 } // namespace unilink
#define UNILINK_LOG_WARNING(component, operation, message)
Definition: logger.hpp:272
#define UNILINK_LOG_ERROR(component, operation, message)
Definition: logger.hpp:279