Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.ck;
- import java.io.BufferedInputStream;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.util.Arrays;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
/**
 * Reads standard input, splits it into records and echoes every record to standard output.
 *
 * <p>A record is a run of four lines whose successor starts with {@code '@'} (the four-line
 * layout used by FASTQ-style files, for example). A {@link Producer} reads and splits the input
 * on a pool thread while a {@link Consumer} prints the records on the calling thread; the two
 * communicate through a {@link BlockingQueue}.
 */
public class TestRunner {

    /**
     * Marker the producer enqueues after its last record. It is compared by identity, so it can
     * never be confused with real data, not even with an empty record.
     */
    private static final byte[] END_OF_STREAM = new byte[0];

    /**
     * Starts the producer on the executor, prints records on the calling thread until the
     * producer signals the end of the input, then releases the executor.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        ExecutorService executor = Executors.newCachedThreadPool();
        System.out.println("start");
        Producer p = new Producer(queue);
        Consumer c = new Consumer(queue);
        try {
            executor.execute(p);
            // Blocks until the producer has enqueued END_OF_STREAM, i.e. until all input is handled.
            c.run();
        } finally {
            // Without this the pool's non-daemon worker would keep the JVM alive while it idles.
            executor.shutdown();
        }
        System.out.println("wait end");
    }

    /** Prints every record taken from the queue, prefixed with {@code "> "}. */
    private static class Consumer implements Runnable {
        private final BlockingQueue<byte[]> queue;

        public Consumer(BlockingQueue<byte[]> queue) {
            this.queue = queue;
        }

        /**
         * Takes records until {@link TestRunner#END_OF_STREAM} arrives or the thread is
         * interrupted. {@code take()} blocks, so no polling or sleeping is needed.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    byte[] takeout = this.queue.take();
                    if (takeout == END_OF_STREAM) {
                        return;
                    }
                    System.out.println("> " + new String(takeout, 0, takeout.length));
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread, then stop.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Reads standard input and enqueues one byte array per record. */
    private static class Producer implements Runnable {
        /** Number of bytes requested from the stream per read. */
        private static final int CHUNK_SIZE = 350;
        /** A record boundary is only considered after this many line feeds. */
        private static final int LINES_PER_RECORD = 4;
        private static final byte LINE_FEED = '\n';
        private static final byte RECORD_START = '@';

        private final BlockingQueue<byte[]> queue;

        /** Bytes of the record currently being assembled; only the first {@code length} are valid. */
        private byte[] pending = new byte[1024];
        private int length;
        /** Line feeds seen since the current record began. */
        private int newlines;
        /** True when the previous byte was the line feed that completed the fourth line. */
        private boolean boundaryPending;

        public Producer(BlockingQueue<byte[]> queue) {
            this.queue = queue;
        }

        /**
         * Reads the input once, to its end, and always finishes by enqueueing
         * {@link TestRunner#END_OF_STREAM} so the consumer terminates even after a failure.
         */
        @Override
        public void run() {
            try (BufferedInputStream in = new BufferedInputStream(System.in)) {
                byte[] chunk = new byte[CHUNK_SIZE];
                int nRead;
                while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                    // Only the first nRead bytes are fresh; the rest is left over from earlier reads.
                    for (int i = 0; i < nRead; i++) {
                        accept(chunk[i]);
                    }
                }
                // Whatever follows the last boundary is the final record; empty input has none.
                if (length > 0) {
                    flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // offer() never blocks or throws on interrupt; the queue created in main is
                // unbounded, so the marker is always accepted.
                this.queue.offer(END_OF_STREAM);
            }
        }

        /**
         * Feeds one input byte to the splitter. The boundary decision is made when the byte after
         * the fourth line feed arrives, so it works even when that byte comes in a later chunk.
         */
        private void accept(byte value) throws InterruptedException {
            if (boundaryPending && value == RECORD_START) {
                flush();
            }
            boundaryPending = false;
            if (length == pending.length) {
                pending = Arrays.copyOf(pending, pending.length * 2);
            }
            pending[length++] = value;
            if (value == LINE_FEED) {
                newlines++;
                boundaryPending = (newlines == LINES_PER_RECORD);
            }
        }

        /** Enqueues a copy of the assembled record and resets the splitter for the next one. */
        private void flush() throws InterruptedException {
            this.queue.put(Arrays.copyOf(pending, length));
            length = 0;
            newlines = 0;
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement