Find Running Median from a Stream of Integers

Find Running Median from a Stream of Integers in Java.


Integers are read one at a time from a data stream. After each read, find the median of the elements read so far in an efficient way.

Let's see a sample input and output for better understanding:
Stream of data: 5, 8, 1, 4, 2, 8
Running median after each element: 5, 6.5, 5, 4.5, 4, 4.5

Algorithm 


To calculate the median of a stream of integers, we use a MinHeap and a MaxHeap. Let's see why.

Even number of elements:
5, 8, 1, 4, 2, 8

To get the median of an even number of elements, we need the average of the middle two elements.
Step 1: To get the middle two elements, we need to sort the data: 1, 2, 4, 5, 8, 8
Step 2: Return the average of the middle two elements, (4+5)/2 = 4.5.

Odd number of elements:
5, 8, 1, 4, 2, 8, 5

To get the median of an odd number of elements, we need to return the middle element.
Step 1: To get the middle element, we need to sort the data: 1, 2, 4, 5, 5, 8, 8
Step 2: Return the middle element, 5.

Note: If you observe, we only need the middle two elements (or the single middle element) to find the median.
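The two steps above can be sketched as a simple baseline that sorts the data on every query. This is only an illustration of the sort-based approach; the class name NaiveRunningMedian and its methods are hypothetical and not part of the programs later in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Baseline sketch: re-sort everything read so far on every median query.
class NaiveRunningMedian {
    private final List<Integer> seen = new ArrayList<>();

    void add(int value) {
        seen.add(value);
    }

    // Step 1: sort a copy of the data. Step 2: pick the middle element(s).
    double getMedian() {
        if (seen.isEmpty()) {
            throw new IllegalStateException("No elements read yet");
        }
        List<Integer> sorted = new ArrayList<>(seen);
        Collections.sort(sorted);
        int n = sorted.size();
        if (n % 2 == 1) {
            return sorted.get(n / 2);                                  // odd: middle element
        }
        return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;      // even: average of middle two
    }

    public static void main(String[] args) {
        NaiveRunningMedian median = new NaiveRunningMedian();
        for (int value : new int[] {5, 8, 1, 4, 2, 8}) {
            median.add(value);
        }
        System.out.println(median.getMedian()); // 4.5
        median.add(5);
        System.out.println(median.getMedian()); // 5.0
    }
}
```

Every call to getMedian() sorts all n elements read so far, which costs O(n log n) per query.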

At each step we would have to sort the stream of data to get the median. Can we avoid sorting?
Yes. We don't actually need fully sorted data; we only need the highest element of the left (smaller) portion of the stream and the lowest element of the right (larger) portion of the stream.
Example: 1, 2, 4, 5, 8, 8

Left portion: 1 2 4
Right portion: 5 8 8

1    2    4    5    8    8
          ^    ^
4 is the highest element of the left portion (1 2 4).
5 is the lowest element of the right portion (5 8 8).

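A MinHeap and a MaxHeap are the perfect data structures for getting the minimum and maximum element of a set of data. The left portion goes into a MaxHeap (we need its highest element) and the right portion goes into a MinHeap (we need its lowest element).
MinHeap: Always keeps the minimum element at index 0 of the minHeap array.
MaxHeap: Always keeps the maximum element at index 0 of the maxHeap array.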
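As a quick illustration of the idea, here is a minimal sketch that loads the left portion (1 2 4) into a max-heap and the right portion (5 8 8) into a min-heap, using java.util.PriorityQueue instead of a hand-written heap. The class name HeapPeekDemo and the method middleTwo are hypothetical names chosen for this example.

```java
import java.util.Collections;
import java.util.PriorityQueue;

class HeapPeekDemo {
    // Returns {highest of left portion, lowest of right portion} for 1 2 4 | 5 8 8.
    static int[] middleTwo() {
        // Reversed ordering turns PriorityQueue into a max-heap.
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        // Natural ordering gives a min-heap.
        PriorityQueue<Integer> minHeap = new PriorityQueue<>();

        for (int value : new int[] {1, 2, 4}) {
            maxHeap.add(value);
        }
        for (int value : new int[] {5, 8, 8}) {
            minHeap.add(value);
        }
        // peek() reads the top of each heap without removing it.
        return new int[] {maxHeap.peek(), minHeap.peek()};
    }

    public static void main(String[] args) {
        int[] middle = middleTwo();
        System.out.println(middle[0] + " " + middle[1]);       // 4 5
        System.out.println((middle[0] + middle[1]) / 2.0);     // 4.5
    }
}
```

No sorting is needed here: the tops of the two heaps are exactly the two middle elements.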

Let's run an example to understand the solution using MinHeap and MaxHeap.
Stream of data: 5 8 1 4 2 8 5 5 ....

First element: 5
Where do we put this element? As it is the first element, put it in the MinHeap.
MinHeap = 5


Second element: 8
Where do we put this element? The MinHeap holds the larger portion of the data, with its minimum at the top, and it already contains some data. So we put a new element in the MinHeap only when it is greater than minHeap[0]; that way minHeap[0] remains the smallest element of the larger portion.

8 > minHeap[0]
(8 > 5) is true, so put the element in the MinHeap.

MinHeap: 5, 8 (the minimum element 5 is still at the top, so the MinHeap property is preserved)

Remember, if every new element is greater than minHeap[0], all elements pile up in the MinHeap and its top no longer helps us find the median. So we keep the data balanced between the MinHeap and the MaxHeap.

If the sizes of the MinHeap and the MaxHeap differ by more than 1, rebalance by moving the top element of the larger heap to the heap with fewer elements.

After transferring the data, we need to run the heapify process on both heaps to maintain the MinHeap and MaxHeap properties.

Note: If you encounter an element that is not greater than minHeap[0], put it in the MaxHeap and heapify the MaxHeap.
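To see the placement and rebalancing rules in action, here is a small trace sketch. It assumes java.util.PriorityQueue as the heap (so heapify happens inside add and poll), and the names BalanceTrace and trace are hypothetical, used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

class BalanceTrace {
    // Applies the rules to each value and records the top of each heap
    // after every insertion (a top of null means that heap is still empty).
    static List<String> trace(int[] stream) {
        PriorityQueue<Integer> minHeap = new PriorityQueue<>();
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        List<String> steps = new ArrayList<>();

        for (int value : stream) {
            // Placement: first element or anything greater than minHeap's top goes to minHeap.
            if (minHeap.isEmpty() || value > minHeap.peek()) {
                minHeap.add(value);
            } else {
                maxHeap.add(value);
            }
            // Rebalance: move the top of the larger heap when sizes differ by more than 1.
            if (minHeap.size() - maxHeap.size() > 1) {
                maxHeap.add(minHeap.poll());
            } else if (maxHeap.size() - minHeap.size() > 1) {
                minHeap.add(maxHeap.poll());
            }
            steps.add("add " + value + " -> maxHeap top=" + maxHeap.peek()
                    + ", minHeap top=" + minHeap.peek());
        }
        return steps;
    }

    public static void main(String[] args) {
        for (String step : trace(new int[] {5, 8, 1, 4})) {
            System.out.println(step);
        }
        // add 5 -> maxHeap top=null, minHeap top=5
        // add 8 -> maxHeap top=5, minHeap top=8
        // add 1 -> maxHeap top=5, minHeap top=8
        // add 4 -> maxHeap top=4, minHeap top=5
    }
}
```

After adding 8, the MinHeap briefly holds 5 and 8, so the rebalancing step moves 5 into the MaxHeap. After adding 4, the two tops are 4 and 5, which are exactly the middle two elements of 1, 4, 5, 8.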

Java Program to Find Running Median from a Stream of Integers using MinHeap and MaxHeap.

package javabypatel.misc;


public class RunningMedianMain {
    public static void main(String[] args) {
        int size = 10;
        RunningMedian runningMedian = new RunningMedian(size);

        runningMedian.add(5);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(8);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(1);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(4);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(2);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(8);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(5);
        System.out.println(runningMedian.getMedian());

        runningMedian.add(5);
        System.out.println(runningMedian.getMedian());
    }
}

class RunningMedian {
    private MinHeap minHeap;
    private MaxHeap maxHeap;

    public RunningMedian (int size) {
        minHeap = new MinHeap(size);
        maxHeap = new MaxHeap(size);
    }

    public Float getMedian() {
        if (maxHeap.getSize() > minHeap.getSize()) {
            return (float) maxHeap.peek();
        } else if (maxHeap.getSize() == minHeap.getSize()) {
            if (maxHeap.getSize() != 0) {
                return (minHeap.peek() + maxHeap.peek()) / 2.0f;
            } else {
                return null;
            }
        } else {
            return (float) minHeap.peek();
        }
    }

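    public void add(int data) {
        // This implementation seeds the MaxHeap (smaller portion) first and compares
        // new elements against its top; the balancing idea is the same as described above.
        if (maxHeap.getSize() == 0 && minHeap.getSize() == 0) {
            maxHeap.add(data);

        } else if (data < maxHeap.peek()) {
            // Smaller than the largest element of the smaller portion.
            maxHeap.add(data);

        } else {
            minHeap.add(data);
        }

        // Rebalance when the sizes differ by more than 1: move the top of the
        // larger heap into the smaller heap.
        if (Math.abs(minHeap.getSize() - maxHeap.getSize()) > 1) {
            if (maxHeap.getSize() > minHeap.getSize()) {
                int removedData = maxHeap.remove();
                minHeap.add(removedData);
            } else {
                int removedData = minHeap.remove();
                maxHeap.add(removedData);
            }
        }
    }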
}
MaxHeap
package javabypatel.misc;  

public class MaxHeap {
    private int[] heap;
    private int size;
    private int currentIndex=-1;

    public MaxHeap(int size) {
        this.heap = new int[size];
        this.size = size;
    }

    public int getSize() {
        return currentIndex+1;
    }

    public int remove() {
        if (currentIndex < 0) {
            throw new RuntimeException("Heap is empty");
        }
        int temp = heap[0];
        heap[0] = heap[currentIndex];
        currentIndex--;
        bubbleDown(0);
        return temp;
    }

    public int peek() {
        if (currentIndex < 0) {
            throw new RuntimeException("Heap is empty");
        }
        return heap[0];
    }

    public void add(int data) {
        if ((currentIndex + 1) >= size) {
            throw new RuntimeException("Heap full");
        }

        heap[++currentIndex] = data;
        bubbleUp(currentIndex);
    }

    private int getLeftChildIndex(int index) {
        return index * 2 + 1;
    }

    private int getRightChildIndex(int index) {
        return index * 2 + 2;
    }

    private int getParentIndex(int index) {
        return (index - 1) / 2;
    }

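    private boolean isParentPresent(int index) {
        // The root (index 0) has no parent. In Java, (0 - 1) / 2 evaluates to 0,
        // so testing the computed parent index against 0 would never detect the root.
        return index > 0;
    }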

    private void bubbleUp(int index) {
        if (!isParentPresent(index)) {
            return;
        }

        int parentIndex = getParentIndex(index);

        if (heap[index] > heap[parentIndex]) {
            swap(index, parentIndex);
            bubbleUp(parentIndex);
        }
    }

    private void bubbleDown(int index) {
        int left = getLeftChildIndex(index);
        int right = getRightChildIndex(index);

        // Index of the largest value among the node and its children.
        int maxIndex = index;
        if (left <= currentIndex && heap[left] > heap[maxIndex]) {
            maxIndex = left;
        }
        if (right <= currentIndex && heap[right] > heap[maxIndex]) {
            maxIndex = right;
        }

        if (index != maxIndex) {
            swap(index, maxIndex);
            bubbleDown(maxIndex);
        }
    }

    private void swap(int index1, int index2) {
        int temp = heap[index1];
        heap[index1] = heap[index2];
        heap[index2] = temp;
    }

}
MinHeap
package javabypatel.misc; 

public class MinHeap {
    private int[] heap;
    private int size;
    private int currentIndex=-1;

    public MinHeap(int size) {
        this.heap = new int[size];
        this.size = size;
    }

    public int getSize() {
        return currentIndex+1;
    }

    public int remove() {
        if (currentIndex < 0) {
            throw new RuntimeException("Heap is empty");
        }
        int temp = heap[0];
        heap[0] = heap[currentIndex];
        currentIndex--;
        bubbleDown(0);
        return temp;
    }

    public int peek() {
        if (currentIndex < 0) {
            throw new RuntimeException("Heap is empty");
        }
        return heap[0];
    }

    public void add(int data) {
        if ((currentIndex + 1) >= size) {
            throw new RuntimeException("Heap full");
        }

        heap[++currentIndex] = data;
        bubbleUp(currentIndex);
    }

    private int getLeftChildIndex(int index) {
        return index * 2 + 1;
    }

    private int getRightChildIndex(int index) {
        return index * 2 + 2;
    }

    private int getParentIndex(int index) {
        return (index - 1) / 2;
    }

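    private boolean isParentPresent(int index) {
        // The root (index 0) has no parent. In Java, (0 - 1) / 2 evaluates to 0,
        // so testing the computed parent index against 0 would never detect the root.
        return index > 0;
    }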

    private void bubbleUp(int index) {
        if (!isParentPresent(index)) {
            return;
        }

        int parentIndex = getParentIndex(index);

        if (heap[index] < heap[parentIndex]) {
            swap(index, parentIndex);
            bubbleUp(parentIndex);
        }
    }

    private void bubbleDown(int index) {
        int left = getLeftChildIndex(index);
        int right = getRightChildIndex(index);

        int minIndex = index;
        if (left <= currentIndex && heap[left] < heap[minIndex]) {
            minIndex = left;
        }
        if (right <= currentIndex && heap[right] < heap[minIndex]) {
            minIndex = right;
        }

        if (index != minIndex) {
            swap(index, minIndex);
            bubbleDown(minIndex);
        }
    }

    private void swap(int index1, int index2) {
        int temp = heap[index1];
        heap[index1] = heap[index2];
        heap[index2] = temp;
    }
}



Java Program to Find Running Median from a Stream of Integers using PriorityQueue.

package com.javabypatel.misc;

import java.util.PriorityQueue;

/*


Input: Stream of Data: 5 8 1 4 2 8 5 5 ....

5
-> Median 5

5, 8
-> Even number of elements, find average of middle two elements (5 + 8)/2 = 6.5

5, 8, 1
Sorted: (1, 5, 8)
-> Odd number of elements, return middle 5

5, 8, 1, 4
Sorted: (1, 4, 5, 8)
-> Even number of elements, find average of middle two elements (4 + 5)/2 = 4.5

5, 8, 1, 4, 2
Sorted: (1, 2, 4, 5, 8)
-> Odd number of elements, return middle 4

5, 8, 1, 4, 2, 8
Sorted: (1, 2, 4, 5, 8, 8)
-> Even number of elements, find average of middle two elements (4 + 5)/2 = 4.5

5, 8, 1, 4, 2, 8, 5
Sorted: (1, 2, 4, 5, 5, 8, 8)
-> Odd number of elements, return middle 5

5, 8, 1, 4, 2, 8, 5, 5
Sorted: (1, 2, 4, 5, 5, 5, 8, 8)
-> Even number of elements, find average of middle two elements (5 + 5)/2 = 5


 */
public class RunningMedianUsingPriorityQueue {

    private static PriorityQueue<Float> minHeap = new PriorityQueue<>();
    // Reverse the natural ordering so the largest element sits at the head of the queue.
    private static PriorityQueue<Float> maxHeap =
            new PriorityQueue<>((number1, number2) -> Float.compare(number2, number1));

    public static void main(String[] args) {
        addNumber(5);
        float median = getMedian();
        System.out.println("Median :" + median); //5.0

        addNumber(8);
        median = getMedian();
        System.out.println("Median :" + median); //6.5

        addNumber(1);
        median = getMedian();
        System.out.println("Median :" + median); //5.0

        addNumber(4);
        median = getMedian();
        System.out.println("Median :" + median); //4.5

        addNumber(2);
        median = getMedian();
        System.out.println("Median :" + median); //4.0

        addNumber(8);
        median = getMedian();
        System.out.println("Median :" + median); //4.5

        addNumber(5);
        median = getMedian();
        System.out.println("Median :" + median); //5.0

        addNumber(5);
        median = getMedian();
        System.out.println("Median :" + median); //5.0
    }

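    private static float getMedian() {
        // Guard: peek() returns null on an empty queue, which would fail on unboxing.
        if (minHeap.isEmpty() && maxHeap.isEmpty()) {
            throw new IllegalStateException("No numbers added yet");
        }
        if (minHeap.size() == maxHeap.size()) {
            // Even count: average the two middle elements.
            return (minHeap.peek() + maxHeap.peek()) / 2;

        } else if (minHeap.size() > maxHeap.size()) {
            // Odd count: the larger heap holds the middle element.
            return minHeap.peek();

        } else {
            return maxHeap.peek();
        }
    }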

    private static void addNumber(float number) {
        if (minHeap.size() == 0 || number > minHeap.peek()) {
            minHeap.add(number);
        } else {
            maxHeap.add(number);
        }

        if (Math.abs(minHeap.size() - maxHeap.size()) > 1) {
            if (minHeap.size() > maxHeap.size()) {
                maxHeap.add(minHeap.poll());
            } else {
                minHeap.add(maxHeap.poll());
            }
        }
    }

}

Output:
Median :5.0
Median :6.5
Median :5.0
Median :4.5
Median :4.0
Median :4.5
Median :5.0
Median :5.0
