Advertisement
s4553711

fastqParser

Oct 27th, 2016
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.13 KB | None | 0 0
  1. package com.ck;
  2.  
  3. import java.io.BufferedInputStream;
  4. import java.io.IOException;
  5. import java.nio.ByteBuffer;
  6. import java.util.Arrays;
  7. import java.util.concurrent.BlockingQueue;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.LinkedBlockingQueue;
  11.  
  12. public class TestRunner {
  13.     public static void main(String[] args) {
  14.         BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
  15.         ExecutorService executor = Executors.newCachedThreadPool();
  16.         System.out.println("start");
  17.  
  18.         Producer p = new Producer(queue);
  19.         Consumer c = new Consumer(queue);
  20.         p.run();
  21.         c.run();
  22.         System.out.println("wait end");
  23.  
  24.         // while (true) {
  25.         // executor.submit(new Producer(queue));
  26.         // }
  27.     }
  28.  
  29.     private static class Consumer implements Runnable {
  30.         private BlockingQueue<byte[]> queue;
  31.         private boolean running = true;
  32.  
  33.         public Consumer(BlockingQueue<byte[]> queue) {
  34.             this.queue = queue;
  35.         }
  36.  
  37.         @Override
  38.         public void run() {
  39.             while (true) {
  40.                 try {
  41.                     while (!this.queue.isEmpty()) {
  42.                         byte[] takeout;
  43.                         takeout = this.queue.take();
  44.                         System.out.println("> "+new String(takeout, 0, takeout.length));
  45.                     }
  46.                     // if (!isRunning) {
  47.                     //  break;
  48.                     // }
  49.                     Thread.sleep(100);
  50.                 } catch (InterruptedException e) {
  51.                     e.printStackTrace();
  52.                 }
  53.             }
  54.         }
  55.  
  56.     }
  57.  
  58.     private static class Producer implements Runnable {
  59.         private BlockingQueue<byte[]> queue;
  60.         private boolean running = true;
  61.  
  62.         public Producer(BlockingQueue<byte[]> queue) {
  63.             this.queue = queue;
  64.         }
  65.  
  66.         @Override
  67.         public void run() {
  68.             while (running) {
  69.                 BufferedInputStream bufferedInputStream = new BufferedInputStream(
  70.                         System.in);
  71.                 byte[] data = new byte[350];
  72.                 byte[] b;
  73.                 ByteBuffer buffer = ByteBuffer.allocate(1024 * 1014);
  74.                 int nRead = 0;
  75.                 int i = 0;
  76.                 int start = 0;
  77.                 int end = 0;
  78.                 int new_line = 0;
  79.                 try {
  80.                     while ((nRead = bufferedInputStream.read(data, 0, data.length)) != -1) {
  81.                         new_line = 0;
  82.                         start = 0;
  83.                         end = 0;
  84.                         i = 0;
  85.                         if (buffer.position() != 0) {
  86.                             System.out.println("1");
  87.                             System.out.println("1-1>"+buffer);
  88.                             buffer.put(data);
  89.                             System.out.println("1-2>"+buffer);
  90.                             buffer.flip();
  91.                             b = new byte[buffer.remaining()];
  92.                             buffer.get(b);
  93.                             System.out.println("1-3>"+b.length);
  94.                             buffer.clear();
  95.                             System.out.println("1-4>"+buffer);
  96.                         } else {
  97.                             System.out.println("2");
  98.                             b = data;
  99.                         }
  100.                         for (i = 0; i < b.length; i++) {
  101.                             if (b[i] == 10) {
  102.                                 new_line += 1;
  103.                                 System.out.println("new line "+new_line);
  104.                                 if (b[i + 1] == 64 && new_line == 4) {
  105.                                     end = i + 1;
  106.                                     System.out.println("W1> "+start+" - "+end);
  107.                                     this.queue.put(Arrays.copyOfRange(b, start, end));
  108.                                     System.out.println("stor> "+new String(Arrays.copyOfRange(b, start, end)));
  109.                                     start = i + 1;
  110.                                     new_line = 0;
  111.                                 }
  112.                             }
  113.                             if (b[i] == 0) {
  114.                                 end = i;
  115.                                 break;
  116.                             }
  117.                         }
  118.                         System.out.println("before adjust> "+start+" : "+end+" : "+i);
  119.                         if (start == end) end = i;
  120.                         //this.queue.put(Arrays.copyOfRange(b, start, end));
  121.                         //System.out.println("stor2> "+new String(Arrays.copyOfRange(b, start, end)));
  122.                         System.out.println("W2> "+start+" - "+end);
  123.                         System.out.println("2-1>"+buffer);
  124.                         buffer.put(Arrays.copyOfRange(b, start, end));
  125.                         System.out.println("2-2>"+buffer);
  126.                         Thread.sleep(500);
  127.                     }
  128.                     System.out.println("W3> "+start+" - "+end);
  129.                     System.out.println("3-1>"+buffer);
  130.                     buffer.flip();
  131.                     System.out.println("3-2>"+buffer);
  132.                     b = new byte[buffer.remaining()];
  133.                     System.out.println("3-3>"+b.length);
  134.                     buffer.get(b);
  135.                     this.queue.put(b);
  136.                     System.out.println("stor3> "+new String(b));
  137.                    
  138.                     bufferedInputStream.close();
  139.                     running = false;
  140.                 } catch (IOException | InterruptedException e) {
  141.                     e.printStackTrace();
  142.                 }
  143.             }
  144.         }
  145.  
  146.         private void put(byte[] realPack) throws InterruptedException {
  147.             this.queue.put(realPack);
  148.         }
  149.     }
  150. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement