unilink  0.4.3
A simple C++ library for unified async communication
line_framer.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <cstring>
21 #include <iterator>
22 #include <string_view>
23 
24 namespace unilink {
25 namespace framer {
26 
27 LineFramer::LineFramer(std::string_view delimiter, bool include_delimiter, size_t max_length)
28  : delimiter_(delimiter), include_delimiter_(include_delimiter), max_length_(max_length) {
29  if (delimiter_.empty()) {
30  delimiter_ = "\n";
31  }
32 }
33 
35  if (data.empty()) return;
36 
37  // Process data in chunks to prevent large memory allocations (DoS protection)
38  // We use max(max_length_, 4096) as a reasonable chunk size.
39  // This ensures that even if the user sends a huge payload, we only
40  // allocate memory incrementally and have a chance to clear the buffer
41  // if limits are exceeded.
42  const size_t chunk_limit = std::max(max_length_, size_t(4096));
43 
44  size_t offset = 0;
45  while (offset < data.size()) {
46  size_t len = std::min(data.size() - offset, chunk_limit);
47  push_bytes_internal(data.subspan(offset, len));
48  offset += len;
49  }
50 }
51 
52 void LineFramer::push_bytes_internal(memory::ConstByteSpan data) {
53  if (data.empty()) return;
54 
55  // Fast Path: If buffer is empty, process data directly (zero-copy)
56  if (buffer_.empty()) {
57  size_t processed_count = scan_and_process(data, 0);
58 
59  // If we haven't processed everything, append the remainder to the buffer
60  if (processed_count < data.size()) {
61  buffer_.insert(buffer_.end(), data.begin() + processed_count, data.end());
62  scanned_idx_ = buffer_.size(); // We scanned all of it
63 
64  // DoS protection for partial message overflow
65  if (buffer_.size() > max_length_) {
66  buffer_.clear();
67  scanned_idx_ = 0;
68  }
69  } else {
70  // All processed, buffer remains empty
71  scanned_idx_ = 0;
72  }
73  return;
74  }
75 
76  // Slow Path: Append new data to buffer and process
77  buffer_.insert(buffer_.end(), data.begin(), data.end());
78 
79  // Determine where to start searching to avoid re-scanning
80  // We back up by delimiter length - 1 to catch split delimiters
81  size_t search_start_idx = scanned_idx_;
82  if (search_start_idx >= delimiter_.length()) {
83  search_start_idx -= (delimiter_.length() - 1);
84  } else {
85  search_start_idx = 0;
86  }
87 
88  size_t processed_count = scan_and_process(memory::ConstByteSpan(buffer_), search_start_idx);
89 
90  // Batch erase all processed data to ensure O(N) erase complexity
91  if (processed_count > 0) {
92  buffer_.erase(buffer_.begin(), buffer_.begin() + static_cast<std::ptrdiff_t>(processed_count));
93  }
94 
95  // Update scanned_idx_ for the next call.
96  // The buffer size is now reduced. We have scanned everything that remains.
97  scanned_idx_ = buffer_.size();
98 
99  // Final check: if the *remaining* partial message in the buffer already exceeds max_length_,
100  // we must reset to prevent unbound growth (DoS protection).
101  if (buffer_.size() > max_length_) {
102  buffer_.clear();
103  scanned_idx_ = 0;
104  }
105 }
106 
107 size_t LineFramer::scan_and_process(memory::ConstByteSpan data, size_t search_start_offset) {
108  // Safety clamp
109  if (search_start_offset > data.size()) {
110  search_start_offset = data.size();
111  }
112 
113  // processed_count tracks the number of bytes from the start of the buffer
114  // that have been either emitted as messages or skipped due to overflow.
115  size_t processed_count = 0;
116 
117  // Search cursor
118  size_t search_cursor = search_start_offset;
119 
120  // O(N) scan loop
121  while (true) {
122  if (search_cursor > data.size()) break;
123 
124  // Perform search using iterators derived from current buffer state
125  auto search_begin = data.begin() + static_cast<std::ptrdiff_t>(search_cursor);
126  decltype(data.begin()) it;
127 
128  if (delimiter_.size() == 1) {
129  // Optimization: Use std::memchr for single-byte delimiter
130  const void* found = std::memchr(search_begin, static_cast<uint8_t>(delimiter_[0]),
131  static_cast<size_t>(std::distance(search_begin, data.end())));
132  if (found) {
133  it = static_cast<const uint8_t*>(found);
134  } else {
135  it = data.end();
136  }
137  } else {
138  it = std::search(search_begin, data.end(), delimiter_.begin(), delimiter_.end());
139  }
140 
141  if (it == data.end()) {
142  break;
143  }
144 
145  // Found a delimiter
146  size_t match_start_idx = static_cast<size_t>(std::distance(data.begin(), it));
147  size_t match_end_idx = match_start_idx + delimiter_.length();
148 
149  // Calculate message length (from end of previous processed data to end of current delimiter)
150  size_t current_msg_total_len = match_end_idx - processed_count;
151 
152  // Check strict max_length constraint
153  if (current_msg_total_len > max_length_) {
154  // Message exceeds limit. Skip it.
155  } else {
156  // Valid message
157  if (on_message_) {
158  size_t payload_len = include_delimiter_ ? current_msg_total_len : (current_msg_total_len - delimiter_.length());
159  on_message_(memory::ConstByteSpan(data.data() + processed_count, payload_len));
160  }
161  }
162 
163  // Mark these bytes as processed
164  processed_count = match_end_idx;
165 
166  // Advance search cursor to start strictly after the current delimiter
167  search_cursor = processed_count;
168  }
169 
170  return processed_count;
171 }
172 
173 void LineFramer::set_on_message(MessageCallback cb) { on_message_ = std::move(cb); }
174 
176  buffer_.clear();
177  scanned_idx_ = 0;
178 }
179 
180 } // namespace framer
181 } // namespace unilink