Thursday, 19 September 2013

Inter-thread communication using wait notify scenario

Question -  There are three threads,t1, t2 and t3, each of these thread have an array of size three. thread t1 has array {1,4,7}, thread t2 has array {2,5,8}, thread t3 has array {3,6,9}.

How to design the thread execution so that the output of execution will be 1,2,3,4,5,6,7,8,9 

Solution - It is an scenario of multi-thread communication being handled using wait & notify methods.
The code is as mentioned below - 


public class InterThreadCommunication {
    
    static class SampleRunnable implements Runnable
    {
        final int[] arrayToPrint;
        final Object waitMonitor;
        final Object notifyMonitor;
        
        public SampleRunnable(int[] arrayToPrint, Object waitMonitor, Object notifyMonitor) {
            this.waitMonitor = waitMonitor;
            this.notifyMonitor = notifyMonitor;
            this.arrayToPrint = arrayToPrint;
        }
        
        @Override
        public  void  run() {
            
            try {
                setCurrentMonitorToWait();
                
                System.out.println(arrayToPrint[0]);
                notifyNextMonitor();
                
                setCurrentMonitorToWait();
                
                System.out.println(arrayToPrint[1]);
                notifyNextMonitor();
                
                setCurrentMonitorToWait();
                
                System.out.println(arrayToPrint[2]);
                notifyNextMonitor();
                
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void setCurrentMonitorToWait() throws InterruptedException {
            synchronized (waitMonitor) {
                this.waitMonitor.wait();    
            }
        }

        private void notifyNextMonitor() {
            synchronized (notifyMonitor) {
                this.notifyMonitor.notify();    
            }
        }
    }
    
    
    public static void main(String[] args) {
        
        Object firstMonitor = new Object();
        Object secondMonitor = new Object();
        Object thirdMonitor = new Object();
        
        int[] a1 = new int[] {1,4,7};
        int[] a2 = new int[]{2,5,8};
        int[] a3 = new int[]{3,6,9};
        
        Thread t1 = new Thread(new InterThreadCommunication.SampleRunnable(a1,firstMonitor,secondMonitor));
        Thread t2 = new Thread(new InterThreadCommunication.SampleRunnable(a2, secondMonitor, thirdMonitor));
        Thread t3 = new Thread(new InterThreadCommunication.SampleRunnable(a3, thirdMonitor, firstMonitor));
        
        t1.start();
        t2.start();
        t3.start();
        
        synchronized (firstMonitor) {
            firstMonitor.notify();
        }
    }

}

While testing in quad core processor machine, we found it working fine. But this solution might get into dead lock situation if first signal from main to first thread gets missed.

We could mitigate this problem, by using semaphore. The solution code is as mentioned below -

import java.util.concurrent.Semaphore;

public class InterThreadCommunication {
    
    static class SampleRunnable implements Runnable
    {
        final int[] arrayToPrint;
        final Semaphore acquireSemaphore;
        final Semaphore releaseSemaphore;
        
        public SampleRunnable(int[] arrayToPrint, Semaphore acquireSemaphore, Semaphore releaseSemaphore) {
            this.acquireSemaphore = acquireSemaphore;
            this.releaseSemaphore = releaseSemaphore;
            this.arrayToPrint = arrayToPrint;
        }
        
        @Override
        public  void  run()
        {
      try {
    
              this.acquireSemaphore.acquire();
              System.out.println(arrayToPrint[0]);
     
              this.releaseSemaphore.release();
              this.acquireSemaphore.acquire();
              System.out.println(arrayToPrint[1]);
              
              this.releaseSemaphore.release();
              this.acquireSemaphore.acquire();
              System.out.println(arrayToPrint[2]);
              this.releaseSemaphore.release();
                 
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
        }
    }
    
    
    public static void main(String[] args) {
        
        Semaphore firstSemaphore = new Semaphore(0);
        Semaphore secondSemaphore = new Semaphore(0);
        Semaphore thirdSemaphore = new Semaphore(0);
        
        int[] a1 = new int[] {1,4,7};
        int[] a2 = new int[]{2,5,8};
        int[] a3 = new int[]{3,6,9};
        
        Thread t1 = new Thread(new InterThreadCommunication.SampleRunnable(a1,firstSemaphore,secondSemaphore));
        Thread t2 = new Thread(new InterThreadCommunication.SampleRunnable(a2, secondSemaphore, thirdSemaphore));
        Thread t3 = new Thread(new InterThreadCommunication.SampleRunnable(a3, thirdSemaphore, firstSemaphore));
        
        t1.start();
        t2.start();
        t3.start();
        
        firstSemaphore.release();
    }

}

As in this problem, at a moment of time, only one thread should be in action to give the result in a specific sequence. We can implement the solution using Exchanger as well. The implementation is as mentioned below -


import java.util.concurrent.Exchanger;

public class PipeLineOperation extends Thread {
    
