Thursday, 19 September 2013

Inter-thread communication using wait notify scenario

Question -  There are three threads,t1, t2 and t3, each of these thread have an array of size three. thread t1 has array {1,4,7}, thread t2 has array {2,5,8}, thread t3 has array {3,6,9}.

How to design the thread execution so that the output of execution will be 1,2,3,4,5,6,7,8,9 

Solution - It is an scenario of multi-thread communication being handled using wait & notify methods.
The code is as mentioned below - 


public class InterThreadCommunication {
    
    static class SampleRunnable implements Runnable
    {
        final int[] arrayToPrint;
        final Object waitMonitor;
        final Object notifyMonitor;
        
        public SampleRunnable(int[] arrayToPrint, Object waitMonitor, Object notifyMonitor) {
            this.waitMonitor = waitMonitor;
            this.notifyMonitor = notifyMonitor;
            this.arrayToPrint = arrayToPrint;
        }
        
        @Override
        public  void  run() {
            
            try {
                setCurrentMonitorToWait();
                
                System.out.println(arrayToPrint[0]);
                notifyNextMonitor();
                
                setCurrentMonitorToWait();
                
                System.out.println(arrayToPrint[1]);
                notifyNextMonitor();
                
                setCurrentMonitorToWait();
                
                System.out.println(arrayToPrint[2]);
                notifyNextMonitor();
                
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void setCurrentMonitorToWait() throws InterruptedException {
            synchronized (waitMonitor) {
                this.waitMonitor.wait();    
            }
        }

        private void notifyNextMonitor() {
            synchronized (notifyMonitor) {
                this.notifyMonitor.notify();    
            }
        }
    }
    
    
    public static void main(String[] args) {
        
        Object firstMonitor = new Object();
        Object secondMonitor = new Object();
        Object thirdMonitor = new Object();
        
        int[] a1 = new int[] {1,4,7};
        int[] a2 = new int[]{2,5,8};
        int[] a3 = new int[]{3,6,9};
        
        Thread t1 = new Thread(new InterThreadCommunication.SampleRunnable(a1,firstMonitor,secondMonitor));
        Thread t2 = new Thread(new InterThreadCommunication.SampleRunnable(a2, secondMonitor, thirdMonitor));
        Thread t3 = new Thread(new InterThreadCommunication.SampleRunnable(a3, thirdMonitor, firstMonitor));
        
        t1.start();
        t2.start();
        t3.start();
        
        synchronized (firstMonitor) {
            firstMonitor.notify();
        }
    }

}

While testing in quad core processor machine, we found it working fine. But this solution might get into dead lock situation if first signal from main to first thread gets missed.

We could mitigate this problem, by using semaphore. The solution code is as mentioned below -

import java.util.concurrent.Semaphore;

public class InterThreadCommunication {
    
    static class SampleRunnable implements Runnable
    {
        final int[] arrayToPrint;
        final Semaphore acquireSemaphore;
        final Semaphore releaseSemaphore;
        
        public SampleRunnable(int[] arrayToPrint, Semaphore acquireSemaphore, Semaphore releaseSemaphore) {
            this.acquireSemaphore = acquireSemaphore;
            this.releaseSemaphore = releaseSemaphore;
            this.arrayToPrint = arrayToPrint;
        }
        
        @Override
        public  void  run()
        {
      try {
    
              this.acquireSemaphore.acquire();
              System.out.println(arrayToPrint[0]);
     
              this.releaseSemaphore.release();
              this.acquireSemaphore.acquire();
              System.out.println(arrayToPrint[1]);
              
              this.releaseSemaphore.release();
              this.acquireSemaphore.acquire();
              System.out.println(arrayToPrint[2]);
              this.releaseSemaphore.release();
                 
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
        }
    }
    
    
    public static void main(String[] args) {
        
        Semaphore firstSemaphore = new Semaphore(0);
        Semaphore secondSemaphore = new Semaphore(0);
        Semaphore thirdSemaphore = new Semaphore(0);
        
        int[] a1 = new int[] {1,4,7};
        int[] a2 = new int[]{2,5,8};
        int[] a3 = new int[]{3,6,9};
        
        Thread t1 = new Thread(new InterThreadCommunication.SampleRunnable(a1,firstSemaphore,secondSemaphore));
        Thread t2 = new Thread(new InterThreadCommunication.SampleRunnable(a2, secondSemaphore, thirdSemaphore));
        Thread t3 = new Thread(new InterThreadCommunication.SampleRunnable(a3, thirdSemaphore, firstSemaphore));
        
        t1.start();
        t2.start();
        t3.start();
        
        firstSemaphore.release();
    }

}

As in this problem, at a moment of time, only one thread should be in action to give the result in a specific sequence. We can implement the solution using Exchanger as well. The implementation is as mentioned below -


import java.util.concurrent.Exchanger;

public class PipeLineOperation extends Thread {
    
