unilink  0.4.3
A simple C++ library for unified async communication
tcp_server_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <cstring>
20 #include <iostream>
21 
24 
25 namespace unilink {
26 namespace transport {
27 
28 using namespace common;
29 
// Constructs a session from an already-connected tcp::socket, which is moved into a
// BoostTcpSocket wrapper owned by socket_.
//
// ioc                    - io_context used for the strand and the idle timer.
// sock                   - socket moved into the session.
// backpressure_threshold - stored as bp_high_, the level at which backpressure is reported.
// idle_timeout_ms        - stored as idle_timeout_ms_; reset_idle_timer() ignores values <= 0.
//
// The session starts with alive_ == false and cleanup_done_ == false.
30 TcpServerSession::TcpServerSession(net::io_context& ioc, tcp::socket sock, size_t backpressure_threshold,
31  int idle_timeout_ms)
32  : ioc_(ioc),
33  strand_(ioc.get_executor()),
34  idle_timer_(ioc),
35  socket_(std::make_unique<BoostTcpSocket>(std::move(sock))),
36  writing_(false),
37  queue_bytes_(0),
38  bp_high_(backpressure_threshold),
39  idle_timeout_ms_(idle_timeout_ms),
40  alive_(false),
41  cleanup_done_(false) {
// Hard queue limit: at least max(4 * bp_high_, DEFAULT_BACKPRESSURE_THRESHOLD), capped by std::min.
// NOTE(review): listing line 43 (the second std::min argument and closing parenthesis) is missing
// from this extract, so the cap value cannot be confirmed here.
42  bp_limit_ = std::min(std::max(bp_high_ * 4, common::constants::DEFAULT_BACKPRESSURE_THRESHOLD),
// Low watermark is half the high watermark (or bp_high_ itself when it is 0 or 1) ...
44  bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
// ... and is never allowed to be zero.
45  if (bp_low_ == 0) bp_low_ = 1;
46 }
47 
// Constructs a session around a caller-supplied socket implementation; ownership of `socket`
// is moved into socket_. No null check is performed on `socket` here.
//
// ioc                    - io_context used for the strand and the idle timer.
// socket                 - TcpSocketInterface implementation taken over by the session.
// backpressure_threshold - stored as bp_high_, the level at which backpressure is reported.
// idle_timeout_ms        - stored as idle_timeout_ms_; reset_idle_timer() ignores values <= 0.
//
// The session starts with alive_ == false and cleanup_done_ == false.
48 TcpServerSession::TcpServerSession(net::io_context& ioc, std::unique_ptr<interface::TcpSocketInterface> socket,
49  size_t backpressure_threshold, int idle_timeout_ms)
50  : ioc_(ioc),
51  strand_(ioc.get_executor()),
52  idle_timer_(ioc),
53  socket_(std::move(socket)),
54  writing_(false),
55  queue_bytes_(0),
56  bp_high_(backpressure_threshold),
57  idle_timeout_ms_(idle_timeout_ms),
58  alive_(false),
59  cleanup_done_(false) {
// Hard queue limit: at least max(4 * bp_high_, DEFAULT_BACKPRESSURE_THRESHOLD), capped by std::min.
// NOTE(review): listing line 61 (the second std::min argument and closing parenthesis) is missing
// from this extract, so the cap value cannot be confirmed here.
60  bp_limit_ = std::min(std::max(bp_high_ * 4, common::constants::DEFAULT_BACKPRESSURE_THRESHOLD),
// Low watermark is half the high watermark (or bp_high_ itself when it is 0 or 1) ...
62  bp_low_ = bp_high_ > 1 ? bp_high_ / 2 : bp_high_;
// ... and is never allowed to be zero.
63  if (bp_low_ == 0) bp_low_ = 1;
64 }
65 
// NOTE(review): the signature line (listing line 66) is missing from this extract; this body is
// presumably TcpServerSession::start() - confirm against the header.
// Atomically flips alive_ to true; if it was already true the call is a no-op, so the read loop
// is only ever started once.
67  if (alive_.exchange(true)) return;
// Keep the session alive until the dispatched work has run.
68  auto self = shared_from_this();
// Arm the idle timer and issue the first read on the strand.
69  net::dispatch(strand_, [self] {
70  self->reset_idle_timer();
71  self->start_read();
72  });
73 }
74 
// NOTE(review): the signature line (listing line 75) and the size-check condition (listing line 79)
// are missing from this extract. The body reads a parameter named `data` through size(), data(),
// begin() and end(), and copies its bytes before queueing them - presumably this is the copying
// write entry point; confirm the exact name and parameter type against the header.
//
// Flow: reject when the session is not alive or is closing; copy the payload (into a pooled buffer
// when it is small enough and the pool yields a valid buffer, otherwise into a std::vector); then
// post to the strand, where the queue limit is enforced and the write loop is kicked off.
76  if (!alive_ || closing_) return; // Don't queue writes if session is not alive
77 
78  size_t size = data.size();
// NOTE(review): the `if (...) {` guarding this error path is the missing listing line 79.
80  UNILINK_LOG_ERROR("tcp_server_session", "write", "Write size exceeds maximum allowed");
81  return;
82  }
83 
84  // Use memory pool for better performance (only for reasonable sizes)
85  if (size <= common::constants::LARGE_BUFFER_THRESHOLD) { // Only use pool for buffers <= 64KB
86  memory::PooledBuffer pooled_buffer(size);
87  if (pooled_buffer.valid()) {
88  // Copy data to pooled buffer safely
89  common::safe_memory::safe_memcpy(pooled_buffer.data(), data.data(), size);
90 
// The lambda owns the pooled buffer and a shared_ptr to the session until it runs on the strand.
91  net::post(strand_, [self = shared_from_this(), buf = std::move(pooled_buffer)]() mutable {
92  if (!self->alive_ || self->closing_) return; // Double-check in case session was closed
// Exceeding bp_limit_ is treated as fatal: the session is closed rather than the write dropped.
93  if (self->queue_bytes_ + buf.size() > self->bp_limit_) {
94  UNILINK_LOG_ERROR("tcp_server_session", "write", "Queue limit exceeded, closing session");
95  self->do_close();
96  return;
97  }
98 
99  self->queue_bytes_ += buf.size();
100  self->tx_.emplace_back(std::move(buf));
101  self->report_backpressure(self->queue_bytes_);
// Start the write loop only if no write is currently in flight.
102  if (!self->writing_) self->do_write();
103  });
104  return;
105  }
106  }
107 
108  // Fallback to regular allocation for large buffers or pool exhaustion
109  std::vector<uint8_t> fallback(data.begin(), data.end());
110 
// Same queueing logic as the pooled path above, with the std::vector copy as the payload.
111  net::post(strand_, [self = shared_from_this(), buf = std::move(fallback)]() mutable {
112  if (!self->alive_ || self->closing_) return; // Double-check in case session was closed
113  if (self->queue_bytes_ + buf.size() > self->bp_limit_) {
114  UNILINK_LOG_ERROR("tcp_server_session", "write", "Queue limit exceeded, closing session");
115  self->do_close();
116  return;
117  }
118 
119  self->queue_bytes_ += buf.size();
120  self->tx_.emplace_back(std::move(buf));
121  self->report_backpressure(self->queue_bytes_);
122  if (!self->writing_) self->do_write();
123  });
124 }
125 
// Queues `data` for transmission without copying: the vector is moved into the posted lambda and
// from there into the tx_ queue.
//
// data - payload to send; moved from once the early checks pass.
//
// The call is silently ignored when the session is not alive or is closing. The actual queueing
// happens later on the strand, where exceeding bp_limit_ closes the session.
126 void TcpServerSession::async_write_move(std::vector<uint8_t>&& data) {
127  if (!alive_ || closing_) return;
// Size is captured before the move so the accounting below does not depend on the moved-from vector.
128  const auto added = data.size();
// NOTE(review): the `if (...) {` guarding this error path (listing line 129) is missing from this
// extract; from the log message it is a maximum-size check on `added`.
130  UNILINK_LOG_ERROR("tcp_server_session", "write", "Write size exceeds maximum allowed");
131  return;
132  }
133  net::post(strand_, [self = shared_from_this(), buf = std::move(data), added]() mutable {
// Re-check on the strand: the session may have been closed between the call and this handler.
134  if (!self->alive_ || self->closing_) return;
135  if (self->queue_bytes_ + added > self->bp_limit_) {
136  UNILINK_LOG_ERROR("tcp_server_session", "write", "Queue limit exceeded, closing session");
137  self->do_close();
138  return;
139  }
140 
141  self->queue_bytes_ += added;
142  self->tx_.emplace_back(std::move(buf));
143  self->report_backpressure(self->queue_bytes_);
// Start the write loop only if no write is currently in flight.
144  if (!self->writing_) self->do_write();
145  });
146 }
147 
// Queues a shared, immutable payload for transmission; only the shared_ptr is moved into the
// tx_ queue, the bytes themselves are not copied.
//
// data - payload to send; a null pointer is ignored.
//
// The call is silently ignored when the session is not alive or is closing. The actual queueing
// happens later on the strand, where exceeding bp_limit_ closes the session.
148 void TcpServerSession::async_write_shared(std::shared_ptr<const std::vector<uint8_t>> data) {
149  if (!alive_ || closing_ || !data) return;
// Size is captured before `data` is moved into the lambda.
150  const auto added = data->size();
// NOTE(review): the `if (...) {` guarding this error path (listing line 151) is missing from this
// extract; from the log message it is a maximum-size check on `added`.
152  UNILINK_LOG_ERROR("tcp_server_session", "write", "Write size exceeds maximum allowed");
153  return;
154  }
155  net::post(strand_, [self = shared_from_this(), buf = std::move(data), added]() mutable {
// Re-check on the strand: the session may have been closed between the call and this handler.
156  if (!self->alive_ || self->closing_) return;
157  if (self->queue_bytes_ + added > self->bp_limit_) {
158  UNILINK_LOG_ERROR("tcp_server_session", "write", "Queue limit exceeded, closing session");
159  self->do_close();
160  return;
161  }
162 
163  self->queue_bytes_ += added;
164  self->tx_.emplace_back(std::move(buf));
165  self->report_backpressure(self->queue_bytes_);
// Start the write loop only if no write is currently in flight.
166  if (!self->writing_) self->do_write();
167  });
168 }
169 
// NOTE(review): the signature line (listing line 170) is missing from this extract; this body
// installs the callback stored in on_bytes_, taking it from a parameter named `cb`.
171  auto self = shared_from_this();
// The assignment runs on the strand, the same executor the read handler that invokes on_bytes_ is bound to.
172  net::dispatch(strand_, [self, cb = std::move(cb)]() mutable {
// Once the session is closing or cleaned up, a new callback is dropped instead of installed.
173  if (self->closing_.load() || self->cleanup_done_.load()) return;
174  self->on_bytes_ = std::move(cb);
175  });
176 }
// NOTE(review): the signature line (listing line 177) is missing from this extract; this body
// installs the callback stored in on_bp_, taking it from a parameter named `cb`.
178  auto self = shared_from_this();
// The assignment runs on the strand, where report_backpressure() is called from the write path.
179  net::dispatch(strand_, [self, cb = std::move(cb)]() mutable {
// Once the session is closing or cleaned up, a new callback is dropped instead of installed.
180  if (self->closing_.load() || self->cleanup_done_.load()) return;
181  self->on_bp_ = std::move(cb);
182  });
183 }
// NOTE(review): the signature line (listing line 184) is missing from this extract; this body
// installs the callback stored in on_close_, taking it from a parameter named `cb`.
185  auto self = shared_from_this();
// The assignment runs on the strand, where do_close() consumes on_close_.
186  net::dispatch(strand_, [self, cb = std::move(cb)]() mutable {
// Once the session is closing or cleaned up, a new callback is dropped instead of installed.
187  if (self->closing_.load() || self->cleanup_done_.load()) return;
188  self->on_close_ = std::move(cb);
189  });
190 }
191 
192 bool TcpServerSession::alive() const { return alive_.load(); }
193 
// NOTE(review): the signature line (listing line 194) is missing from this extract; this body is
// presumably TcpServerSession::stop() (the comment in the cancel body refers to "stop()").
// Sets closing_ immediately; a second call returns here without posting anything.
195  if (closing_.exchange(true)) return;
196  auto self = shared_from_this();
197  net::post(strand_, [self] {
198  // Clear callbacks on the strand to block further user callbacks after stop.
// Because on_close_ is cleared before do_close() runs, do_close() finds no close callback to
// invoke on this path.
199  self->on_bytes_ = nullptr;
200  self->on_bp_ = nullptr;
201  self->on_close_ = nullptr;
202  self->idle_timer_.cancel();
203  self->do_close();
204  });
205 }
206 
// NOTE(review): the signature line (listing line 207) is missing from this extract; this body is
// presumably TcpServerSession::cancel() - confirm against the header.
// Cancels the idle timer and closes the socket on the strand without touching closing_.
208  auto self = shared_from_this();
209  net::dispatch(strand_, [self] {
210  self->idle_timer_.cancel();
// Receives the result of close(); it is not inspected afterwards.
211  boost::system::error_code ec;
212  // Cancelling the socket via close() causes ongoing operations to complete with operation_aborted.
213  // Unlike stop(), this does NOT set closing_ flag immediately, allowing the
214  // error handler to run normally and trigger do_close() via the error path.
215  if (self->socket_) {
216  self->socket_->close(ec);
217  }
218  });
219 }
220 
221 void TcpServerSession::start_read() {
222  auto self = shared_from_this();
223  socket_->async_read_some(
224  net::buffer(rx_.data(), rx_.size()), net::bind_executor(strand_, [self](auto ec, std::size_t n) {
225  if (self->closing_ || !self->alive_) return;
226  if (ec) {
227  self->do_close();
228  return;
229  }
230  self->reset_idle_timer();
231  if (self->on_bytes_) {
232  try {
233  self->on_bytes_(memory::ConstByteSpan(self->rx_.data(), n));
234  } catch (const std::exception& e) {
235  UNILINK_LOG_ERROR("tcp_server_session", "on_bytes",
236  "Exception in on_bytes callback: " + std::string(e.what()));
237  self->do_close();
238  return;
239  } catch (...) {
240  UNILINK_LOG_ERROR("tcp_server_session", "on_bytes", "Unknown exception in on_bytes callback");
241  self->do_close();
242  return;
243  }
244  }
245  self->start_read();
246  }));
247 }
248 
249 void TcpServerSession::do_write() {
250  if (tx_.empty()) {
251  writing_ = false;
252  return;
253  }
254  writing_ = true;
255  auto self = shared_from_this();
256 
257  // Move buffer out of queue immediately to ensure lifetime safety during async op
258  // Optimization: Move into current_write_buffer_ to keep it alive during async op
259  // without allocating a shared_ptr control block.
260  current_write_buffer_ = std::move(tx_.front());
261  tx_.pop_front();
262 
263  auto& current = *current_write_buffer_;
264 
265  auto on_write = [self](const boost::system::error_code& ec, std::size_t n) {
266  // Release the buffer immediately
267  self->current_write_buffer_.reset();
268 
269  if (self->closing_ || !self->alive_) return;
270  if (self->queue_bytes_ >= n) {
271  self->queue_bytes_ -= n;
272  } else {
273  self->queue_bytes_ = 0;
274  }
275  self->report_backpressure(self->queue_bytes_);
276 
277  if (ec) {
278  self->do_close();
279  return;
280  }
281  self->reset_idle_timer();
282  self->do_write();
283  };
284 
285  std::visit(
286  [&](const auto& buf) {
287  using T = std::decay_t<decltype(buf)>;
288  if constexpr (std::is_same_v<T, std::shared_ptr<const std::vector<uint8_t>>>) {
289  socket_->async_write(net::buffer(buf->data(), buf->size()), net::bind_executor(strand_, on_write));
290  } else {
291  socket_->async_write(net::buffer(buf.data(), buf.size()), net::bind_executor(strand_, on_write));
292  }
293  },
294  current);
295 }
296 
297 void TcpServerSession::do_close() {
298  if (cleanup_done_.exchange(true)) return; // Ensures cleanup runs only once
299 
300  alive_.store(false);
301  closing_.store(true); // Redundant, but ensures consistency
302 
303  // Safely invoke on_close callback
304  auto close_cb = std::move(on_close_);
305 
306  // Clear all callbacks to prevent any further invocations
307  on_bytes_ = nullptr;
308  on_bp_ = nullptr;
309  on_close_ = nullptr;
310  idle_timer_.cancel();
311 
312  UNILINK_LOG_INFO("tcp_server_session", "disconnect", "Client disconnected");
313  boost::system::error_code ec;
314  socket_->shutdown(tcp::socket::shutdown_both, ec);
315  socket_->close(ec);
316 
317  // Release memory immediately
318  tx_.clear();
319  queue_bytes_ = 0;
320 
321  if (close_cb) {
322  try {
323  close_cb();
324  } catch (const std::exception& e) {
325  UNILINK_LOG_ERROR("tcp_server_session", "on_close", "Exception in on_close callback: " + std::string(e.what()));
326  } catch (...) {
327  UNILINK_LOG_ERROR("tcp_server_session", "on_close", "Unknown exception in on_close callback");
328  }
329  }
330 }
331 
332 void TcpServerSession::report_backpressure(size_t queued_bytes) {
333  if (closing_ || !alive_ || !on_bp_) return;
334  if (!backpressure_active_ && queued_bytes >= bp_high_) {
335  backpressure_active_ = true;
336  try {
337  on_bp_(queued_bytes);
338  } catch (const std::exception& e) {
339  UNILINK_LOG_ERROR("tcp_server_session", "on_backpressure",
340  "Exception in backpressure callback: " + std::string(e.what()));
341  } catch (...) {
342  UNILINK_LOG_ERROR("tcp_server_session", "on_backpressure", "Unknown exception in backpressure callback");
343  }
344  } else if (backpressure_active_ && queued_bytes <= bp_low_) {
345  backpressure_active_ = false;
346  try {
347  on_bp_(queued_bytes);
348  } catch (const std::exception& e) {
349  UNILINK_LOG_ERROR("tcp_server_session", "on_backpressure",
350  "Exception in backpressure callback: " + std::string(e.what()));
351  } catch (...) {
352  UNILINK_LOG_ERROR("tcp_server_session", "on_backpressure", "Unknown exception in backpressure callback");
353  }
354  }
355 }
356 
357 void TcpServerSession::reset_idle_timer() {
358  if (idle_timeout_ms_ <= 0) return;
359 
360  // Cancel any existing timer
361  idle_timer_.cancel();
362 
363  // Reset timer
364  idle_timer_.expires_after(std::chrono::milliseconds(idle_timeout_ms_));
365 
366  auto self = shared_from_this();
367  idle_timer_.async_wait(net::bind_executor(strand_, [self](const boost::system::error_code& ec) {
368  if (ec == boost::asio::error::operation_aborted) return;
369  if (!self->alive_ || self->closing_) return;
370 
371  UNILINK_LOG_WARNING("tcp_server_session", "timeout", "Connection idle timeout expired, closing session");
372  self->do_close();
373  }));
374 }
375 
376 } // namespace transport
377 } // namespace unilink
#define UNILINK_LOG_WARNING(component, operation, message)
Definition: logger.hpp:272
#define UNILINK_LOG_INFO(component, operation, message)
Definition: logger.hpp:265
#define UNILINK_LOG_ERROR(component, operation, message)
Definition: logger.hpp:279