21 #include <boost/asio.hpp>
28 #include <type_traits>
51 net::strand<net::io_context::executor_type>
strand_;
52 std::unique_ptr<net::executor_work_guard<net::io_context::executor_type>>
work_guard_;
60 std::array<uint8_t, common::constants::DEFAULT_READ_BUFFER_SIZE>
rx_{};
61 std::deque<std::variant<memory::PooledBuffer, std::vector<uint8_t>, std::shared_ptr<const std::vector<uint8_t>>>>
tx_;
89 bp_high_(config.backpressure_threshold) {
94 :
ioc_(&external_ioc),
96 strand_(external_ioc.get_executor()),
99 bp_high_(config.backpressure_threshold) {
124 if (std::this_thread::get_id() !=
ioc_thread_.get_id()) {
138 boost::system::error_code ec;
178 [
self](
const boost::system::error_code& ec, std::size_t bytes) {
179 auto impl =
self->get_impl();
180 impl->handle_receive(
self, ec, bytes);
184 void handle_receive(std::shared_ptr<UdpChannel>
self,
const boost::system::error_code& ec, std::size_t bytes) {
185 if (ec == boost::asio::error::operation_aborted) {
190 state_.is_state(LinkState::Error)) {
194 if (ec == boost::asio::error::message_size || bytes >=
rx_.size()) {
215 }
catch (
const std::exception& e) {
216 UNILINK_LOG_ERROR(
"udp",
"on_bytes",
"Exception in bytes callback: " + std::string(e.what()));
236 state_.is_state(LinkState::Error)) {
252 auto current = std::move(
tx_.front());
254 auto bytes_queued = std::visit(
255 [](
auto&& buf) ->
size_t {
256 using Buffer = std::decay_t<decltype(buf)>;
257 if constexpr (std::is_same_v<Buffer, std::shared_ptr<
const std::vector<uint8_t>>>) {
258 return buf ? buf->size() : 0;
265 auto on_write = [
self, bytes_queued](
const boost::system::error_code& ec, std::size_t) {
266 auto impl =
self->get_impl();
267 impl->queue_bytes_ = (impl->queue_bytes_ > bytes_queued) ? (impl->queue_bytes_ - bytes_queued) : 0;
268 impl->report_backpressure(impl->queue_bytes_);
270 if (ec == boost::asio::error::operation_aborted) {
271 impl->writing_ =
false;
275 if (impl->stop_requested_.load() || impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) ||
276 impl->state_.is_state(LinkState::Error)) {
277 impl->writing_ =
false;
279 impl->queue_bytes_ = 0;
280 impl->report_backpressure(impl->queue_bytes_);
286 impl->transition_to(LinkState::Error, ec);
287 impl->writing_ =
false;
291 impl->writing_ =
false;
292 impl->do_write(
self);
297 using T = std::decay_t<decltype(buf)>;
299 auto* data_ptr = [&]() {
300 if constexpr (std::is_same_v<T, std::shared_ptr<
const std::vector<uint8_t>>>) {
308 if constexpr (std::is_same_v<T, std::shared_ptr<
const std::vector<uint8_t>>>) {
317 [buf_captured = std::move(buf), on_write = std::move(on_write)](
318 const boost::system::error_code& ec, std::size_t bytes)
mutable { on_write(ec, bytes); });
324 boost::system::error_code ec;
333 }
catch (
const std::exception& e) {
334 UNILINK_LOG_ERROR(
"udp",
"on_state",
"Exception in state callback: " + std::string(e.what()));
347 }
catch (
const std::exception& e) {
348 UNILINK_LOG_ERROR(
"udp",
"on_backpressure",
"Exception in backpressure callback: " + std::string(e.what()));
350 UNILINK_LOG_ERROR(
"udp",
"on_backpressure",
"Unknown exception in backpressure callback");
356 }
catch (
const std::exception& e) {
357 UNILINK_LOG_ERROR(
"udp",
"on_backpressure",
"Exception in backpressure callback: " + std::string(e.what()));
359 UNILINK_LOG_ERROR(
"udp",
"on_backpressure",
"Unknown exception in backpressure callback");
365 std::variant<
memory::PooledBuffer, std::vector<uint8_t>, std::shared_ptr<
const std::vector<uint8_t>>>&& buffer,
368 state_.is_state(LinkState::Error)) {
378 tx_.push_back(std::move(buffer));
385 boost::system::error_code ec;
394 if (ec == net::error::operation_aborted) {
398 const auto current =
state_.get_state();
399 if ((current == LinkState::Closed || current == LinkState::Error) &&
400 (target == LinkState::Closed || target == LinkState::Error)) {
404 if (target == LinkState::Closed || target == LinkState::Error) {
408 }
else if (current == target) {
424 if (had_backpressure &&
on_bp_) {
451 if (std::this_thread::get_id() ==
ioc_thread_.get_id()) {
466 return std::shared_ptr<UdpChannel>(
new UdpChannel(cfg));
470 return std::shared_ptr<UdpChannel>(
new UdpChannel(cfg, ioc));
475 UdpChannel::UdpChannel(
const config::UdpConfig& cfg, net::io_context& ioc) : impl_(std::make_unique<Impl>(cfg, ioc)) {}
480 impl_->stop_requested_.store(
true);
481 impl_->stopping_.store(
true);
482 impl_->perform_stop_cleanup();
483 impl_->join_ioc_thread(
true);
491 auto impl = get_impl();
492 if (impl->started_)
return;
493 if (!impl->cfg_.is_valid()) {
494 throw std::runtime_error(
"Invalid UDP configuration");
497 if (impl->owns_ioc_ && impl->owned_ioc_ && impl->owned_ioc_->stopped()) {
498 impl->owned_ioc_->restart();
501 if (impl->ioc_thread_.joinable()) {
502 impl->join_ioc_thread(
false);
505 if (impl->owns_ioc_) {
507 std::make_unique<net::executor_work_guard<net::io_context::executor_type>>(impl->ioc_->get_executor());
510 auto self = shared_from_this();
511 net::dispatch(impl->strand_, [
self]() {
512 auto impl = self->get_impl();
513 impl->stop_requested_.store(false);
514 impl->stopping_.store(false);
515 impl->terminal_state_notified_.store(false);
516 impl->connected_.store(false);
517 impl->opened_.store(false);
518 impl->writing_ = false;
519 impl->queue_bytes_ = 0;
520 impl->backpressure_active_ = false;
521 impl->state_.set_state(LinkState::Idle);
523 impl->transition_to(LinkState::Connecting);
524 impl->open_socket(self);
527 if (impl->owns_ioc_) {
528 impl->ioc_thread_ = std::thread([impl]() {
536 impl->started_ =
true;
540 auto impl = get_impl();
541 if (impl->stop_requested_.exchange(
true))
return;
543 if (!impl->started_) {
544 impl->transition_to(LinkState::Closed);
545 impl->on_bytes_ =
nullptr;
546 impl->on_state_ =
nullptr;
547 impl->on_bp_ =
nullptr;
551 impl->stopping_.store(
true);
552 auto self = shared_from_this();
553 net::post(impl->strand_, [
self]() { self->get_impl()->perform_stop_cleanup(); });
555 impl->join_ioc_thread(
false);
557 if (impl->owns_ioc_ && impl->owned_ioc_) {
558 impl->owned_ioc_->restart();
561 impl->started_ =
false;
567 auto impl = get_impl();
568 if (data.
empty())
return;
569 if (impl->stop_requested_.load())
return;
570 if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
572 if (!impl->remote_endpoint_) {
577 size_t size = data.
size();
583 if (size > impl->bp_limit_) {
585 impl->transition_to(LinkState::Error);
589 if (impl->cfg_.enable_memory_pool && size <= 65536) {
591 if (pooled.
valid()) {
593 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(pooled), size]()
mutable {
594 auto impl =
self->get_impl();
595 if (!impl->enqueue_buffer(std::move(buf), size))
return;
596 impl->do_write(
self);
602 std::vector<uint8_t> copy(data.
begin(), data.
end());
603 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(copy), size]()
mutable {
604 auto impl =
self->get_impl();
605 if (!impl->enqueue_buffer(std::move(buf), size))
return;
606 impl->do_write(
self);
611 auto impl = get_impl();
612 auto size = data.size();
613 if (size == 0)
return;
614 if (impl->stop_requested_.load())
return;
615 if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
617 if (!impl->remote_endpoint_) {
622 if (size > impl->bp_limit_) {
624 impl->transition_to(LinkState::Error);
628 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(data), size]()
mutable {
629 auto impl =
self->get_impl();
630 if (!impl->enqueue_buffer(std::move(buf), size))
return;
631 impl->do_write(
self);
636 auto impl = get_impl();
637 if (!data || data->empty())
return;
638 if (impl->stop_requested_.load())
return;
639 if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
641 if (!impl->remote_endpoint_) {
646 auto size = data->size();
647 if (size > impl->bp_limit_) {
649 impl->transition_to(LinkState::Error);
653 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(data), size]()
mutable {
654 auto impl =
self->get_impl();
655 if (!impl->enqueue_buffer(std::move(buf), size))
return;
656 impl->do_write(
self);
std::function< void(memory::ConstByteSpan)> OnBytes
std::function< void(base::LinkState)> OnState
std::function< void(size_t)> OnBackpressure
RAII wrapper for memory pool buffers with enhanced safety.
A C++17 compatible span-like class for safe array access.
constexpr size_type size() const noexcept
constexpr pointer data() const noexcept
constexpr iterator begin() const noexcept
constexpr iterator end() const noexcept
constexpr bool empty() const noexcept
UDP Transport implementation.
void on_backpressure(OnBackpressure cb) override
void on_state(OnState cb) override
void async_write_move(std::vector< uint8_t > &&data) override
bool is_connected() const override
void async_write_copy(memory::ConstByteSpan data) override
static std::shared_ptr< UdpChannel > create(const config::UdpConfig &cfg)
void on_bytes(OnBytes cb) override
void async_write_shared(std::shared_ptr< const std::vector< uint8_t >> data) override
UdpChannel(UdpChannel &&) noexcept
#define UNILINK_LOG_WARNING(component, operation, message)
#define UNILINK_LOG_ERROR(component, operation, message)
constexpr size_t MAX_BUFFER_SIZE
constexpr size_t DEFAULT_BACKPRESSURE_THRESHOLD
void safe_memcpy(uint8_t *dest, const uint8_t *src, size_t size)
Safely copy memory with bounds checking.
ThreadSafeState< base::LinkState > ThreadSafeLinkState
size_t backpressure_threshold
std::optional< std::string > remote_address
void validate_and_clamp()
bool stop_on_callback_exception
std::string local_address
std::optional< uint16_t > remote_port
std::atomic< bool > stopping_
Impl(const config::UdpConfig &config)
std::unique_ptr< net::io_context > owned_ioc_
void start_receive(std::shared_ptr< UdpChannel > self)
std::atomic< bool > opened_
std::optional< udp::endpoint > remote_endpoint_
udp::endpoint local_endpoint_
void open_socket(std::shared_ptr< UdpChannel > self)
void join_ioc_thread(bool allow_detach)
bool backpressure_active_
std::deque< std::variant< memory::PooledBuffer, std::vector< uint8_t >, std::shared_ptr< const std::vector< uint8_t > > > > tx_
bool enqueue_buffer(std::variant< memory::PooledBuffer, std::vector< uint8_t >, std::shared_ptr< const std::vector< uint8_t >>> &&buffer, size_t size)
void report_backpressure(size_t queued_bytes)
net::strand< net::io_context::executor_type > strand_
std::atomic< bool > stop_requested_
void set_remote_from_config()
std::atomic< bool > connected_
void handle_receive(std::shared_ptr< UdpChannel > self, const boost::system::error_code &ec, std::size_t bytes)
ThreadSafeLinkState state_
std::array< uint8_t, common::constants::DEFAULT_READ_BUFFER_SIZE > rx_
Impl(const config::UdpConfig &config, net::io_context &external_ioc)
void transition_to(LinkState target, const boost::system::error_code &ec={})
void do_write(std::shared_ptr< UdpChannel > self)
udp::endpoint recv_endpoint_
std::atomic< bool > terminal_state_notified_
std::unique_ptr< net::executor_work_guard< net::io_context::executor_type > > work_guard_
void perform_stop_cleanup()