  private int threadNumber;
  private Exchanger firstExchanger;
  private Exchanger secondExchanger;
  private int[] firstArray;
  private int[] secondArray;
  private int[] currentArray;
  
  public PipeLineOperation(int threadNumber, Exchanger firstExchanger, Exchanger secondExchanger, int[] threadsArray) {
   this.firstExchanger = firstExchanger;
   this.secondExchanger= secondExchanger;
   this.threadNumber = threadNumber;
   this.currentArray = threadsArray;
  }
 
  @Override
  public  void  run()
  {
   try
    {
      performOperationInthread();
    }
    catch(InterruptedException interruptedException)
    {
     throw new RuntimeException(interruptedException);
    }
  }

  private void performOperationInthread() throws InterruptedException {
   if(threadNumber == 0)
   {
     firstArray = currentArray;
     secondArray = firstExchanger.exchange(currentArray);
     currentArray = secondExchanger.exchange(currentArray);
          
          for (int i = 0; i < 3; i++) {
     System.out.println(firstArray[i]);
     System.out.println(secondArray[i]);
     System.out.println(currentArray[i]);
    }
   }
   else
   {
     currentArray = firstExchanger.exchange(currentArray);
   }
  }
    
    public static void main(String[] args) {
     Exchanger firstExchanger= new Exchanger<>();
     Exchanger secondExchanger = new Exchanger<>();

     new PipeLineOperation(0, firstExchanger, secondExchanger, new int[] {1,4,7}).start();
     new PipeLineOperation(1, firstExchanger, null, new int[]{2,5,8}).start();
     new PipeLineOperation(2, secondExchanger, null, new int[]{3,6,9}).start();
    }

}

The problem can be solved easily without using any synchronized construct, and implementing threads using callable construct. The complete working code is as following -

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class PipeLineOperation implements Callable {
    
 private final int[] holdingArray;
 
 public PipeLineOperation(final int[] holdingArray) {
  this.holdingArray=holdingArray;
 }
 
 @Override
 public int[] call() throws Exception {
  return holdingArray;
 }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
     final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
     final FutureTask firstFutureTask = (FutureTask) executorService.submit(new PipeLineOperation(new int[]{1,4,7}));
     final FutureTask secondFutureTask = (FutureTask) executorService.submit(new PipeLineOperation(new int[]{2,5,8}));
     final FutureTask thirdFutureTask = (FutureTask) executorService.submit(new PipeLineOperation(new int[]{3,6,9}));
     
     executorService.shutdown();
     
     final int[] firstArray = firstFutureTask.get();
     final int[] secondArray = secondFutureTask.get();
     final int[] thirdArray = thirdFutureTask.get();
     
     for (int i = 0; i < 3; i++) {
   System.out.println(firstArray[i]);
   System.out.println(secondArray[i]);
   System.out.println(thirdArray[i]);
  }
    }
}

The same problem can be solved using wait notify mechanism by designing SequenceLatch. The solution is as mentioned following -
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SequencialLatchDemo {

 public static void main(String[] args) throws InterruptedException {

  SequencialLatch firstLatch = new SequencialLatch();
  SequencialLatch secondLatch = new SequencialLatch();
  SequencialLatch thirdLatch = new SequencialLatch();

  firstLatch.setNext(secondLatch);
  secondLatch.setNext(thirdLatch);
  thirdLatch.setNext(firstLatch);

  Runner runner1 = new Runner(new int[] { 1, 4, 7 }, 1, firstLatch);
  Runner runner2 = new Runner(new int[] { 2, 5, 8 }, 2, secondLatch);
  Runner runner3 = new Runner(new int[] { 3, 6, 9 }, 3, thirdLatch);

  ExecutorService service = Executors.newFixedThreadPool(3);
  service.submit(runner2);
  service.submit(runner3);
  service.submit(runner1);

  service.shutdown();
 }

}

class Runner implements Runnable {

 private final SequencialLatch pass;
 private final int[] numbers;
 private final int threadNumber;

