1. package se.lesc.stackoverflow.parallelize;
  2.  
  3. import java.io.DataInputStream;
  4. import java.io.IOException;
  5. import java.io.InputStream;
  6. import java.io.OutputStream;
  7. import java.io.PipedInputStream;
  8. import java.io.PipedOutputStream;
  9. import java.nio.ByteBuffer;
  10. import java.util.concurrent.ArrayBlockingQueue;
  11. import java.util.concurrent.BlockingQueue;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.Executors;
  14. import java.util.concurrent.SynchronousQueue;
  15. import java.util.zip.Adler32;
  16.  
  17. /** Simulator of how to "Parallelize tasks but preserving input order in output". */
  18. public class MessageSteamDecoder {
  19.    
  20.     private static final boolean userSerialDecoder = false;
  21.    
  22.     public static void main(String args[]) throws IOException {
  23.         PipedInputStream in = new PipedInputStream(1024*1024);
  24.  
  25.         //Feeder thread to the system
  26.         new Thread(new MessageFeeder(new PipedOutputStream(in)), "Message feeder").start();
  27.        
  28.         //Output queue of the system
  29.         BlockingQueue<Message> outputQueue = new ArrayBlockingQueue<Message>(1000);
  30.        
  31.         //Handler of the output of the system
  32.         new Thread(new OutputHandler(outputQueue), "Output Handler").start();
  33.  
  34.         //Choice of decoder
  35.         if (userSerialDecoder) {
  36.             new Thread(new SerialDecoder(in, outputQueue), "Serial Decoder").start();
  37.         } else {
  38.           new Thread(new ThreadPoolDecoder(in, outputQueue), "Thread Pool Decoder").start();            
  39.         }
  40.         System.out.println("Started.");
  41.     }
  42.  
  43.     /** Receiver of decoded messages. Prints statistics every 10th second. */
  44.     private static class OutputHandler implements Runnable {
  45.         private final BlockingQueue<Message> outputQueue;
  46.  
  47.         private int nextExpectedMessageNumber = 1;
  48.         private int numberOfMessagesPer10Sec = 0;
  49.         private long nextPointInTimeToPrintSpeed = System.currentTimeMillis() + 10000;
  50.        
  51.         public OutputHandler(BlockingQueue<Message> outputQueue) {
  52.             this.outputQueue = outputQueue;
  53.         }
  54.  
  55.         public void run() {
  56.             while (true) {
  57.                 try {
  58.                     handleMessage(outputQueue.take());
  59.                 } catch (Exception e) {
  60.                     e.printStackTrace();
  61.                 }
  62.             }
  63.         }
  64.        
  65.         public void handleMessage(Message message) {
  66.             if (System.currentTimeMillis() > nextPointInTimeToPrintSpeed) {
  67.                 System.out.println(Math.round(numberOfMessagesPer10Sec/10.0/1000) + " K msg/s");
  68.                 numberOfMessagesPer10Sec = 0;
  69.                 nextPointInTimeToPrintSpeed += 10000;
  70.             }
  71.            
  72.             if (message.getMessageNumber() != nextExpectedMessageNumber) {
  73.                 System.err.println("Expected #" + nextExpectedMessageNumber + " but got #" +
  74.                                 message.getMessageNumber());
  75.                 System.exit(1); //Error in program
  76.             }
  77.             numberOfMessagesPer10Sec++;
  78.             nextExpectedMessageNumber++;
  79.         }
  80.     }
  81.    
  82.     /** Feeds "messages" to an OutputStream as fast as possible */
  83.     private static class MessageFeeder implements Runnable {
  84.         private final ByteBuffer messageBuffer = ByteBuffer.wrap(new byte[Message.MESSAGE_SIZE]);
  85.         private final OutputStream out;
  86.  
  87.         MessageFeeder(OutputStream out) {
  88.             this.out = out;
  89.         }
  90.  
  91.         public void run() {
  92.             int messageNumber = 0;
  93.             while(true) {
  94.                 try {
  95.                     messageNumber++;
  96.                     messageBuffer.putInt(0,  messageNumber);
  97.                     out.write(messageBuffer.array());
  98.                 } catch (IOException e) {
  99.                     e.printStackTrace();
  100.                 }
  101.             }
  102.         }
  103.     }
  104.    
  105.     /** A Message representation. To make it easy a message is 100 bytes large */
  106.     private static class Message {
  107.         private static final int MESSAGE_SIZE = 100;
  108.         private final byte[] messageBytes;
  109.         private String decodedValue;
  110.         private int messageNumber;
  111.  
  112.         public Message(byte[] messageBytes) {
  113.             this.messageBytes = messageBytes;
  114.         }
  115.        
  116.         /** Decodes a message. Might take some time. Mostly dummy code */
  117.         public void decode() {
  118.             ByteBuffer messageBuffer = ByteBuffer.wrap(messageBytes);
  119.             messageNumber = messageBuffer.getInt(0);
  120.             decodedValue = Integer.toHexString(messageNumber);
  121.             Adler32 checkSumCalculator = new Adler32();
  122.             checkSumCalculator.update(messageBytes);
  123.             long checksum = checkSumCalculator.getValue();
  124.             decodedValue = "Decoded value: " + decodedValue + ", checksum: " + checksum;
  125.         }
  126.        
  127.         public int getMessageNumber() {
  128.             return messageNumber;
  129.         }
  130.        
  131.         public static Message readMessage(DataInputStream in) throws IOException {
  132.             byte[] messageBytes = new byte[MESSAGE_SIZE];
  133.             in.readFully(messageBytes);
  134.             return new Message(messageBytes);
  135.         }
  136.     }
  137.    
  138.     /** The simplest form of Decoder that only uses one thread */
  139.     private static class SerialDecoder implements Runnable {
  140.         private final DataInputStream in;
  141.         private final BlockingQueue<Message> outputQueue;
  142.  
  143.         public SerialDecoder(InputStream in, BlockingQueue<Message> outputQueue) {
  144.             this.in = new DataInputStream(in);
  145.             this.outputQueue = outputQueue;
  146.         }
  147.        
  148.         @Override
  149.         public void run() {
  150.             while(true) {
  151.                 try {
  152.                     Message message = Message.readMessage(in);
  153.                     message.decode();
  154.                     outputQueue.put(message);
  155.                 } catch (Exception e) {
  156.                     e.printStackTrace();
  157.                 }
  158.             }
  159.         }
  160.     }
  161.    
  162.     /** Naive parallelizing implementation using a thread pool */
  163.     private static class ThreadPoolDecoder implements Runnable  {
  164.         private static final int NUMBER_OF_THREADS = 12;
  165.        
  166.         private final DataInputStream in;
  167.         private final BlockingQueue<Message> outputQueue;
  168.         private ExecutorService threadPool;
  169.         private int currentWorkerIndex = 0;
  170.  
  171.         private DecoderWorker[] workers;
  172.  
  173.         public ThreadPoolDecoder(InputStream in, BlockingQueue<Message> outputQueue) {
  174.             this.in = new DataInputStream(in);
  175.             this.outputQueue = outputQueue;
  176.            
  177.             threadPool = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
  178.             workers = new DecoderWorker[NUMBER_OF_THREADS];
  179.             for (int i = 0; i < NUMBER_OF_THREADS ; i++) {
  180.                 workers[i] = new DecoderWorker();
  181.                 threadPool.submit(workers[i]);
  182.             }
  183.         }
  184.  
  185.         public void run() {
  186.             int numberOfMessagesProcessed = 0;
  187.             while (true) {
  188.                 try {
  189.                     Message message = Message.readMessage(in);
  190.                     workers[currentWorkerIndex].inQueue.put(message);
  191.                     currentWorkerIndex++;
  192.                    
  193.                     if (currentWorkerIndex == NUMBER_OF_THREADS) {
  194.                         currentWorkerIndex = 0;
  195.                     }
  196.  
  197.                     numberOfMessagesProcessed++;
  198.                     if (numberOfMessagesProcessed >= NUMBER_OF_THREADS) {
  199.                         outputQueue.put(workers[currentWorkerIndex].outQueue.take());
  200.                     }
  201.                 } catch (Exception e) {
  202.                     e.printStackTrace();
  203.                 }
  204.             }
  205.         }
  206.        
  207.         /** Worker used to decode messages */
  208.         private class DecoderWorker implements Runnable {
  209.             SynchronousQueue<Message> inQueue =  new SynchronousQueue<Message>();
  210.             SynchronousQueue<Message> outQueue =  new SynchronousQueue<Message>();
  211.  
  212.             public void run() {
  213.                 while (true) {
  214.                     try {
  215.                         Message message = inQueue.take();
  216.                         message.decode();
  217.                         outQueue.put(message);
  218.                     } catch (InterruptedException e) {
  219.                         e.printStackTrace();
  220.                     }
  221.                 }
  222.             }
  223.         }
  224.     }
  225. }