unilink  0.4.3
A simple C++ library for unified async communication
serial.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <atomic>
20 #include <boost/asio.hpp>
21 #include <cstddef>
22 #include <deque>
23 #include <memory>
24 #include <optional>
25 #include <string>
26 #include <thread>
27 #include <variant>
28 #include <vector>
29 
30 #include "unilink/base/common.hpp"
39 
40 namespace unilink {
41 namespace transport {
42 
43 namespace net = boost::asio;
44 using base::LinkState;
46 
48  std::variant<memory::PooledBuffer, std::vector<uint8_t>, std::shared_ptr<const std::vector<uint8_t>>>;
49 
50 namespace {
51 net::io_context& acquire_shared_serial_context() {
53  manager.start();
54  return manager.get_context();
55 }
56 } // namespace
57 
58 struct Serial::Impl {
59  bool started_ = false;
60  std::atomic<bool> stopping_{false};
61  net::io_context& ioc_;
62  bool owns_ioc_{false};
63  net::strand<net::io_context::executor_type> strand_;
64  std::unique_ptr<net::executor_work_guard<net::io_context::executor_type>> work_guard_;
65  std::thread ioc_thread_;
66 
67  std::unique_ptr<interface::SerialPortInterface> port_;
69  net::steady_timer retry_timer_;
70 
71  std::vector<uint8_t> rx_;
72  std::deque<BufferVariant> tx_;
73  std::optional<BufferVariant> current_write_buffer_;
74  bool writing_ = false;
75  size_t queued_bytes_ = 0;
76  size_t bp_high_;
77  size_t bp_limit_;
78  size_t bp_low_;
79  bool backpressure_active_ = false;
80 
84 
85  std::atomic<bool> opened_{false};
86  ThreadSafeLinkState state_{LinkState::Idle};
87 
88  explicit Impl(const config::SerialConfig& cfg)
89  : ioc_(acquire_shared_serial_context()),
90  owns_ioc_(false),
91  strand_(ioc_.get_executor()),
92  cfg_(cfg),
93  retry_timer_(ioc_),
94  bp_high_(cfg.backpressure_threshold) {
95  init();
96  port_ = std::make_unique<BoostSerialPort>(ioc_);
97  }
98 
99  Impl(const config::SerialConfig& cfg, std::unique_ptr<interface::SerialPortInterface> port, net::io_context& ioc)
100  : ioc_(ioc),
101  owns_ioc_(false),
102  strand_(ioc.get_executor()),
103  port_(std::move(port)),
104  cfg_(cfg),
105  retry_timer_(ioc),
106  bp_high_(cfg.backpressure_threshold) {
107  init();
108  }
109 
110  void init() {
111  cfg_.validate_and_clamp();
112  bp_high_ = cfg_.backpressure_threshold;
113  bp_limit_ = std::min(std::max(bp_high_ * 4, common::constants::DEFAULT_BACKPRESSURE_THRESHOLD),
115  bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
116  if (bp_low_ == 0) bp_low_ = 1;
117  rx_.resize(cfg_.read_chunk);
118  }
119 
120  ~Impl() {
121  try {
122  stopping_.store(true);
123  if (owns_ioc_) {
124  ioc_.stop();
125  }
126  if (ioc_thread_.joinable()) {
127  if (std::this_thread::get_id() != ioc_thread_.get_id()) {
128  ioc_thread_.join();
129  } else {
130  ioc_thread_.detach();
131  }
132  }
133  perform_cleanup();
134  } catch (...) {
135  }
136  }
137 
138  void open_and_configure(std::shared_ptr<Serial> self) {
139  boost::system::error_code ec;
140  port_->open(cfg_.device, ec);
141  if (ec) {
142  UNILINK_LOG_ERROR("serial", "open", "Failed to open device: " + cfg_.device + " - " + ec.message());
143  handle_error(self, "open", ec);
144  return;
145  }
146 
147  port_->set_option(net::serial_port_base::baud_rate(cfg_.baud_rate), ec);
148  if (ec) {
149  UNILINK_LOG_ERROR("serial", "configure", "Failed baud rate: " + ec.message());
150  handle_error(self, "baud_rate", ec);
151  return;
152  }
153 
154  port_->set_option(net::serial_port_base::character_size(cfg_.char_size), ec);
155  if (ec) {
156  UNILINK_LOG_ERROR("serial", "configure", "Failed char size: " + ec.message());
157  handle_error(self, "char_size", ec);
158  return;
159  }
160 
161  using sb = net::serial_port_base::stop_bits;
162  port_->set_option(sb(cfg_.stop_bits == 2 ? sb::two : sb::one), ec);
163  if (ec) {
164  UNILINK_LOG_ERROR("serial", "configure", "Failed stop bits: " + ec.message());
165  handle_error(self, "stop_bits", ec);
166  return;
167  }
168 
169  using pa = net::serial_port_base::parity;
170  pa::type p = pa::none;
172  p = pa::even;
173  else if (cfg_.parity == config::SerialConfig::Parity::Odd)
174  p = pa::odd;
175  port_->set_option(pa(p), ec);
176  if (ec) {
177  UNILINK_LOG_ERROR("serial", "configure", "Failed parity: " + ec.message());
178  handle_error(self, "parity", ec);
179  return;
180  }
181 
182  using fc = net::serial_port_base::flow_control;
183  fc::type f = fc::none;
185  f = fc::software;
187  f = fc::hardware;
188  port_->set_option(fc(f), ec);
189  if (ec) {
190  UNILINK_LOG_ERROR("serial", "configure", "Failed flow control: " + ec.message());
191  handle_error(self, "flow_control", ec);
192  return;
193  }
194 
195  UNILINK_LOG_INFO("serial", "connect", "Device opened: " + cfg_.device);
196  start_read(self);
197 
198  opened_.store(true);
199  state_.set_state(LinkState::Connected);
200  notify_state();
201  do_write(self);
202  }
203 
204  void start_read(std::shared_ptr<Serial> self) {
205  port_->async_read_some(
206  net::buffer(rx_.data(), rx_.size()), net::bind_executor(strand_, [self](auto ec, std::size_t n) {
207  auto impl = self->get_impl();
208  if (ec) {
209  impl->handle_error(self, "read", ec);
210  return;
211  }
212  if (impl->on_bytes_) {
213  try {
214  impl->on_bytes_(memory::ConstByteSpan(impl->rx_.data(), n));
215  } catch (const std::exception& e) {
216  UNILINK_LOG_ERROR("serial", "on_bytes", "Exception in callback: " + std::string(e.what()));
217  if (impl->cfg_.stop_on_callback_exception) {
218  impl->opened_.store(false);
219  impl->close_port();
220  impl->state_.set_state(LinkState::Error);
221  impl->notify_state();
222  return;
223  }
224  impl->handle_error(self, "on_bytes_callback", make_error_code(boost::system::errc::io_error));
225  return;
226  } catch (...) {
227  if (impl->cfg_.stop_on_callback_exception) {
228  impl->opened_.store(false);
229  impl->close_port();
230  impl->state_.set_state(LinkState::Error);
231  impl->notify_state();
232  return;
233  }
234  impl->handle_error(self, "on_bytes_callback", make_error_code(boost::system::errc::io_error));
235  return;
236  }
237  }
238  impl->start_read(self);
239  }));
240  }
241 
242  void do_write(std::shared_ptr<Serial> self) {
243  if (stopping_.load() || tx_.empty()) {
244  writing_ = false;
245  return;
246  }
247  writing_ = true;
248 
249  current_write_buffer_ = std::move(tx_.front());
250  tx_.pop_front();
251 
252  auto& current = *current_write_buffer_;
253 
254  auto on_write = [self](const boost::system::error_code& ec, std::size_t n) {
255  auto impl = self->get_impl();
256  impl->current_write_buffer_.reset();
257 
258  if (impl->queued_bytes_ >= n) {
259  impl->queued_bytes_ -= n;
260  } else {
261  impl->queued_bytes_ = 0;
262  }
263  impl->report_backpressure(impl->queued_bytes_);
264 
265  if (impl->stopping_.load()) {
266  impl->writing_ = false;
267  return;
268  }
269 
270  if (ec) {
271  impl->handle_error(self, "write", ec);
272  return;
273  }
274  impl->do_write(self);
275  };
276 
277  std::visit(
278  [&](auto&& buf) {
279  using T = std::decay_t<decltype(buf)>;
280  auto* data_ptr = [&]() {
281  if constexpr (std::is_same_v<T, std::shared_ptr<const std::vector<uint8_t>>>)
282  return buf->data();
283  else
284  return buf.data();
285  }();
286  auto size = [&]() {
287  if constexpr (std::is_same_v<T, std::shared_ptr<const std::vector<uint8_t>>>)
288  return buf->size();
289  else
290  return buf.size();
291  }();
292  port_->async_write(net::buffer(data_ptr, size), net::bind_executor(strand_, on_write));
293  },
294  current);
295  }
296 
298  try {
299  retry_timer_.cancel();
300  close_port();
301  tx_.clear();
302  queued_bytes_ = 0;
303  writing_ = false;
304  report_backpressure(queued_bytes_);
305  opened_.store(false);
306  state_.set_state(LinkState::Closed);
307  notify_state();
308  } catch (...) {
309  }
310  }
311 
312  void handle_error(std::shared_ptr<Serial> self, const char* where, const boost::system::error_code& ec) {
313  if (ec == boost::asio::error::eof) {
314  if (self) start_read(self);
315  return;
316  }
317 
318  if (stopping_.load()) {
319  perform_cleanup();
320  return;
321  }
322 
323  if (ec == boost::asio::error::operation_aborted) {
324  if (state_.is_state(LinkState::Error)) return;
325  perform_cleanup();
326  return;
327  }
328 
329  bool retryable = cfg_.reopen_on_error;
330  diagnostics::error_reporting::report_connection_error("serial", where, ec, retryable);
331 
332  UNILINK_LOG_ERROR("serial", where, "Error: " + ec.message());
333 
334  if (cfg_.reopen_on_error) {
335  opened_.store(false);
336  close_port();
337  state_.set_state(LinkState::Connecting);
338  notify_state();
339  if (self) schedule_retry(self, where, ec);
340  } else {
341  opened_.store(false);
342  close_port();
343  state_.set_state(LinkState::Error);
344  notify_state();
345  }
346  }
347 
348  void schedule_retry(std::shared_ptr<Serial> self, const char* where, const boost::system::error_code& ec) {
349  (void)ec;
350  UNILINK_LOG_INFO("serial", "retry", "Scheduling retry at " + std::string(where));
351  if (stopping_.load()) return;
352  retry_timer_.expires_after(std::chrono::milliseconds(cfg_.retry_interval_ms));
353  retry_timer_.async_wait([self](auto e) {
354  if (!e && self && !self->get_impl()->stopping_.load()) self->get_impl()->open_and_configure(self);
355  });
356  }
357 
358  void close_port() {
359  boost::system::error_code ec;
360  if (port_ && port_->is_open()) {
361  port_->close(ec);
362  }
363  }
364 
365  void notify_state() {
366  if (stopping_.load() || !on_state_) return;
367  try {
368  on_state_(state_.get_state());
369  } catch (...) {
370  }
371  }
372 
373  void report_backpressure(size_t qb) {
374  if (stopping_.load() || !on_bp_) return;
375 
376  if (!backpressure_active_ && qb >= bp_high_) {
377  backpressure_active_ = true;
378  try {
379  on_bp_(qb);
380  } catch (...) {
381  }
382  } else if (backpressure_active_ && qb <= bp_low_) {
383  backpressure_active_ = false;
384  try {
385  on_bp_(qb);
386  } catch (...) {
387  }
388  }
389  }
390 };
391 
392 std::shared_ptr<Serial> Serial::create(const config::SerialConfig& cfg) {
393  return std::shared_ptr<Serial>(new Serial(cfg));
394 }
395 
396 std::shared_ptr<Serial> Serial::create(const config::SerialConfig& cfg, net::io_context& ioc) {
397  return std::shared_ptr<Serial>(new Serial(cfg, std::make_unique<BoostSerialPort>(ioc), ioc));
398 }
399 
400 std::shared_ptr<Serial> Serial::create(const config::SerialConfig& cfg,
401  std::unique_ptr<interface::SerialPortInterface> port, net::io_context& ioc) {
402  return std::shared_ptr<Serial>(new Serial(cfg, std::move(port), ioc));
403 }
404 
405 Serial::Serial(const config::SerialConfig& cfg) : impl_(std::make_unique<Impl>(cfg)) {}
406 
407 Serial::Serial(const config::SerialConfig& cfg, std::unique_ptr<interface::SerialPortInterface> port,
408  net::io_context& ioc)
409  : impl_(std::make_unique<Impl>(cfg, std::move(port), ioc)) {}
410 
412  if (impl_ && impl_->started_ && !impl_->state_.is_state(LinkState::Closed)) {
413  // In destructor, stop without shared_from_this
414  impl_->stopping_.store(true);
415  impl_->perform_cleanup();
416  if (impl_->owns_ioc_ && impl_->ioc_thread_.joinable()) {
417  impl_->ioc_thread_.join();
418  }
419  }
420 }
421 
422 Serial::Serial(Serial&&) noexcept = default;
423 Serial& Serial::operator=(Serial&&) noexcept = default;
424 
425 void Serial::start() {
426  auto impl = get_impl();
427  if (impl->started_) return;
428  impl->stopping_.store(false);
429  UNILINK_LOG_INFO("serial", "start", "Starting device: " + impl->cfg_.device);
430  if (!impl->owns_ioc_) {
431  auto& manager = concurrency::IoContextManager::instance();
432  if (!manager.is_running()) manager.start();
433  if (impl->ioc_.stopped()) impl->ioc_.restart();
434  }
435  impl->work_guard_ =
436  std::make_unique<net::executor_work_guard<net::io_context::executor_type>>(impl->ioc_.get_executor());
437  if (impl->owns_ioc_) {
438  impl->ioc_thread_ = std::thread([impl] { impl->ioc_.run(); });
439  }
440  auto self = shared_from_this();
441  net::post(impl->strand_, [self] {
442  auto impl = self->get_impl();
443  if (!impl->stopping_.load()) {
444  impl->state_.set_state(LinkState::Connecting);
445  impl->notify_state();
446  impl->open_and_configure(self);
447  }
448  });
449  impl->started_ = true;
450 }
451 
452 void Serial::stop() {
453  auto impl = get_impl();
454  if (!impl->started_) {
455  impl->state_.set_state(LinkState::Closed);
456  return;
457  }
458 
459  if (impl->stopping_.exchange(true)) return;
460 
461  auto self = shared_from_this();
462  net::post(impl->strand_, [self] {
463  auto impl = self->get_impl();
464  impl->perform_cleanup();
465  if (impl->owns_ioc_) impl->ioc_.stop();
466  });
467 
468  if (impl->owns_ioc_ && impl->ioc_thread_.joinable()) {
469  impl->ioc_thread_.join();
470  impl->ioc_.restart();
471  }
472  impl->started_ = false;
473 }
474 
475 bool Serial::is_connected() const { return get_impl()->opened_.load(); }
476 
478  auto impl = get_impl();
479  if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
480  return;
481 
482  size_t n = data.size();
484  UNILINK_LOG_ERROR("serial", "write", "Write size exceeds maximum");
485  return;
486  }
487 
488  if (n <= 65536) {
489  memory::PooledBuffer pooled(n);
490  if (pooled.valid()) {
491  common::safe_memory::safe_memcpy(pooled.data(), data.data(), n);
492  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(pooled)]() mutable {
493  auto impl = self->get_impl();
494  if (impl->queued_bytes_ + buf.size() > impl->bp_limit_) {
495  impl->tx_.clear();
496  impl->queued_bytes_ = 0;
497  impl->writing_ = false;
498  impl->report_backpressure(impl->queued_bytes_);
499  impl->state_.set_state(LinkState::Error);
500  impl->notify_state();
501  impl->handle_error(self, "write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
502  return;
503  }
504  impl->queued_bytes_ += buf.size();
505  impl->tx_.emplace_back(std::move(buf));
506  impl->report_backpressure(impl->queued_bytes_);
507  if (!impl->writing_) impl->do_write(self);
508  });
509  return;
510  }
511  }
512 
513  std::vector<uint8_t> fallback(data.begin(), data.end());
514  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(fallback)]() mutable {
515  auto impl = self->get_impl();
516  if (impl->queued_bytes_ + buf.size() > impl->bp_limit_) {
517  impl->tx_.clear();
518  impl->queued_bytes_ = 0;
519  impl->writing_ = false;
520  impl->report_backpressure(impl->queued_bytes_);
521  impl->state_.set_state(LinkState::Error);
522  impl->notify_state();
523  impl->handle_error(self, "write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
524  return;
525  }
526  impl->queued_bytes_ += buf.size();
527  impl->tx_.emplace_back(std::move(buf));
528  impl->report_backpressure(impl->queued_bytes_);
529  if (!impl->writing_) impl->do_write(self);
530  });
531 }
532 
533 void Serial::async_write_move(std::vector<uint8_t>&& data) {
534  auto impl = get_impl();
535  if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
536  return;
537  const auto added = data.size();
538  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(data), added]() mutable {
539  auto impl = self->get_impl();
540  if (impl->queued_bytes_ + added > impl->bp_limit_) {
541  impl->tx_.clear();
542  impl->queued_bytes_ = 0;
543  impl->writing_ = false;
544  impl->report_backpressure(impl->queued_bytes_);
545  impl->state_.set_state(LinkState::Error);
546  impl->notify_state();
547  impl->handle_error(self, "write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
548  return;
549  }
550  impl->queued_bytes_ += added;
551  impl->tx_.emplace_back(std::move(buf));
552  impl->report_backpressure(impl->queued_bytes_);
553  if (!impl->writing_) impl->do_write(self);
554  });
555 }
556 
557 void Serial::async_write_shared(std::shared_ptr<const std::vector<uint8_t>> data) {
558  auto impl = get_impl();
559  if (impl->stopping_.load() || impl->state_.is_state(LinkState::Closed) || impl->state_.is_state(LinkState::Error))
560  return;
561  if (!data || data->empty()) return;
562  const auto added = data->size();
563  net::post(impl->strand_, [self = shared_from_this(), buf = std::move(data), added]() mutable {
564  auto impl = self->get_impl();
565  if (impl->queued_bytes_ + added > impl->bp_limit_) {
566  impl->tx_.clear();
567  impl->queued_bytes_ = 0;
568  impl->writing_ = false;
569  impl->report_backpressure(impl->queued_bytes_);
570  impl->state_.set_state(LinkState::Error);
571  impl->notify_state();
572  impl->handle_error(self, "write_queue_overflow", make_error_code(boost::system::errc::no_buffer_space));
573  return;
574  }
575  impl->queued_bytes_ += added;
576  impl->tx_.emplace_back(std::move(buf));
577  impl->report_backpressure(impl->queued_bytes_);
578  if (!impl->writing_) impl->do_write(self);
579  });
580 }
581 
582 void Serial::on_bytes(OnBytes cb) { impl_->on_bytes_ = std::move(cb); }
583 void Serial::on_state(OnState cb) { impl_->on_state_ = std::move(cb); }
584 void Serial::on_backpressure(OnBackpressure cb) { impl_->on_bp_ = std::move(cb); }
585 
586 void Serial::set_retry_interval(unsigned interval_ms) { get_impl()->cfg_.retry_interval_ms = interval_ms; }
587 
588 } // namespace transport
589 } // namespace unilink
#define UNILINK_LOG_INFO(component, operation, message)
Definition: logger.hpp:265
#define UNILINK_LOG_ERROR(component, operation, message)
Definition: logger.hpp:279