unilink  0.4.3
A simple C++ library for unified async communication
io_context_manager.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2025 Jinwoo Sung
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <mutex>
22 #include <thread>
23 
25 
26 namespace unilink {
27 namespace concurrency {
28 
30  bool owns_context_{true};
31  std::shared_ptr<IoContext> ioc_;
32  std::unique_ptr<WorkGuard> work_guard_;
33  std::thread io_thread_;
34  std::atomic<bool> running_{false};
35  mutable std::mutex mutex_;
36  std::condition_variable cv_;
37  bool stopping_{false};
38 
40 
41  explicit Impl(std::shared_ptr<IoContext> external_context) : owns_context_(false), ioc_(std::move(external_context)) {
43  }
44 
45  explicit Impl(IoContext& external_context)
46  : owns_context_(false), ioc_(std::shared_ptr<IoContext>(&external_context, [](IoContext*) {})) {
48  }
49 
50  ~Impl() {
51  try {
52  stop();
53  if (io_thread_.joinable() && io_thread_.get_id() != std::this_thread::get_id()) {
54  io_thread_.join();
55  }
56  } catch (...) {
57  }
58  }
59 
60  void stop() {
61  std::thread worker;
62  {
63  std::lock_guard<std::mutex> lock(mutex_);
64  if (!owns_context_ && ioc_) return;
65  if (!running_.load() && !io_thread_.joinable()) return;
66 
67  stopping_ = true;
68  if (work_guard_) work_guard_.reset();
69  if (ioc_ && owns_context_) ioc_->stop();
70 
71  if (io_thread_.joinable()) {
72  if (io_thread_.get_id() != std::this_thread::get_id()) {
73  worker = std::move(io_thread_);
74  } else {
75  UNILINK_LOG_ERROR("io_context_manager", "stop", "Cannot join IoContext thread from within itself.");
76  stopping_ = false;
77  cv_.notify_all();
78  return;
79  }
80  }
81  }
82 
83  if (worker.joinable()) {
84  try {
85  worker.join();
86  } catch (...) {
87  }
88  }
89 
90  {
91  std::lock_guard<std::mutex> lock(mutex_);
92  stopping_ = false;
93  if (!worker.joinable() && !io_thread_.joinable()) {
94  running_.store(false);
95  }
96  }
97  cv_.notify_all();
98  }
99 };
100 
101 IoContextManager::IoContextManager() : impl_(std::make_unique<Impl>()) {}
102 
103 IoContextManager::IoContextManager(std::shared_ptr<IoContext> external_context)
104  : impl_(std::make_unique<Impl>(std::move(external_context))) {}
105 
106 IoContextManager::IoContextManager(IoContext& external_context) : impl_(std::make_unique<Impl>(external_context)) {}
107 
109 
112  return *instance;
113 }
114 
115 boost::asio::io_context& IoContextManager::get_context() {
116  std::lock_guard<std::mutex> lock(impl_->mutex_);
117  if (!impl_->ioc_) {
118  impl_->ioc_ = std::make_shared<IoContext>();
119  impl_->owns_context_ = true;
120  }
121  return *impl_->ioc_;
122 }
123 
125  std::shared_ptr<IoContext> context;
126  {
127  std::unique_lock<std::mutex> lock(impl_->mutex_);
128 
129  if (!impl_->owns_context_ && impl_->ioc_) {
130  if (impl_->ioc_->stopped()) {
131  UNILINK_LOG_WARNING("io_context_manager", "start", "External io_context is stopped.");
132  }
133  return;
134  }
135 
136  impl_->cv_.wait(lock, [this] { return !impl_->stopping_; });
137 
138  if (impl_->running_) return;
139 
140  if (impl_->io_thread_.joinable() && impl_->io_thread_.get_id() == std::this_thread::get_id()) {
141  UNILINK_LOG_ERROR("io_context_manager", "start", "Cannot restart from within its own thread.");
142  return;
143  }
144 
145  if (!impl_->ioc_) {
146  impl_->ioc_ = std::make_shared<IoContext>();
147  impl_->owns_context_ = true;
148  }
149 
150  if (impl_->ioc_->stopped()) {
151  impl_->ioc_->restart();
152  }
153  impl_->work_guard_ = std::make_unique<WorkGuard>(impl_->ioc_->get_executor());
154  context = impl_->ioc_;
155 
156  if (impl_->io_thread_.joinable()) {
157  impl_->io_thread_.join();
158  }
159 
160  impl_->io_thread_ = std::thread([this, context]() {
161  try {
162  context->run();
163  } catch (const std::exception& e) {
164  UNILINK_LOG_ERROR("io_context_manager", "run", "Thread error: " + std::string(e.what()));
165  } catch (...) {
166  }
167  impl_->running_.store(false);
168  });
169  impl_->running_.store(true);
170  }
171 }
172 
173 void IoContextManager::stop() { impl_->stop(); }
174 
175 bool IoContextManager::is_running() const { return get_impl()->running_.load(); }
176 
177 std::unique_ptr<boost::asio::io_context> IoContextManager::create_independent_context() {
178  return std::make_unique<IoContext>();
179 }
180 
181 IoContextManager::IoContextManager(IoContextManager&& other) noexcept = default;
183 
184 } // namespace concurrency
185 } // namespace unilink
#define UNILINK_LOG_WARNING(component, operation, message)
Definition: logger.hpp:272
#define UNILINK_LOG_ERROR(component, operation, message)
Definition: logger.hpp:279