Thread-safe queues in C++

In multithreaded programs, data synchronization is crucial to ensure thread safety and avoid race conditions. One common data structure used to manage concurrent operations is a queue. In this article, we will explore how to implement a thread-safe queue in C++.

Overview of Thread-Safe Queues

A thread-safe queue allows multiple threads to safely manipulate its elements without encountering inconsistencies or data corruption. The thread safety is achieved by using synchronization mechanisms, such as locks or atomic operations, to ensure that only one thread accesses the queue at a time.

Implementing a Thread-Safe Queue

Let’s start by creating a simple implementation of a thread-safe queue in C++. We will use a std::queue container and protect its operations using a std::mutex.

#include <iostream>
#include <queue>
#include <mutex>

template <typename T>
class ThreadSafeQueue {
public:
    void push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
    }

    bool try_pop(T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }

        item = queue_.front();
        queue_.pop();
        return true;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mutex_;
};

In the code snippet above, we define a class ThreadSafeQueue that wraps the usage of std::queue and provides thread-safe operations. The push function uses a lock guard to ensure mutual exclusion while pushing an item into the queue. Similarly, the try_pop function pops an item from the queue if it is not empty. Lastly, the empty function safely checks if the queue is empty.

Using the Thread-Safe Queue

Now let’s demonstrate how to use the ThreadSafeQueue class in a multithreaded environment.

#include <iostream>
#include <thread>

void producer(ThreadSafeQueue<int>& queue) {
    for (int i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(ThreadSafeQueue<int>& queue) {
    int item;
    while (!queue.empty()) {
        if (queue.try_pop(item)) {
            std::cout << "Consumed: " << item << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

int main() {
    ThreadSafeQueue<int> queue;

    std::thread producerThread(producer, std::ref(queue));
    std::thread consumerThread(consumer, std::ref(queue));

    producerThread.join();
    consumerThread.join();

    return 0;
}

In this example, we define two functions producer and consumer that operate on the thread-safe queue. The producer pushes integers from 0 to 9 into the queue at regular intervals. Meanwhile, the consumer tries to pop items from the queue and prints them. Both functions are executed in separate threads spawned in the main function.

Conclusion

Thread-safe queues are essential for managing concurrent access to data structures in multithreaded programs. By implementing a thread-safe queue, we can ensure that multiple threads can safely manipulate its elements without causing race conditions or data corruption. In this article, we have explored how to implement a thread-safe queue using a combination of std::queue and std::mutex in C++.