22 #include <condition_variable>
25 #include <shared_mutex>
31 namespace concurrency {
40 template <
typename StateType>
69 void wait_for_state(
const State& expected_state, std::chrono::milliseconds timeout = std::chrono::milliseconds(1000));
77 mutable std::shared_mutex state_mutex_;
79 std::atomic<bool> state_changed_{
false};
85 std::vector<CallbackInfo> callbacks_;
87 mutable std::mutex callbacks_mutex_;
89 std::condition_variable_any state_cv_;
91 void notify_callbacks(
const State& new_state);
100 template <
typename StateType>
108 void set(const
State& new_state) noexcept;
109 void set(
State&& new_state) noexcept;
117 std::atomic<
State> state_;
127 int64_t
get()
const noexcept;
128 int64_t increment() noexcept;
129 int64_t decrement() noexcept;
130 int64_t add(int64_t value) noexcept;
131 int64_t subtract(int64_t value) noexcept;
134 int64_t
exchange(int64_t new_value) noexcept;
136 void reset() noexcept;
139 std::atomic<int64_t> value_;
149 bool get()
const noexcept;
150 void set(
bool value =
true) noexcept;
151 void clear() noexcept;
153 bool test_and_set() noexcept;
156 void wait_for_true(std::chrono::milliseconds timeout = std::chrono::milliseconds(1000))
const;
157 void wait_for_false(std::chrono::milliseconds timeout = std::chrono::milliseconds(1000))
const;
160 std::atomic<bool> flag_;
161 mutable std::condition_variable cv_;
162 mutable std::mutex cv_mutex_;
170 template <
typename StateType>
173 template <
typename StateType>
175 std::shared_lock<std::shared_mutex> lock(state_mutex_);
179 template <
typename StateType>
182 std::unique_lock<std::shared_mutex> lock(state_mutex_);
184 state_changed_.store(
true);
186 notify_callbacks(new_state);
187 state_cv_.notify_all();
190 template <
typename StateType>
193 std::unique_lock<std::shared_mutex> lock(state_mutex_);
194 state_ = std::move(new_state);
195 state_changed_.store(
true);
197 notify_callbacks(state_);
198 state_cv_.notify_all();
201 template <
typename StateType>
203 std::unique_lock<std::shared_mutex> lock(state_mutex_);
204 if (state_ == expected) {
206 state_changed_.store(
true);
208 notify_callbacks(desired);
209 state_cv_.notify_all();
215 template <
typename StateType>
219 std::unique_lock<std::shared_mutex> lock(state_mutex_);
222 state_changed_.store(
true);
224 notify_callbacks(new_state);
225 state_cv_.notify_all();
229 template <
typename StateType>
232 std::lock_guard<std::mutex> lock(callbacks_mutex_);
234 callbacks_.push_back({handle, std::move(callback)});
238 template <
typename StateType>
240 std::lock_guard<std::mutex> lock(callbacks_mutex_);
241 callbacks_.erase(std::remove_if(callbacks_.begin(), callbacks_.end(),
242 [handle](
const CallbackInfo& info) { return info.handle == handle; }),
246 template <
typename StateType>
248 std::lock_guard<std::mutex> lock(callbacks_mutex_);
252 template <
typename StateType>
254 std::unique_lock<std::shared_mutex> lock(state_mutex_);
255 state_cv_.wait_for(lock, timeout, [
this, &expected_state] {
return state_ == expected_state; });
258 template <
typename StateType>
260 std::unique_lock<std::shared_mutex> lock(state_mutex_);
261 state_cv_.wait_for(lock, timeout, [
this] {
return state_changed_.load(); });
262 state_changed_.store(
false);
265 template <
typename StateType>
267 std::shared_lock<std::shared_mutex> lock(state_mutex_);
268 return state_ == expected_state;
271 template <
typename StateType>
273 state_cv_.notify_all();
276 template <
typename StateType>
278 std::lock_guard<std::mutex> lock(callbacks_mutex_);
279 for (
const auto& info : callbacks_) {
281 info.callback(new_state);
289 template <
typename StateType>
292 template <
typename StateType>
294 return state_.load();
297 template <
typename StateType>
299 state_.store(new_state);
302 template <
typename StateType>
304 state_.store(new_state);
307 template <
typename StateType>
309 State expected_copy = expected;
310 return state_.compare_exchange_strong(expected_copy, desired);
313 template <
typename StateType>
315 return state_.exchange(new_state);
318 template <
typename StateType>
320 return state_.load() == expected_state;
337 return value_.compare_exchange_strong(expected, desired);
361 return flag_.compare_exchange_strong(expected, desired);
365 std::unique_lock<std::mutex> lock(cv_mutex_);
366 cv_.wait_for(lock, timeout, [
this] {
return flag_.load(); });
370 std::unique_lock<std::mutex> lock(cv_mutex_);
371 cv_.wait_for(lock, timeout, [
this] {
return !flag_.load(); });
Thread-safe atomic state wrapper.
State get() const noexcept
AtomicState(const State &initial_state=State{})
bool compare_and_set(const State &expected, const State &desired) noexcept
State exchange(const State &new_state) noexcept
bool is_state(const State &expected_state) const noexcept
void set(const State &new_state) noexcept
Thread-safe counter with atomic operations.
ThreadSafeCounter(int64_t initial_value=0)
int64_t subtract(int64_t value) noexcept
bool compare_and_set(int64_t expected, int64_t desired) noexcept
int64_t increment() noexcept
int64_t add(int64_t value) noexcept
int64_t exchange(int64_t new_value) noexcept
int64_t get() const noexcept
int64_t decrement() noexcept
Thread-safe flag with atomic operations.
bool test_and_set() noexcept
void wait_for_true(std::chrono::milliseconds timeout=std::chrono::milliseconds(1000)) const
void set(bool value=true) noexcept
void wait_for_false(std::chrono::milliseconds timeout=std::chrono::milliseconds(1000)) const
bool get() const noexcept
ThreadSafeFlag(bool initial_value=false)
bool compare_and_set(bool expected, bool desired) noexcept
Thread-safe state management class.
bool is_state(const State &expected_state) const
ThreadSafeState(const State &initial_state=State{})
void set_state(const State &new_state)
void wait_for_state_change(std::chrono::milliseconds timeout=std::chrono::milliseconds(1000))
void set_state(State &&new_state)
void wait_for_state(const State &expected_state, std::chrono::milliseconds timeout=std::chrono::milliseconds(1000))
void remove_state_change_callback(StateCallbackHandle handle)
size_t StateCallbackHandle
bool compare_and_set(const State &expected, const State &desired)
ThreadSafeState(ThreadSafeState &&)=delete
std::function< void(const State &)> StateCallback
void clear_state_change_callbacks()
State exchange(const State &new_state)
ThreadSafeState & operator=(ThreadSafeState &&)=delete
void notify_state_change()
ThreadSafeState & operator=(const ThreadSafeState &)=delete
StateCallbackHandle add_state_change_callback(StateCallback callback)
ThreadSafeState(const ThreadSafeState &)=delete