  private int threadNumber;
  private Exchanger firstExchanger;
  private Exchanger secondExchanger;
  private int[] firstArray;
  private int[] secondArray;
  private int[] currentArray;
  
  public PipeLineOperation(int threadNumber, Exchanger firstExchanger, Exchanger secondExchanger, int[] threadsArray) {
   this.firstExchanger = firstExchanger;
   this.secondExchanger= secondExchanger;
   this.threadNumber = threadNumber;
   this.currentArray = threadsArray;
  }
 
  @Override
  public  void  run()
  {
   try
    {
      performOperationInthread();
    }
    catch(InterruptedException interruptedException)
    {
     throw new RuntimeException(interruptedException);
    }
  }

  private void performOperationInthread() throws InterruptedException {
   if(threadNumber == 0)
   {
     firstArray = currentArray;
     secondArray = firstExchanger.exchange(currentArray);
     currentArray = secondExchanger.exchange(currentArray);
          
          for (int i = 0; i < 3; i++) {
     System.out.println(firstArray[i]);
     System.out.println(secondArray[i]);
     System.out.println(currentArray[i]);
    }
   }
   else
   {
     currentArray = firstExchanger.exchange(currentArray);
   }
  }
    
    public static void main(String[] args) {
     Exchanger firstExchanger= new Exchanger<>();
     Exchanger secondExchanger = new Exchanger<>();

     new PipeLineOperation(0, firstExchanger, secondExchanger, new int[] {1,4,7}).start();
     new PipeLineOperation(1, firstExchanger, null, new int[]{2,5,8}).start();
     new PipeLineOperation(2, secondExchanger, null, new int[]{3,6,9}).start();
    }

}

The problem can be solved easily without using any synchronized construct, and implementing threads using callable construct. The complete working code is as following -

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class PipeLineOperation implements Callable {
    
 private final int[] holdingArray;
 
 public PipeLineOperation(final int[] holdingArray) {
  this.holdingArray=holdingArray;
 }
 
 @Override
 public int[] call() throws Exception {
  return holdingArray;
 }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
     final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
     final FutureTask firstFutureTask = (FutureTask) executorService.submit(new PipeLineOperation(new int[]{1,4,7}));
     final FutureTask secondFutureTask = (FutureTask) executorService.submit(new PipeLineOperation(new int[]{2,5,8}));
     final FutureTask thirdFutureTask = (FutureTask) executorService.submit(new PipeLineOperation(new int[]{3,6,9}));
     
     executorService.shutdown();
     
     final int[] firstArray = firstFutureTask.get();
     final int[] secondArray = secondFutureTask.get();
     final int[] thirdArray = thirdFutureTask.get();
     
     for (int i = 0; i < 3; i++) {
   System.out.println(firstArray[i]);
   System.out.println(secondArray[i]);
   System.out.println(thirdArray[i]);
  }
    }
}

The same problem can be solved using wait notify mechanism by designing SequenceLatch. The solution is as mentioned following -
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SequencialLatchDemo {

 public static void main(String[] args) throws InterruptedException {

  SequencialLatch firstLatch = new SequencialLatch();
  SequencialLatch secondLatch = new SequencialLatch();
  SequencialLatch thirdLatch = new SequencialLatch();

  firstLatch.setNext(secondLatch);
  secondLatch.setNext(thirdLatch);
  thirdLatch.setNext(firstLatch);

  Runner runner1 = new Runner(new int[] { 1, 4, 7 }, 1, firstLatch);
  Runner runner2 = new Runner(new int[] { 2, 5, 8 }, 2, secondLatch);
  Runner runner3 = new Runner(new int[] { 3, 6, 9 }, 3, thirdLatch);

  ExecutorService service = Executors.newFixedThreadPool(3);
  service.submit(runner2);
  service.submit(runner3);
  service.submit(runner1);

  service.shutdown();
 }

}

class Runner implements Runnable {

