Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package multisoft.martv;
- import android.content.res.AssetManager;
- import android.support.annotation.Nullable;
- import android.text.TextUtils;
- import android.util.Log;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintStream;
- import java.net.DatagramSocket;
- import java.net.DatagramPacket;
- import java.net.InetAddress;
- import java.net.MulticastSocket;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.net.SocketException;
- import java.net.SocketTimeoutException;
- import java.net.UnknownHostException;
- import java.nio.ByteBuffer;
- import java.nio.ByteOrder;
- import java.util.concurrent.TimeoutException;
- /**
- * Created by admin on 10/4/17.
- */
- public class HTTPThread implements Runnable {
- private static final String TAG = "HTTPThread";
- /**
- * The port number we listen to
- */
- private final int mPort;
- /**
- * The port number we listen to
- */
- private int bufferSizePkts = 4096;
- private int burstLen = 64;
- /**
- * True if the server is running.
- */
- private boolean mIsRunning;
- /**
- * number of recovery requests running.
- */
- private int numReqRunning;
- private int channelGlobalID;
- /**
- * The {@link java.net.ServerSocket} that we listen to.
- */
- private ServerSocket mServerSocket;
- /**
- * Multicast socket and group
- */
- private MulticastSocket mcastSocket;
- private InetAddress group;
- /**
- * WebServer constructor.
- */
- public HTTPThread(int port) {
- mPort = port;
- }
- /**
- * receive byuffer;
- */
- public byte[] mainBuffer = new byte[bufferSizePkts * 1316];
- /**
- * This method starts the web server listening to the specified port.
- */
- public void start() {
- Log.i(TAG, "Starting HTTP Thread");
- mIsRunning = true;
- MainActivity.socketThread = new Thread(this);
- MainActivity.socketThread.start();
- new Thread(this).start();
- }
- /**
- * This method stops the web server
- */
- public void stop() {
- try {
- mIsRunning = false;
- if (null != mServerSocket) {
- mServerSocket.close();
- mServerSocket = null;
- }
- if (null != mcastSocket){
- mcastSocket.leaveGroup(group);
- mcastSocket.close();
- }
- } catch (IOException e) {
- Log.e(TAG, "Error closing the server socket.", e);
- }
- }
- private Socket acceptSocket;
- @Override
- public void run() {
- try {
- mServerSocket = new ServerSocket(mPort);
- while (mIsRunning) {
- //Log.d(TAG, "waiting for accept");
- acceptSocket = mServerSocket.accept();
- mainBuffer = new byte[bufferSizePkts * 1316];
- handle(acceptSocket);
- acceptSocket.close();
- //Log.d(TAG, "Finished request waiting for next " + mIsRunning);
- }
- } catch (SocketException e) {
- Log.e(TAG, "socket error.", e);
- } catch (IOException e) {
- Log.e(TAG, "Web server error.", e);
- } catch (Throwable t){
- Log.e(TAG, "Web server error.", t);
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * Respond to a request from a client.
- *
- * @param socket The client socket.
- * @throws IOException e
- */
- private void handle(Socket socket) throws IOException {
- BufferedReader reader = null;
- PrintStream output = null;
- try {
- String route = null;
- // Read HTTP headers and parse out the route.
- reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- String line;
- while (!TextUtils.isEmpty(line = reader.readLine())) {
- if (line.startsWith("GET /")) {
- int start = line.indexOf('/') + 1;
- int end = line.indexOf(' ', start);
- route = line.substring(start, end);
- break;
- }
- }
- // Output stream that we send the response to
- output = new PrintStream(socket.getOutputStream());
- // Prepare the content to send.
- if (null == route) {
- writeServerError(output);
- return;
- }
- loadBufferedStreamThread(output, route);
- } finally {
- if (null != output) {
- output.close();
- }
- if (null != reader) {
- reader.close();
- }
- }
- }
- /**
- * Writes a server error response (HTTP/1.0 500) to the given output stream.
- *
- * @param output The output stream.
- */
- private void writeServerError(PrintStream output) {
- output.println("HTTP/1.0 500 Internal Server Error");
- output.flush();
- }
- /**
- * Returns pktno from byte array, numbers are assigned on retransmiting server;
- */
- private long getPktNo(byte[] buf, int off) {
- ByteBuffer buffer = ByteBuffer.allocate(8);
- buffer.put(buf, off, 8);
- buffer.flip();
- buffer.order(ByteOrder.LITTLE_ENDIAN);
- return buffer.getLong();
- }
- /**
- * writes value to byte array chaning endianness
- */
- private byte[] byteFlipLong(long data) {
- byte[] byteData = new byte[8];
- int i;
- for(i = 0; i < 8; i++){
- byteData[i] = (byte)(data >> (i * 8));
- }
- return byteData;
- }
- /**
- * writes value to byte array chaning endianness
- */
- private byte[] byteFlipInt(int data) {
- byte[] byteData = new byte[4];
- int i;
- for(i = 0; i < 4; i++){
- byteData[i] = (byte)(data >> (i * 8));
- }
- return byteData;
- }
- /**
- * Loads all the content of {@code fileName}.
- *
- * @param output The output stream.
- * @param route route string.
- */
- private String command;
- private int chId;
- private String recoveryList;
- private String groupUri;
- private int groupPort;
- private void getVariables(PrintStream output, String route){
- Log.i(TAG, "Got route: " + route);
- int start = 0;
- int end = route.indexOf('?');
- if(end == -1){
- writeServerError(output);
- return;
- }
- command = route.substring(start, end);
- start = route.indexOf('=') + 1;
- end = route.indexOf('&');
- if(start == -1 || end == -1){
- writeServerError(output);
- return;
- }
- chId = Integer.parseInt(route.substring(start, end));
- start = route.indexOf('=', start + 1) + 1;
- end = route.indexOf('&', end + 1 );
- if(start == -1 || end == -1){
- writeServerError(output);
- return;
- }
- String uri = route.substring(start, end);
- start = route.indexOf('=', start + 1) + 1;
- end = route.length();
- if(start == -1 || end == -1){
- writeServerError(output);
- return;
- }
- recoveryList = route.substring(start, end);
- start = uri.indexOf('/') + 2;
- end = uri.indexOf(':', start);
- groupUri = uri.substring(start, end);
- groupPort = Integer.parseInt(uri.substring(end + 1, uri.length()));
- Log.i(TAG, "Channel id: " + chId + "; Uri for said channel: " + groupUri + "; List of recovery servers: " + recoveryList);
- }
- /**
- * Requests initial burst of buffer
- *
- * @param chID stream id
- * @param start expected sequence
- * @param end received sequence
- * @param recoveryList list of recovery servers
- * @return recovered pkts
- * @throws IOException e
- */
- private long burstLast = 0;
- private int burst(int chID, long start, long end, String recoveryList) throws IOException {
- String primaryRecovery = recoveryList.substring(0, recoveryList.indexOf(';', 0));
- InetAddress recoveryAddress = InetAddress.getByName(primaryRecovery);
- DatagramSocket recoverySocket = new DatagramSocket();
- byte[] sendData = new byte[37];
- byte[] tmpb = byteFlipInt(chID);
- System.arraycopy(tmpb, 0, sendData, 0 ,4);
- tmpb = byteFlipLong(start);
- System.arraycopy(tmpb, 0, sendData, 4 ,8);
- tmpb = byteFlipLong(end);
- System.arraycopy(tmpb, 0, sendData, 12 ,8);
- String mac = Utils.getMacAddress();
- byte yleoba[] = mac.getBytes();
- for(int bi = 20, z = 0; bi < 37; bi++, z++){
- sendData[bi] = yleoba[z];
- }
- //Log.d(TAG, "Sending " + Utils.bytesToHex(sendData));
- DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, recoveryAddress, 16969);
- recoverySocket.send(sendPacket);
- //Log.d(TAG, "start " + start + " end " + end);
- long bytesToRecv = bufferSizePkts * 1316;
- if(end > 0 ){
- bytesToRecv = end * 1316;
- }
- //Log.d(TAG, "Excepcting " + end + " pkts " + bytesToRecv * 1316 + " bvtes ");
- byte[] receiveData = new byte[1316];
- DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
- recoverySocket.setSoTimeout(100);
- int recLen = 0;
- while(recLen < bytesToRecv){
- try {
- recoverySocket.receive(receivePacket);
- recLen += receivePacket.getLength();
- byte[] tmp = receivePacket.getData();
- long RecoveredPktNo = getPktNo(tmp, 1133);
- //Log.d(TAG, "Initial Burst: recovered pkt" + Utils.bytesToHex(tmp));
- if(RecoveredPktNo < 0 || RecoveredPktNo >= bufferSizePkts){
- Log.e(TAG, "Initial Burst: Recvd damaged pkt should recover this separately " + RecoveredPktNo + Utils.bytesToHex(tmp));
- continue;
- }
- int offset = (int)(1316 * RecoveredPktNo);
- Log.i(TAG, "Initial Burst: Writing recovered pkt no " + RecoveredPktNo + " at offset " + offset);
- System.arraycopy(tmp, 0, mainBuffer, offset, 1316);
- burstLast = getPktNo(tmp, 1133);
- }catch (Throwable te) {
- Log.e(TAG, "Failed to recv initial burst ", te);
- break;
- }
- }
- if(recLen == bytesToRecv){
- //Log.d(TAG, "Burst: recvd " + receivePacket.getLength() + " bytes");
- }else{
- //Log.d(TAG, "Burst: recvd wrong" + receivePacket.getLength());
- }
- return recLen;
- }
- /*
- * recovewry thread lass
- */
- private class recoverThread implements Runnable {
- int chID;
- long startPktNo;
- long endPktNo;
- String recoveryList;
- public recoverThread(int tchID, long tstart, long tend, String trecoveryList){
- chID = tchID;
- channelGlobalID = chID;
- startPktNo = tstart;
- endPktNo = tend;
- recoveryList = trecoveryList;
- }
- @Override
- public void run() {
- String primaryRecovery = recoveryList.substring(0, recoveryList.indexOf(';', 0));
- try {
- while(numReqRunning > 5){
- Thread.sleep(50);
- if(numReqRunning > 15){
- return;
- }
- }
- if(chID != channelGlobalID){
- Log.e(TAG, "channel changed while waiting");
- return;
- }
- InetAddress recoveryAddress = InetAddress.getByName(primaryRecovery);
- DatagramSocket recoverySocket = new DatagramSocket();
- byte[] sendData = new byte[37];
- byte[] tmpb = byteFlipInt(chID);
- System.arraycopy(tmpb, 0, sendData, 0 ,4);
- tmpb = byteFlipLong(startPktNo);
- System.arraycopy(tmpb, 0, sendData, 4 ,8);
- tmpb = byteFlipLong(endPktNo);
- System.arraycopy(tmpb, 0, sendData, 12 ,8);
- String mac = Utils.getMacAddress();
- byte yleoba[] = mac.getBytes();
- for (int bi = 20, z = 0; bi < 37; bi++, z++) {
- sendData[bi] = yleoba[z];
- }
- //Log.d(TAG, "Req id: " + startPktNo + " Sending " + Utils.bytesToHex(sendData));
- DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, recoveryAddress, 16969);
- try {
- recoverySocket.send(sendPacket);
- }catch (IOException ie){
- Log.e(TAG, "Req id: " + startPktNo + " IO error");
- return;
- }
- //Log.d(TAG, "Req id: " + startPktNo + " start " + startPktNo + " end " + endPktNo);
- long bytesToRecv;
- if (startPktNo > endPktNo) {
- bytesToRecv = ((bufferSizePkts - startPktNo) + endPktNo);
- } else {
- bytesToRecv = endPktNo - startPktNo;
- }
- Log.i(TAG, "Req id: " + startPktNo + " Excepcting " + bytesToRecv + " pkts " + bytesToRecv * 1316 + " bvtes ");
- bytesToRecv = bytesToRecv * 1316;
- byte[] receiveData = new byte[1316];
- DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
- recoverySocket.setSoTimeout(50);
- int recLen = 0;
- int retries = 0;
- long RecoveredPktNo = -1;
- while(recLen < bytesToRecv){
- try {
- recoverySocket.receive(receivePacket);
- recLen += receivePacket.getLength();
- byte[] tmp = receivePacket.getData();
- RecoveredPktNo = getPktNo(tmp, 1133);
- //Log.d(TAG, "Req id: " + startPktNo + " recovered pkt" + Utils.bytesToHex(tmp));
- if(RecoveredPktNo < 0 || RecoveredPktNo >= bufferSizePkts){
- Log.e(TAG, "Req id: " + startPktNo + " Recvd damaged pkt should recover this separately " + RecoveredPktNo + Utils.bytesToHex(tmp));
- continue;
- }
- int offset = (int)(1316 * RecoveredPktNo);
- Log.i(TAG, "Req id: " + startPktNo + " Writing recovered pkt no " + RecoveredPktNo + " at offset " + offset);
- System.arraycopy(tmp, 0, mainBuffer, offset, 1316);
- if(RecoveredPktNo == endPktNo -1){
- break;
- }
- }catch (Throwable te) {
- if(RecoveredPktNo == endPktNo){break;}
- if(RecoveredPktNo == -1){RecoveredPktNo = startPktNo;}
- retries += 1;
- recLen = 0;
- if(retries >= 4){
- break;
- }
- Log.e(TAG, "Req id: " + startPktNo + " Timeout retry number " + retries + " retrying from " + RecoveredPktNo);
- sendData = new byte[37];
- tmpb = byteFlipInt(chID);
- System.arraycopy(tmpb, 0, sendData, 0 ,4);
- tmpb = byteFlipLong(RecoveredPktNo);
- System.arraycopy(tmpb, 0, sendData, 4 ,8);
- tmpb = byteFlipLong(endPktNo);
- System.arraycopy(tmpb, 0, sendData, 12 ,8);
- for (int bi = 20, z = 0; bi < 37; bi++, z++) {
- sendData[bi] = yleoba[z];
- }
- //Log.d(TAG, "Req id: " + startPktNo + " Sending " + Utils.bytesToHex(sendData));
- sendPacket = new DatagramPacket(sendData, sendData.length, recoveryAddress, 16969);
- try {
- recoverySocket.send(sendPacket);
- }catch (IOException ie){
- Log.e(TAG, "Req id: " + startPktNo + " IO error");
- return;
- }
- }
- }
- if (recLen == bytesToRecv) {
- Log.i(TAG, "Req id: " + startPktNo + " recvd " + recLen + " bytes");
- } else {
- Log.e(TAG, "Req id: " + startPktNo + " recvd smthn wierd " + recLen);
- }
- }catch (SocketException e){
- Log.e(TAG, "Req id: " + startPktNo + " cannot create socket", e);
- }catch (UnknownHostException he){
- Log.e(TAG, "Req id: " + startPktNo + " what host ", he);
- }catch (Throwable e){
- Log.e(TAG, "Exception ", e);
- }
- numReqRunning -= 1;
- }
- }
- /**
- * Loads all the content of {@code fileName}.
- *
- * @param output The output stream.
- * @throws IOException e
- */
- private void loadBufferedStreamThread(PrintStream output, String route) throws IOException {
- // Send out the content.
- // http://localhost:6969/live?ch=4&uri=udp://226.0.0.1:12345&recovery_list=recovery1.bintv.com;recovery2.bintv.com;
- if(route.length() <= 1){
- writeServerError(output);
- return;
- }
- getVariables(output, route);
- group = InetAddress.getByName(groupUri);
- mcastSocket = new MulticastSocket(groupPort);
- mcastSocket.setSoTimeout(150);
- int writeIndex = 0;
- int readIndex = 0;
- long prevPktNo = 0;
- long expectedPktNo;
- long buffPrevPktNo = 0;
- boolean checkDistance = true;
- int recoveredLen = burst(chId, -1, 32, recoveryList);
- if(recoveredLen > 0){
- // if(acceptSocket.isConnected()){
- int offset = (int)(burstLast * 1316);
- // int start = (offset - recoveredLen);
- // if(start < 0){
- // Log.e(TAG, "No data recovered");
- // return;
- // }
- // output.write(mainBuffer, offset - recoveredLen, recoveredLen);
- // output.flush();
- //
- prevPktNo = burstLast;
- readIndex = offset;
- // }else{
- // //Log.d(TAG, "Not connected anymore not sending or receiving data");
- // return;
- // }
- }else{
- Log.e(TAG, "Burst failed");
- writeServerError(output);
- return;
- }
- Log.i(TAG, "membership query start");
- try{
- mcastSocket.joinGroup(group);
- Log.i(TAG, "membership query end");
- }catch(Throwable e){
- writeServerError(output);
- Log.e(TAG, "Error joining ", e);
- return;
- }
- byte[] recvBuf = new byte[1316];
- DatagramPacket recvPkt = new DatagramPacket(recvBuf, recvBuf.length);
- output.println("HTTP/1.1 200 OK\r");
- output.println("Content-Type: application/octet-stream\r");
- output.println("\r");
- output.flush();
- int numberOfDups = 0;
- while(mIsRunning){
- try{
- mcastSocket.receive(recvPkt);
- }catch (SocketTimeoutException te){
- Log.e(TAG, "channel is dead");
- }catch(IOException e){
- writeServerError(output);
- Log.e(TAG, "IO exception", e);
- break;
- }
- if(recvPkt.getLength() <= 0){
- Log.i(TAG, "No data recvd");
- continue;
- }
- long pktNo = getPktNo(recvBuf, 1133);
- ////Log.d(TAG, "got pkt no " + pktNo + " write index " + writeIndex);
- if(prevPktNo == pktNo){
- Log.e(TAG, "recvd duplicate");
- if(numberOfDups >= 10){
- Log.e(TAG, "too many dups could be all zeros");
- break;
- }
- numberOfDups += 1;
- continue;
- }
- expectedPktNo = prevPktNo + 1;
- if(expectedPktNo == bufferSizePkts){
- expectedPktNo = 0;
- }
- if(expectedPktNo != pktNo){
- Log.i(TAG, "Req id: " + expectedPktNo + " expected " + expectedPktNo + " got " + pktNo);
- new Thread(new recoverThread(chId, expectedPktNo, pktNo, recoveryList)).start();
- numReqRunning += 1;
- Log.i(TAG, "Req id: " + expectedPktNo + " created thread, number of current requests " + numReqRunning);
- }
- prevPktNo = pktNo;
- int pktLen = recvPkt.getLength();
- byte []pktData = recvPkt.getData();
- writeIndex = (int)(1316 * pktNo);
- System.arraycopy(pktData, 0, mainBuffer, writeIndex, pktLen);
- if(checkDistance){
- readIndex = writeIndex - (burstLen * 1316);
- if(readIndex < 0 ){
- readIndex = (bufferSizePkts * 1316) + (readIndex);
- }
- checkDistance = false;
- }
- if(acceptSocket.isConnected()){
- long buffPktNo = getPktNo(mainBuffer, readIndex + 1133);
- long buffExpectedPktNo = (buffPrevPktNo + 1);
- if(buffExpectedPktNo == bufferSizePkts){buffExpectedPktNo = 0;}
- if(buffExpectedPktNo != buffPktNo){
- Log.e(TAG, "Expected " + (buffPrevPktNo + 1) + " from buffer got " + buffPktNo + " readindex " + readIndex);
- }else{
- output.write(mainBuffer, readIndex, pktLen);
- output.flush();
- //Log.d(TAG, "wrote pkt " + buffPktNo + " readindex " + readIndex);
- }
- byte []tmp1 = new byte[1316];
- System.arraycopy(tmp1, 0, mainBuffer, readIndex, pktLen);
- readIndex += pktLen;
- if(readIndex == (bufferSizePkts * 1316)){
- readIndex = 0;
- }
- buffPrevPktNo = buffPktNo;
- }else{
- Log.i(TAG, "Not connected anymore not sending or receiving data");
- break;
- }
- if(output.checkError()){
- Log.e(TAG, "Output error");
- break;
- }
- }
- Log.i(TAG, "Leaving channel " + chId);
- mcastSocket.leaveGroup(group);
- mcastSocket.close();
- mcastSocket = null;
- numReqRunning = 0;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement