# Concurrency and Parallel Programming

## Atomics and Memory Ordering

```cpp
#include <atomic>
#include <thread>

// Basic atomics
std::atomic<int> counter{0};
std::atomic<bool> flag{false};

// Memory ordering
void producer(std::atomic<int>& data, std::atomic<bool>& ready) {
    data.store(42, std::memory_order_relaxed);
    ready.store(true, std::memory_order_release);  // Release barrier
}

void consumer(std::atomic<int>& data, std::atomic<bool>& ready) {
    while (!ready.load(std::memory_order_acquire)) {  // Acquire barrier
        std::this_thread::yield();
    }
    int value = data.load(std::memory_order_relaxed);
}

// Compare-and-swap
bool try_acquire_lock(std::atomic<bool>& lock) {
    bool expected = false;
    return lock.compare_exchange_strong(expected, true,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed);
}

// Fetch-and-add
int increment_counter(std::atomic<int>& counter) {
    return counter.fetch_add(1, std::memory_order_relaxed);
}
```

## Lock-Free Data Structures

```cpp
#include <array>
#include <atomic>

// Lock-free stack
template <typename T>
class LockFreeStack {
    struct Node {
        T data;
        Node* next;
        Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> head_{nullptr};

public:
    void push(const T& value) {
        Node* new_node = new Node(value);
        new_node->next = head_.load(std::memory_order_relaxed);
        while (!head_.compare_exchange_weak(new_node->next, new_node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
            // Retry with updated head
        }
    }

    bool pop(T& result) {
        Node* old_head = head_.load(std::memory_order_relaxed);
        while (old_head &&
               !head_.compare_exchange_weak(old_head, old_head->next,
                                            std::memory_order_acquire,
                                            std::memory_order_relaxed)) {
            // Retry
        }
        if (old_head) {
            result = old_head->data;
            delete old_head;  // Note: ABA problem exists
            return true;
        }
        return false;
    }
};

// Lock-free queue (single producer, single consumer)
template <typename T, size_t Size>
class SPSCQueue {
    std::array<T, Size> buffer_;
    alignas(64) std::atomic<size_t> head_{0};
    alignas(64) std::atomic<size_t> tail_{0};

public:
    bool push(const T& item) {
        size_t head = head_.load(std::memory_order_relaxed);
        size_t next_head = (head + 1) %
Size;
        if (next_head == tail_.load(std::memory_order_acquire)) {
            return false;  // Queue full
        }
        buffer_[head] = item;
        head_.store(next_head, std::memory_order_release);
        return true;
    }

    bool pop(T& item) {
        size_t tail = tail_.load(std::memory_order_relaxed);
        if (tail == head_.load(std::memory_order_acquire)) {
            return false;  // Queue empty
        }
        item = buffer_[tail];
        tail_.store((tail + 1) % Size, std::memory_order_release);
        return true;
    }
};
```

## Thread Pool

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_ = false;

public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] {
                            return stop_ || !tasks_.empty();
                        });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& worker : workers_) {
            worker.join();
        }
    }

    template <typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> result = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return result;
    }
};
```

## Parallel STL Algorithms

```cpp
#include <algorithm>
#include <execution>
#include <numeric>
#include <vector>

void parallel_algorithms_demo() {
    std::vector<int> vec(1'000'000);
    std::iota(vec.begin(), vec.end(), 0);

    // Parallel sort
    std::sort(std::execution::par, vec.begin(), vec.end());

    // Parallel for_each
    std::for_each(std::execution::par_unseq, vec.begin(), vec.end(),
                  [](int& x) { x *= 2; });

    // Parallel transform
    std::vector<int> result(vec.size());
    std::transform(std::execution::par, vec.begin(), vec.end(),
                   result.begin(), [](int x) { return x * x; });

    // Parallel reduce
    int sum = std::reduce(std::execution::par, vec.begin(), vec.end());

    // Parallel transform_reduce (map-reduce)
    int sum_of_squares = std::transform_reduce(
        std::execution::par, vec.begin(), vec.end(),
        0, std::plus<>(), [](int x) { return x * x; }
    );
}
```

## Synchronization Primitives

```cpp
#include <condition_variable>
#include <mutex>
#include <shared_mutex>

// Mutex types
std::mutex mtx;
std::recursive_mutex rec_mtx;
std::timed_mutex timed_mtx;
std::shared_mutex shared_mtx;

// RAII locks
void exclusive_access() {
    std::lock_guard<std::mutex> lock(mtx);
    // Critical section
}

void unique_lock_example() {
    std::unique_lock<std::mutex> lock(mtx);
    // Can unlock and relock
    lock.unlock();
    // Do some work
    lock.lock();
}

// Reader-writer lock
class SharedData {
    mutable std::shared_mutex mutex_;
    std::string data_;

public:
    std::string read() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        return data_;
    }

    void write(std::string new_data) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        data_ = std::move(new_data);
    }
};

// Condition variable
class Queue {
    std::queue<int> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;

public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        cv_.notify_one();
    }

    int pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return
!queue_.empty(); });
        int value = queue_.front();
        queue_.pop();
        return value;
    }
};

// std::scoped_lock - multiple mutexes
std::mutex mtx1, mtx2;

void transfer(Account& from, Account& to, int amount) {
    std::scoped_lock lock(from.mutex, to.mutex);  // Deadlock-free
    from.balance -= amount;
    to.balance += amount;
}
```

## Async and Futures

```cpp
#include <future>

// std::async
auto future = std::async(std::launch::async, []() {
    return expensive_computation();
});

// Get result (blocks until ready)
auto result = future.get();

// Promise and future
void producer(std::promise<int> promise) {
    int value = compute_value();
    promise.set_value(value);
}

void consumer(std::future<int> future) {
    int value = future.get();
}

std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread producer_thread(producer, std::move(promise));
std::thread consumer_thread(consumer, std::move(future));

// Packaged task
std::packaged_task<int(int, int)> task([](int a, int b) { return a + b; });
std::future<int> task_future = task.get_future();
std::thread task_thread(std::move(task), 5, 3);
int sum = task_future.get();  // 8
task_thread.join();
```

## Coroutine-Based Concurrency

```cpp
#include <coroutine>
#include <optional>

// Async task coroutine
template <typename T>
struct AsyncTask {
    struct promise_type {
        std::optional<T> value;
        std::exception_ptr exception;

        AsyncTask get_return_object() {
            return AsyncTask{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_value(T v) { value = std::move(v); }
        void unhandled_exception() { exception = std::current_exception(); }
    };

    std::coroutine_handle<promise_type> handle;

    AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
    ~AsyncTask() { if (handle) handle.destroy(); }

    T get() {
        if (!handle.done()) {
            handle.resume();
        }
        if (handle.promise().exception) {
            std::rethrow_exception(handle.promise().exception);
        }
        return *handle.promise().value;
    }
};

// Usage
AsyncTask<int> async_compute() {
    co_return 42;
}
```

## Quick Reference
| Primitive | Use Case | Performance |
|-----------|----------|-------------|
| std::atomic | Simple shared state | Lock-free |
| std::mutex | Exclusive access | Kernel call |
| std::shared_mutex | Read-heavy workload | Better than mutex |
| Lock-free structures | High contention | Best throughput |
| Thread pool | Task parallelism | Avoid thread overhead |
| Parallel STL | Data parallelism | Automatic scaling |
| std::async | Simple async tasks | Thread pool |
| Coroutines | Async I/O | Minimal overhead |

## Memory Ordering Guide

| Ordering | Guarantees | Use Case |
|----------|-----------|----------|
| relaxed | No synchronization | Counters |
| acquire | Load barrier | Consumer |
| release | Store barrier | Producer |
| acq_rel | Both | RMW operations |
| seq_cst | Total order | Default |