20 #include <boost/asio.hpp>
48 std::variant<memory::PooledBuffer, std::vector<uint8_t>, std::shared_ptr<const std::vector<uint8_t>>>;
51 net::io_context& acquire_shared_serial_context() {
54 return manager.get_context();
// Strand created from the io_context executor. The read and write completion
// handlers are bound to it and start/stop/async_write_* post their work onto it.
63 net::strand<net::io_context::executor_type>
strand_;
// Owning pointer to a work guard for the io_context executor.
// NOTE(review): start() builds a guard of this type from ioc_.get_executor();
// the assignment target is not visible in this listing — confirm it is this member.
64 std::unique_ptr<net::executor_work_guard<net::io_context::executor_type>>
work_guard_;
// Serial port abstraction. Either a BoostSerialPort created on the shared
// context or a port moved in by the caller through the injecting constructor.
67 std::unique_ptr<interface::SerialPortInterface>
port_;
// Receive buffer handed to async_read_some; the first n received bytes are
// passed to on_bytes_ as a ConstByteSpan over this storage.
71 std::vector<uint8_t>
rx_;
// Queue of pending outgoing buffers. The async_write_* handlers emplace_back
// onto it and do_write moves the front element into current_write_buffer_.
72 std::deque<BufferVariant>
tx_;
89 : ioc_(acquire_shared_serial_context()),
91 strand_(ioc_.get_executor()),
94 bp_high_(cfg.backpressure_threshold) {
96 port_ = std::make_unique<BoostSerialPort>(ioc_);
102 strand_(ioc.get_executor()),
103 port_(std::move(port)),
106 bp_high_(cfg.backpressure_threshold) {
115 bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
116 if (bp_low_ == 0) bp_low_ = 1;
126 if (ioc_thread_.joinable()) {
127 if (std::this_thread::get_id() != ioc_thread_.get_id()) {
130 ioc_thread_.detach();
139 boost::system::error_code ec;
140 port_->open(cfg_.
device, ec);
147 port_->set_option(net::serial_port_base::baud_rate(cfg_.
baud_rate), ec);
154 port_->set_option(net::serial_port_base::character_size(cfg_.
char_size), ec);
161 using sb = net::serial_port_base::stop_bits;
162 port_->set_option(sb(cfg_.
stop_bits == 2 ? sb::two : sb::one), ec);
169 using pa = net::serial_port_base::parity;
170 pa::type p = pa::none;
175 port_->set_option(pa(p), ec);
182 using fc = net::serial_port_base::flow_control;
183 fc::type f = fc::none;
188 port_->set_option(fc(f), ec);
190 UNILINK_LOG_ERROR(
"serial",
"configure",
"Failed flow control: " + ec.message());
199 state_.set_state(LinkState::Connected);
205 port_->async_read_some(
206 net::buffer(rx_.data(), rx_.size()), net::bind_executor(strand_, [
self](
auto ec, std::size_t n) {
207 auto impl = self->get_impl();
209 impl->handle_error(self,
"read", ec);
212 if (impl->on_bytes_) {
214 impl->on_bytes_(memory::ConstByteSpan(impl->rx_.data(), n));
215 } catch (
const std::exception& e) {
216 UNILINK_LOG_ERROR(
"serial",
"on_bytes",
"Exception in callback: " + std::string(e.what()));
217 if (impl->cfg_.stop_on_callback_exception) {
218 impl->opened_.store(
false);
220 impl->state_.set_state(LinkState::Error);
221 impl->notify_state();
224 impl->handle_error(
self,
"on_bytes_callback", make_error_code(boost::system::errc::io_error));
227 if (impl->cfg_.stop_on_callback_exception) {
228 impl->opened_.store(
false);
230 impl->state_.set_state(LinkState::Error);
231 impl->notify_state();
234 impl->handle_error(
self,
"on_bytes_callback", make_error_code(boost::system::errc::io_error));
238 impl->start_read(
self);
243 if (stopping_.load() || tx_.empty()) {
249 current_write_buffer_ = std::move(tx_.front());
252 auto& current = *current_write_buffer_;
254 auto on_write = [
self](
const boost::system::error_code& ec, std::size_t n) {
255 auto impl =
self->get_impl();
256 impl->current_write_buffer_.reset();
258 if (impl->queued_bytes_ >= n) {
259 impl->queued_bytes_ -= n;
261 impl->queued_bytes_ = 0;
263 impl->report_backpressure(impl->queued_bytes_);
265 if (impl->stopping_.load()) {
266 impl->writing_ =
false;
271 impl->handle_error(
self,
"write", ec);
274 impl->do_write(
self);
279 using T = std::decay_t<decltype(buf)>;
280 auto* data_ptr = [&]() {
281 if constexpr (std::is_same_v<T, std::shared_ptr<
const std::vector<uint8_t>>>)
287 if constexpr (std::is_same_v<T, std::shared_ptr<
const std::vector<uint8_t>>>)
292 port_->async_write(net::buffer(data_ptr, size), net::bind_executor(strand_, on_write));
299 retry_timer_.cancel();
304 report_backpressure(queued_bytes_);
305 opened_.store(
false);
306 state_.set_state(LinkState::Closed);
312 void handle_error(std::shared_ptr<Serial>
self,
const char* where,
const boost::system::error_code& ec) {
313 if (ec == boost::asio::error::eof) {
314 if (
self) start_read(
self);
318 if (stopping_.load()) {
323 if (ec == boost::asio::error::operation_aborted) {
324 if (state_.is_state(LinkState::Error))
return;
329 bool retryable = cfg_.reopen_on_error;
334 if (cfg_.reopen_on_error) {
335 opened_.store(
false);
337 state_.set_state(LinkState::Connecting);
339 if (
self) schedule_retry(
self, where, ec);
341 opened_.store(
false);
343 state_.set_state(LinkState::Error);
348 void schedule_retry(std::shared_ptr<Serial>
self,
const char* where,
const boost::system::error_code& ec) {
350 UNILINK_LOG_INFO(
"serial",
"retry",
"Scheduling retry at " + std::string(where));
351 if (stopping_.load())
return;
352 retry_timer_.expires_after(std::chrono::milliseconds(cfg_.retry_interval_ms));
353 retry_timer_.async_wait([
self](
auto e) {
354 if (!e &&
self && !self->get_impl()->stopping_.load()) self->get_impl()->open_and_configure(
self);
359 boost::system::error_code ec;
360 if (port_ && port_->is_open()) {
366 if (stopping_.load() || !on_state_)
return;
368 on_state_(state_.get_state());
374 if (stopping_.load() || !on_bp_)
return;
376 if (!backpressure_active_ && qb >= bp_high_) {
377 backpressure_active_ =
true;
382 }
else if (backpressure_active_ && qb <= bp_low_) {
383 backpressure_active_ =
false;
393 return std::shared_ptr<Serial>(
new Serial(cfg));
397 return std::shared_ptr<Serial>(
new Serial(cfg, std::make_unique<BoostSerialPort>(ioc), ioc));
401 std::unique_ptr<interface::SerialPortInterface> port, net::io_context& ioc) {
402 return std::shared_ptr<Serial>(
new Serial(cfg, std::move(port), ioc));
// Constructs a Serial transport from configuration only: the private Impl is
// created with just `cfg`.
405 Serial::Serial(
const config::SerialConfig& cfg) : impl_(std::make_unique<Impl>(cfg)) {}
// Constructs a Serial transport around a caller-supplied port and io_context.
// Ownership of `port` is moved into the Impl; `ioc` is forwarded by reference.
// NOTE(review): `ioc` presumably has to outlive this object — confirm against Impl.
407 Serial::Serial(
const config::SerialConfig& cfg, std::unique_ptr<interface::SerialPortInterface> port,
408 net::io_context& ioc)
409 : impl_(std::make_unique<Impl>(cfg, std::move(port), ioc)) {}
412 if (impl_ && impl_->started_ && !impl_->state_.is_state(LinkState::Closed)) {
414 impl_->stopping_.store(
true);
415 impl_->perform_cleanup();
416 if (impl_->owns_ioc_ && impl_->ioc_thread_.joinable()) {
417 impl_->ioc_thread_.join();
426 auto impl = get_impl();
427 if (impl->started_)
return;
428 impl->stopping_.store(
false);
429 UNILINK_LOG_INFO(
"serial",
"start",
"Starting device: " + impl->cfg_.device);
430 if (!impl->owns_ioc_) {
432 if (!manager.is_running()) manager.start();
433 if (impl->ioc_.stopped()) impl->ioc_.restart();
436 std::make_unique<net::executor_work_guard<net::io_context::executor_type>>(impl->ioc_.get_executor());
437 if (impl->owns_ioc_) {
438 impl->ioc_thread_ = std::thread([impl] { impl->ioc_.run(); });
440 auto self = shared_from_this();
441 net::post(impl->strand_, [
self] {
442 auto impl = self->get_impl();
443 if (!impl->stopping_.load()) {
444 impl->state_.set_state(LinkState::Connecting);
445 impl->notify_state();
446 impl->open_and_configure(self);
449 impl->started_ =
true;
453 auto impl = get_impl();
454 if (!impl->started_) {
455 impl->state_.set_state(LinkState::Closed);
459 if (impl->stopping_.exchange(
true))
return;
461 auto self = shared_from_this();
462 net::post(impl->strand_, [
self] {
463 auto impl = self->get_impl();
464 impl->perform_cleanup();
465 if (impl->owns_ioc_) impl->ioc_.stop();
468 if (impl->owns_ioc_ && impl->ioc_thread_.joinable()) {
469 impl->ioc_thread_.join();
470 impl->ioc_.restart();
472 impl->started_ =
false;
478 auto impl = get_impl();
479 if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
482 size_t n = data.
size();
490 if (pooled.
valid()) {
492 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(pooled)]()
mutable {
493 auto impl =
self->get_impl();
494 if (impl->queued_bytes_ + buf.size() > impl->bp_limit_) {
496 impl->queued_bytes_ = 0;
497 impl->writing_ = false;
498 impl->report_backpressure(impl->queued_bytes_);
499 impl->state_.set_state(LinkState::Error);
500 impl->notify_state();
501 impl->handle_error(self,
"write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
504 impl->queued_bytes_ += buf.size();
505 impl->tx_.emplace_back(std::move(buf));
506 impl->report_backpressure(impl->queued_bytes_);
507 if (!impl->writing_) impl->do_write(
self);
513 std::vector<uint8_t> fallback(data.
begin(), data.
end());
514 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(fallback)]()
mutable {
515 auto impl =
self->get_impl();
516 if (impl->queued_bytes_ + buf.size() > impl->bp_limit_) {
518 impl->queued_bytes_ = 0;
519 impl->writing_ = false;
520 impl->report_backpressure(impl->queued_bytes_);
521 impl->state_.set_state(LinkState::Error);
522 impl->notify_state();
523 impl->handle_error(self,
"write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
526 impl->queued_bytes_ += buf.size();
527 impl->tx_.emplace_back(std::move(buf));
528 impl->report_backpressure(impl->queued_bytes_);
529 if (!impl->writing_) impl->do_write(
self);
534 auto impl = get_impl();
535 if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
537 const auto added = data.size();
538 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(data), added]()
mutable {
539 auto impl =
self->get_impl();
540 if (impl->queued_bytes_ + added > impl->bp_limit_) {
542 impl->queued_bytes_ = 0;
543 impl->writing_ = false;
544 impl->report_backpressure(impl->queued_bytes_);
545 impl->state_.set_state(LinkState::Error);
546 impl->notify_state();
547 impl->handle_error(self,
"write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
550 impl->queued_bytes_ += added;
551 impl->tx_.emplace_back(std::move(buf));
552 impl->report_backpressure(impl->queued_bytes_);
553 if (!impl->writing_) impl->do_write(
self);
558 auto impl = get_impl();
559 if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
561 if (!data || data->empty())
return;
562 const auto added = data->size();
563 net::post(impl->strand_, [
self = shared_from_this(), buf = std::move(data), added]()
mutable {
564 auto impl =
self->get_impl();
565 if (impl->queued_bytes_ + added > impl->bp_limit_) {
567 impl->queued_bytes_ = 0;
568 impl->writing_ = false;
569 impl->report_backpressure(impl->queued_bytes_);
570 impl->state_.set_state(LinkState::Error);
571 impl->notify_state();
572 impl->handle_error(self,
"write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
575 impl->queued_bytes_ += added;
576 impl->tx_.emplace_back(std::move(buf));
577 impl->report_backpressure(impl->queued_bytes_);
578 if (!impl->writing_) impl->do_write(
self);
static IoContextManager & instance()
std::function< void(memory::ConstByteSpan)> OnBytes
std::function< void(base::LinkState)> OnState
std::function< void(size_t)> OnBackpressure
RAII wrapper for memory pool buffers with enhanced safety.
A C++17 compatible span-like class for safe array access.
constexpr size_type size() const noexcept
constexpr pointer data() const noexcept
constexpr iterator begin() const noexcept
constexpr iterator end() const noexcept
Serial Transport implementation.
void on_backpressure(OnBackpressure cb) override
void async_write_move(std::vector< uint8_t > &&data) override
Serial(Serial &&) noexcept
void set_retry_interval(unsigned interval_ms)
void async_write_copy(memory::ConstByteSpan data) override
void async_write_shared(std::shared_ptr< const std::vector< uint8_t >> data) override
bool is_connected() const override
void on_state(OnState cb) override
void on_bytes(OnBytes cb) override
#define UNILINK_LOG_INFO(component, operation, message)
#define UNILINK_LOG_ERROR(component, operation, message)
constexpr size_t MAX_BUFFER_SIZE
constexpr size_t DEFAULT_BACKPRESSURE_THRESHOLD
void safe_memcpy(uint8_t *dest, const uint8_t *src, size_t size)
Safely copy memory with bounds checking.
ThreadSafeState< base::LinkState > ThreadSafeLinkState
void report_connection_error(std::string_view component, std::string_view operation, const boost::system::error_code &ec, bool retryable)
Report connection-related error.
std::variant< memory::PooledBuffer, std::vector< uint8_t >, std::shared_ptr< const std::vector< uint8_t > >> BufferVariant
enum unilink::config::SerialConfig::Flow flow
void validate_and_clamp()
enum unilink::config::SerialConfig::Parity parity
size_t backpressure_threshold
void start_read(std::shared_ptr< Serial > self)
config::SerialConfig cfg_
bool backpressure_active_
std::unique_ptr< net::executor_work_guard< net::io_context::executor_type > > work_guard_
std::optional< BufferVariant > current_write_buffer_
std::deque< BufferVariant > tx_
std::atomic< bool > opened_
net::strand< net::io_context::executor_type > strand_
Impl(const config::SerialConfig &cfg)
std::vector< uint8_t > rx_
void handle_error(std::shared_ptr< Serial > self, const char *where, const boost::system::error_code &ec)
void do_write(std::shared_ptr< Serial > self)
Impl(const config::SerialConfig &cfg, std::unique_ptr< interface::SerialPortInterface > port, net::io_context &ioc)
void report_backpressure(size_t qb)
std::atomic< bool > stopping_
std::unique_ptr< interface::SerialPortInterface > port_
net::steady_timer retry_timer_
void schedule_retry(std::shared_ptr< Serial > self, const char *where, const boost::system::error_code &ec)
void open_and_configure(std::shared_ptr< Serial > self)
ThreadSafeLinkState state_