 private final SequencialLatch pass;
 private final int[] numbers;
 private final int threadNumber;

 public Runner(int[] numbers, final int threadNumber, SequencialLatch pass) {
  this.numbers = numbers;
  this.pass = pass;
  this.threadNumber = threadNumber;
 }

 @Override
 public void run() {

  for (int taskNumber = 0; taskNumber < numbers.length; taskNumber++) {
   if (!(this.threadNumber == 1 && taskNumber == 0))
    pass.await();
   System.out.println(numbers[taskNumber]);
   pass.nextPerssionRelease();
  }
 }

}

class SequencialLatch {

 private SequencialLatch next;
 private boolean active;

 public void nextPerssionRelease() {
  this.active = false;
  next.active = true;
  synchronized (next) {
   next.notify();
  }
 }

 public void await() {
  while (!active) {
   synchronized (this) {
    try {
     this.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }
 }

 public void setNext(SequencialLatch existingPass) {
  this.next = existingPass;
 }
}

The same problem can be solved using Lock and Condistion mechanism by designing SequenceLatch. The solution is as mentioned following -
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SequencialLatchDemo {

 public static void main(String... args) throws InterruptedException {

  SequencialLatch firstLatch = new SequencialLatch();
  SequencialLatch secondLatch = new SequencialLatch();
  SequencialLatch thirdLatch = new SequencialLatch();

  firstLatch.setNext(secondLatch);
  secondLatch.setNext(thirdLatch);
  thirdLatch.setNext(firstLatch);

  Runner runner1 = new Runner(new int[] { 1, 4, 7 }, 1, firstLatch);
  Runner runner2 = new Runner(new int[] { 2, 5, 8 }, 2, secondLatch);
  Runner runner3 = new Runner(new int[] { 3, 6, 9 }, 3, thirdLatch);

  ExecutorService service = Executors.newFixedThreadPool(3);
  service.submit(runner2);
  service.submit(runner3);
  service.submit(runner1);

  service.shutdown();
 }

}

class Runner implements Runnable {

 private final SequencialLatch pass;
 private final int[] numbers;
 private final int threadNumber;

 public Runner(int[] numbers, final int threadNumber, SequencialLatch pass) {
  this.numbers = numbers;
  this.pass = pass;
  this.threadNumber = threadNumber;
 }

 @Override
 public void run() {

  for (int taskNumber = 0; taskNumber < numbers.length; taskNumber++) {
   if (!(this.threadNumber == 1 && taskNumber == 0))
    pass.await();
   System.out.println(numbers[taskNumber]);
   pass.nextPerssionRelease();
  }
 }

}

class SequencialLatch {

 private final static Lock baseLock = new ReentrantLock();
 private final Condition condition = baseLock.newCondition();
 private boolean active;
 private SequencialLatch next;

 public void nextPerssionRelease() {
  baseLock.lock();
  try {
   this.active = false;
   next.active = true;
   next.condition.signal();
  } finally {
   baseLock.unlock();
  }
 }

 public void await() {
  baseLock.lock();
  try {
   while (!active) {
    condition.await();
   }
  } catch (InterruptedException interruptedException) {
   interruptedException.printStackTrace();
  } finally {
   baseLock.unlock();
  }
 }

 public void setNext(SequencialLatch existingPass) {
  this.next = existingPass;
 }
}

2 comments:

  1. I'm pretty sure the interviewer would consider this "solution" a fail. You might as well just do system.out.printn(1) through to system.out.printn(9);

    ReplyDelete
  2. Hey Aaron,

    As discussed out of this blog, callable solution, which performs the task without using any synchronization mechanism for sequencing, is just another valid approach of solving the problem

    In this approach, main thread says, hey other threads, we need to perform the tasks (printing the value in this case) in sequence and in any case, we would not be taking the benefit of parallelism so you guys pass your tasks to me. I will take the responsibility of sequencing and performing the task.

    That is why the sub threads are not doing any thing but passing their tasks to main thread

    ReplyDelete