28 using namespace common;
33 strand_(ioc.get_executor()),
38 bp_high_(backpressure_threshold),
39 idle_timeout_ms_(idle_timeout_ms),
41 cleanup_done_(false) {
44 bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
45 if (bp_low_ == 0) bp_low_ = 1;
49 size_t backpressure_threshold,
int idle_timeout_ms)
51 strand_(ioc.get_executor()),
53 socket_(std::move(socket)),
56 bp_high_(backpressure_threshold),
57 idle_timeout_ms_(idle_timeout_ms),
59 cleanup_done_(false) {
62 bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
63 if (bp_low_ == 0) bp_low_ = 1;
67 if (alive_.exchange(
true))
return;
68 auto self = shared_from_this();
69 net::dispatch(strand_, [
self] {
70 self->reset_idle_timer();
76 if (!alive_ || closing_)
return;
78 size_t size = data.
size();
80 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Write size exceeds maximum allowed");
87 if (pooled_buffer.
valid()) {
91 net::post(strand_, [
self = shared_from_this(), buf = std::move(pooled_buffer)]()
mutable {
92 if (!self->alive_ || self->closing_)
return;
93 if (self->queue_bytes_ + buf.size() > self->bp_limit_) {
94 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Queue limit exceeded, closing session");
99 self->queue_bytes_ += buf.size();
100 self->tx_.emplace_back(std::move(buf));
101 self->report_backpressure(self->queue_bytes_);
102 if (!self->writing_) self->do_write();
109 std::vector<uint8_t> fallback(data.
begin(), data.
end());
111 net::post(strand_, [
self = shared_from_this(), buf = std::move(fallback)]()
mutable {
112 if (!self->alive_ || self->closing_)
return;
113 if (self->queue_bytes_ + buf.size() > self->bp_limit_) {
114 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Queue limit exceeded, closing session");
119 self->queue_bytes_ += buf.size();
120 self->tx_.emplace_back(std::move(buf));
121 self->report_backpressure(self->queue_bytes_);
122 if (!self->writing_) self->do_write();
127 if (!alive_ || closing_)
return;
128 const auto added = data.size();
130 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Write size exceeds maximum allowed");
133 net::post(strand_, [
self = shared_from_this(), buf = std::move(data), added]()
mutable {
134 if (!self->alive_ || self->closing_)
return;
135 if (self->queue_bytes_ + added > self->bp_limit_) {
136 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Queue limit exceeded, closing session");
141 self->queue_bytes_ += added;
142 self->tx_.emplace_back(std::move(buf));
143 self->report_backpressure(self->queue_bytes_);
144 if (!self->writing_) self->do_write();
149 if (!alive_ || closing_ || !data)
return;
150 const auto added = data->size();
152 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Write size exceeds maximum allowed");
155 net::post(strand_, [
self = shared_from_this(), buf = std::move(data), added]()
mutable {
156 if (!self->alive_ || self->closing_)
return;
157 if (self->queue_bytes_ + added > self->bp_limit_) {
158 UNILINK_LOG_ERROR(
"tcp_server_session",
"write",
"Queue limit exceeded, closing session");
163 self->queue_bytes_ += added;
164 self->tx_.emplace_back(std::move(buf));
165 self->report_backpressure(self->queue_bytes_);
166 if (!self->writing_) self->do_write();
171 auto self = shared_from_this();
172 net::dispatch(strand_, [
self, cb = std::move(cb)]()
mutable {
173 if (self->closing_.load() || self->cleanup_done_.load())
return;
174 self->on_bytes_ = std::move(cb);
178 auto self = shared_from_this();
179 net::dispatch(strand_, [
self, cb = std::move(cb)]()
mutable {
180 if (self->closing_.load() || self->cleanup_done_.load())
return;
181 self->on_bp_ = std::move(cb);
185 auto self = shared_from_this();
186 net::dispatch(strand_, [
self, cb = std::move(cb)]()
mutable {
187 if (self->closing_.load() || self->cleanup_done_.load())
return;
188 self->on_close_ = std::move(cb);
195 if (closing_.exchange(
true))
return;
196 auto self = shared_from_this();
197 net::post(strand_, [
self] {
199 self->on_bytes_ =
nullptr;
200 self->on_bp_ =
nullptr;
201 self->on_close_ =
nullptr;
202 self->idle_timer_.cancel();
208 auto self = shared_from_this();
209 net::dispatch(strand_, [
self] {
210 self->idle_timer_.cancel();
211 boost::system::error_code ec;
216 self->socket_->close(ec);
// Issues one asynchronous read into rx_. The completion handler is bound to
// strand_ and captures a shared_ptr to the session (self), so the session
// object stays alive until the handler has run.
// NOTE(review): this listing is missing several original lines (226-229, 232,
// 235, 237-239, 241+); comments below describe only the lines that are shown.
221 void TcpServerSession::start_read() {
222 auto self = shared_from_this();
223 socket_->async_read_some(
224 net::buffer(rx_.data(), rx_.size()), net::bind_executor(strand_, [
self](
auto ec, std::size_t n) {
// Drop the completion if the session is already closing or not alive.
225 if (self->closing_ || !self->alive_) return;
// NOTE(review): original lines 226-229 are not shown; presumably the `ec`
// error path lives there -- confirm against the full file.
// A completed read counts as activity, so the idle timer is re-armed.
230 self->reset_idle_timer();
// Deliver the received bytes only when a callback is installed. The span
// points into rx_ and covers the n bytes reported by the read.
231 if (self->on_bytes_) {
233 self->on_bytes_(memory::ConstByteSpan(self->rx_.data(), n));
// Exceptions escaping the user callback are caught and logged.
234 } catch (
const std::exception& e) {
236 "Exception in on_bytes callback: " + std::string(e.what()));
// Log message for exceptions not derived from std::exception (the matching
// catch line is not shown in this listing).
240 UNILINK_LOG_ERROR(
"tcp_server_session",
"on_bytes",
"Unknown exception in on_bytes callback");
// Starts an asynchronous write of the buffer at the front of tx_. The
// completion handler (on_write) is bound to strand_ and updates the queued
// byte count, backpressure state and idle timer.
// NOTE(review): this listing is missing many original lines (250-254, 256-259,
// 261-262, 272, 274, 276-280, 282-285, 290, 292+); comments below describe
// only the lines that are shown.
249 void TcpServerSession::do_write() {
255 auto self = shared_from_this();
// The front queue entry is moved into current_write_buffer_, which holds it
// until the completion handler resets it.
260 current_write_buffer_ = std::move(tx_.front());
263 auto& current = *current_write_buffer_;
265 auto on_write = [
self](
const boost::system::error_code& ec, std::size_t n) {
// The in-flight buffer is released as soon as the write completes.
267 self->current_write_buffer_.reset();
269 if (self->closing_ || !self->alive_)
return;
// Subtract the written byte count from queue_bytes_, falling back to 0
// instead of underflowing when n is larger than the recorded total.
270 if (self->queue_bytes_ >= n) {
271 self->queue_bytes_ -= n;
273 self->queue_bytes_ = 0;
// Re-evaluate the backpressure thresholds with the reduced queue size.
275 self->report_backpressure(self->queue_bytes_);
281 self->reset_idle_timer();
// Generic lambda over the buffer alternatives: a shared_ptr to a const
// vector is dereferenced with ->, any other buffer type is accessed directly.
// NOTE(review): the call that applies this lambda to `current` is not shown
// (presumably a std::visit on original line 285) -- confirm.
286 [&](
const auto& buf) {
287 using T = std::decay_t<decltype(buf)>;
288 if constexpr (std::is_same_v<T, std::shared_ptr<
const std::vector<uint8_t>>>) {
289 socket_->async_write(net::buffer(buf->data(), buf->size()), net::bind_executor(strand_, on_write));
291 socket_->async_write(net::buffer(buf.data(), buf.size()), net::bind_executor(strand_, on_write));
// Tears the session down: marks it closing, cancels the idle timer, shuts the
// socket down in both directions and handles the on_close callback.
// NOTE(review): original lines 315-323 (which presumably close the socket and
// invoke close_cb inside a try block) are not shown in this listing.
297 void TcpServerSession::do_close() {
// exchange(true) returns the previous value, so only the first caller gets
// past this guard and the cleanup below runs at most once.
298 if (cleanup_done_.exchange(
true))
return;
301 closing_.store(
true);
// Move the callback into a local, leaving the on_close_ member moved-from.
304 auto close_cb = std::move(on_close_);
310 idle_timer_.cancel();
312 UNILINK_LOG_INFO(
"tcp_server_session",
"disconnect",
"Client disconnected");
// The shutdown result is reported through ec; ec is not inspected in the
// lines shown here.
313 boost::system::error_code ec;
314 socket_->shutdown(tcp::socket::shutdown_both, ec);
// Exceptions thrown by the on_close callback are caught and logged.
324 }
catch (
const std::exception& e) {
325 UNILINK_LOG_ERROR(
"tcp_server_session",
"on_close",
"Exception in on_close callback: " + std::string(e.what()));
// Log message for exceptions not derived from std::exception (the matching
// catch line is not shown in this listing).
327 UNILINK_LOG_ERROR(
"tcp_server_session",
"on_close",
"Unknown exception in on_close callback");
// Notifies the on_bp_ callback when the queued byte count crosses the
// backpressure thresholds. Two thresholds are used: the state turns on at
// bp_high_ and turns off at bp_low_, and the callback fires only on a
// state change.
//
// @param queued_bytes Current number of bytes waiting in the send queue.
//
// NOTE(review): the `try {` lines and the `catch (...)` lines (original 336,
// 339, 341, 343, 346, 349, 351, 353+) are not shown in this listing.
332 void TcpServerSession::report_backpressure(
size_t queued_bytes) {
// Nothing to report while closing, when not alive, or with no callback set.
333 if (closing_ || !alive_ || !on_bp_)
return;
// Turning on: inactive and the queue has reached the high threshold.
334 if (!backpressure_active_ && queued_bytes >= bp_high_) {
335 backpressure_active_ =
true;
337 on_bp_(queued_bytes);
// Exceptions thrown by the callback are caught and logged.
338 }
catch (
const std::exception& e) {
340 "Exception in backpressure callback: " + std::string(e.what()));
342 UNILINK_LOG_ERROR(
"tcp_server_session",
"on_backpressure",
"Unknown exception in backpressure callback");
// Turning off: active and the queue has drained to the low threshold.
344 }
else if (backpressure_active_ && queued_bytes <= bp_low_) {
345 backpressure_active_ =
false;
347 on_bp_(queued_bytes);
// Same exception handling as the turning-on branch above.
348 }
catch (
const std::exception& e) {
350 "Exception in backpressure callback: " + std::string(e.what()));
352 UNILINK_LOG_ERROR(
"tcp_server_session",
"on_backpressure",
"Unknown exception in backpressure callback");
// Re-arms the idle timer so that it expires idle_timeout_ms_ milliseconds
// from now. The wait handler is bound to strand_ and captures a shared_ptr to
// the session.
// NOTE(review): the statement that follows the timeout warning (original line
// 372+, presumably the one that closes the session) is not shown in this
// listing.
357 void TcpServerSession::reset_idle_timer() {
// A timeout of zero or less means the idle timer is not used.
358 if (idle_timeout_ms_ <= 0)
return;
// Cancel the pending wait before setting the new expiry.
361 idle_timer_.cancel();
364 idle_timer_.expires_after(std::chrono::milliseconds(idle_timeout_ms_));
366 auto self = shared_from_this();
367 idle_timer_.async_wait(net::bind_executor(strand_, [
self](
const boost::system::error_code& ec) {
// operation_aborted means the wait was cancelled, not that it expired.
368 if (ec == boost::asio::error::operation_aborted)
return;
// Ignore the expiry if the session is no longer alive or already closing.
369 if (!self->alive_ || self->closing_)
return;
371 UNILINK_LOG_WARNING(
"tcp_server_session",
"timeout",
"Connection idle timeout expired, closing session");
RAII wrapper for memory pool buffers with enhanced safety.
A C++17 compatible span-like class for safe array access.
constexpr size_type size() const noexcept
constexpr pointer data() const noexcept
constexpr iterator begin() const noexcept
constexpr iterator end() const noexcept
Boost.Asio implementation of ITcpSocket interface. This is the real implementation used in production...
interface::Channel::OnBackpressure OnBackpressure
std::function< void()> OnClose
interface::Channel::OnBytes OnBytes
TcpServerSession(net::io_context &ioc, tcp::socket sock, size_t backpressure_threshold=common::constants::DEFAULT_BACKPRESSURE_THRESHOLD, int idle_timeout_ms=0)
void on_bytes(OnBytes cb)
void on_close(OnClose cb)
void async_write_move(std::vector< uint8_t > &&data)
void async_write_shared(std::shared_ptr< const std::vector< uint8_t >> data)
void on_backpressure(OnBackpressure cb)
void async_write_copy(memory::ConstByteSpan data)
#define UNILINK_LOG_WARNING(component, operation, message)
#define UNILINK_LOG_INFO(component, operation, message)
#define UNILINK_LOG_ERROR(component, operation, message)
constexpr size_t MAX_BUFFER_SIZE
constexpr size_t LARGE_BUFFER_THRESHOLD
constexpr size_t DEFAULT_BACKPRESSURE_THRESHOLD
void safe_memcpy(uint8_t *dest, const uint8_t *src, size_t size)
Safely copy memory with bounds checking.