Custom BlockingQueue implementation in java

Producer Consumer using custom BlockingQueue implementation in java


Implementing a custom blocking queue is a very popular interview question.

What is Blocking Queue?

BlockingQueue is a queue data structure that blocks all threads trying to read (consumers) data from the queue while the queue is empty and, similarly, blocks all threads trying to add (producers) data to the queue while the queue is full. Because of this blocking behavior, the queue is known as a BlockingQueue.

Threads that are blocked while adding data because the queue is full get unblocked when some other thread removes data from the queue, making space for new data.

Threads that are blocked while reading data because the queue is empty get unblocked when some other thread adds data to the queue, so that there is data to read.

Note: BlockingQueue doesn't accept null values. If we try to add null, it throws NullPointerException.
Custom BlockingQueue implementation in java
Custom BlockingQueue implementation in java

I would recommend going through the articles below for a better understanding of threads and synchronization in Java.


Implement Custom BlockingQueue in Java


 
package javabypatel;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A bounded FIFO queue backed by a fixed-size circular array and guarded by a
 * single {@link ReentrantLock} with two conditions.
 *
 * <p>{@link #put(Object)} blocks while the queue is full and {@link #take()}
 * blocks while it is empty. Neither method declares
 * {@link InterruptedException}: both wait uninterruptibly and leave the
 * calling thread's interrupt status set so the caller can still observe it.
 */
public class CustomBlockingQueueUsingLock {
    private final Lock lock = new ReentrantLock();
    /** Signalled when a slot becomes free; producers wait on it. */
    private final Condition putCondition = lock.newCondition();
    /** Signalled when an element becomes available; consumers wait on it. */
    private final Condition takeCondition = lock.newCondition();

    /** Circular buffer holding the queued elements. */
    private final Object[] queue;
    private final int queueSize;

    /** Index of the slot the next {@code put} writes to. */
    private int putIndex;
    /** Index of the slot the next {@code take} reads from. */
    private int takeIndex;
    /** Number of elements currently stored. */
    private int count;

    /**
     * Creates a queue that holds at most {@code queueSize} elements.
     *
     * @param queueSize maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code queueSize} is zero or negative
     */
    public CustomBlockingQueueUsingLock(int queueSize) {
        // A zero capacity would make every put() block forever.
        if (queueSize <= 0) {
            throw new IllegalArgumentException("queueSize must be positive: " + queueSize);
        }
        this.queueSize = queueSize;
        queue = new Object[queueSize];
    }

    /**
     * Appends {@code data} to the tail of the queue, waiting while the queue is full.
     *
     * <p>The wait is not interruptible; if the thread is interrupted, its
     * interrupt status is still set when this method returns.
     *
     * @param data the element to enqueue
     */
    public void put(Object data) {
        lock.lock();
        try {
            while (count >= queueSize) {
                // Keeps waiting across interrupts but preserves the interrupt
                // flag, instead of swallowing the InterruptedException.
                putCondition.awaitUninterruptibly();
            }
            System.out.println("Queuing value :" + data);
            queue[putIndex] = data;
            count++;

            // Wrap around to the start of the circular buffer.
            if (++putIndex >= queueSize) {
                putIndex = 0;
            }
            takeCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while
     * the queue is empty.
     *
     * <p>The wait is not interruptible; if the thread is interrupted, its
     * interrupt status is still set when this method returns.
     *
     * @return the oldest element in the queue
     */
    public Object take() {
        lock.lock();
        try {
            while (count == 0) {
                takeCondition.awaitUninterruptibly();
            }
            Object data = queue[takeIndex];
            // Drop the reference so the consumed element can be garbage-collected.
            queue[takeIndex] = null;
            count--;

            // Wrap around to the start of the circular buffer.
            if (++takeIndex >= queueSize) {
                takeIndex = 0;
            }
            putCondition.signalAll();
            return data;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Demo: one consumer thread takes ten values while one producer thread
     * puts the integers 0 through 9 into a queue of capacity five.
     */
    public static void main(String[] args) {
        CustomBlockingQueueUsingLock customBlockingQueue = new CustomBlockingQueueUsingLock(5);

        new Thread(() -> {
            int i = 0;
            while (i < 10) {
                System.out.println("data :" + customBlockingQueue.take());
                i++;
            }
        }, "Consumer Thread").start();

        new Thread(() -> {
            int i = 0;
            while (i < 10) {
                customBlockingQueue.put(i);
                i++;
            }
        }, "Producer Thread").start();
    }
}
 

Implement Custom Generic blocking queue using Linked list data structure and Locks in Java


package javabypatel;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A bounded, generic FIFO queue backed by a {@link LinkedList} and guarded by
 * a single {@link ReentrantLock} with two conditions.
 *
 * <p>{@link #put(Object)} blocks while the queue holds {@code size} elements
 * and {@link #take()} blocks while it is empty.
 *
 * @param <T> type of the queued elements
 */
public class GenericCustomBlockingQueueUsingLock<T> {
    private final Queue<T> queue = new LinkedList<T>();

    private final Lock lock = new ReentrantLock();
    /** Signalled when space becomes available; producers wait on it. */
    private final Condition putCondition = lock.newCondition();
    /** Signalled when an element becomes available; consumers wait on it. */
    private final Condition takeCondition = lock.newCondition();

    /** Maximum number of elements the queue may hold. */
    private final int size;

    /**
     * Creates a queue that holds at most {@code size} elements.
     *
     * @param size maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code size} is zero or negative
     */
    public GenericCustomBlockingQueueUsingLock(int size) {
        // Zero would make every put() block forever; a negative bound would
        // never be reached, leaving the queue effectively unbounded.
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        this.size = size;
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * @return the oldest element in the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                takeCondition.await();
            }
            T data = queue.poll();

            // The queue may have been full before this removal, so a producer
            // may be waiting; tell one of them that there is space now.
            putCondition.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends {@code obj} to the tail of the queue, waiting while the queue is full.
     *
     * @param obj the element to enqueue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void put(T obj) throws InterruptedException {
        lock.lock();
        try {
            // ">=" rather than "==" so the bound holds defensively.
            while (queue.size() >= size) {
                putCondition.await();
            }
            System.out.println("Putting data :" + obj);
            queue.add(obj);

            // The queue may have been empty before this insertion, so a
            // consumer may be waiting; tell one of them an element is available.
            takeCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Demo: one consumer thread takes twenty values while one producer thread
     * puts the integers 0 through 19 into a queue of capacity ten.
     */
    public static void main(String[] args) {
        GenericCustomBlockingQueueUsingLock<Integer> queue = new GenericCustomBlockingQueueUsingLock<>(10);
        new Thread(() -> {
            int i = 0;
            while (i < 20) {
                try {
                    System.out.println(queue.take());
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of looping on after an interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
                i++;
            }
        }).start();

        new Thread(() -> {
            int i = 0;
            while (i < 20) {
                try {
                    queue.put(i);
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of looping on after an interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
                i++;
            }
        }).start();
    }
}

Post a Comment