 public Runner(int[] numbers, final int threadNumber, SequencialLatch pass) {
  this.numbers = numbers;
  this.pass = pass;
  this.threadNumber = threadNumber;
 }

 @Override
 public void run() {

  for (int taskNumber = 0; taskNumber < numbers.length; taskNumber++) {
   if (!(this.threadNumber == 1 && taskNumber == 0))
    pass.await();
   System.out.println(numbers[taskNumber]);
   pass.nextPerssionRelease();
  }
 }

}

class SequencialLatch {

 private SequencialLatch next;
 private boolean active;

 public void nextPerssionRelease() {
  this.active = false;
  next.active = true;
  synchronized (next) {
   next.notify();
  }
 }

 public void await() {
  while (!active) {
   synchronized (this) {
    try {
     this.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }
 }

 public void setNext(SequencialLatch existingPass) {
  this.next = existingPass;
 }
}

The same problem can be solved using Lock and Condistion mechanism by designing SequenceLatch. The solution is as mentioned following -
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SequencialLatchDemo {

 public static void main(String... args) throws InterruptedException {

  SequencialLatch firstLatch = new SequencialLatch();
  SequencialLatch secondLatch = new SequencialLatch();
  SequencialLatch thirdLatch = new SequencialLatch();

  firstLatch.setNext(secondLatch);
  secondLatch.setNext(thirdLatch);
  thirdLatch.setNext(firstLatch);

  Runner runner1 = new Runner(new int[] { 1, 4, 7 }, 1, firstLatch);
  Runner runner2 = new Runner(new int[] { 2, 5, 8 }, 2, secondLatch);
  Runner runner3 = new Runner(new int[] { 3, 6, 9 }, 3, thirdLatch);

  ExecutorService service = Executors.newFixedThreadPool(3);
  service.submit(runner2);
  service.submit(runner3);
  service.submit(runner1);

  service.shutdown();
 }

}

class Runner implements Runnable {

 private final SequencialLatch pass;
 private final int[] numbers;
 private final int threadNumber;

 public Runner(int[] numbers, final int threadNumber, SequencialLatch pass) {
  this.numbers = numbers;
  this.pass = pass;
  this.threadNumber = threadNumber;
 }

 @Override
 public void run() {

  for (int taskNumber = 0; taskNumber < numbers.length; taskNumber++) {
   if (!(this.threadNumber == 1 && taskNumber == 0))
    pass.await();
   System.out.println(numbers[taskNumber]);
   pass.nextPerssionRelease();
  }
 }

}

class SequencialLatch {

 private final static Lock baseLock = new ReentrantLock();
 private final Condition condition = baseLock.newCondition();
 private boolean active;
 private SequencialLatch next;

 public void nextPerssionRelease() {
  baseLock.lock();
  try {
   this.active = false;
   next.active = true;
   next.condition.signal();
  } finally {
   baseLock.unlock();
  }
 }

 public void await() {
  baseLock.lock();
  try {
   while (!active) {
    condition.await();
   }
  } catch (InterruptedException interruptedException) {
   interruptedException.printStackTrace();
  } finally {
   baseLock.unlock();
  }
 }

 public void setNext(SequencialLatch existingPass) {
  this.next = existingPass;
 }
}

Wednesday, 18 September 2013

Max number existing in any of the participating table columns

Question - There are two tables (Ex - test1 & test2 ) having only one column of number type (Ex. number1 & number2). Write a query to find out max number existing in any of these tables.

Sample data setup query - 

CREATE TABLE test1( number1 NUMBER);
insert into test1 values (21);
INSERT INTO test1 VALUES (22);
INSERT INTO test1 VALUES (23);
INSERT INTO test1 VALUES (24);
insert into test1 values (25);


CREATE TABLE test2( number2 NUMBER);
INSERT INTO test2 VALUES (211);
INSERT INTO test2 VALUES (221);
INSERT INTO test2 VALUES (231);
INSERT INTO test2 VALUES (241);
insert into test2 values (251);

Three possible solutions - 

SELECT greatest((SELECT MAX(number1) FROM test1), (SELECT MAX(number2) FROM test2)) MAX FROM dual;

SELECT CASE WHEN (MAX(number1)>MAX(number2))
            THEN MAX(number1)
            ELSE
            MAX(number2)
            END CASE
from test1,test2;

SELECT MAX(number3) FROM 
(
  (SELECT MAX(number1)number3 FROM test1)
  UNION
  (SELECT MAX(number2)number3 FROM test2)
);