ggregory

AsyncReverseProxyExampleNoLogging

Nov 11th, 2018
213
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 31.29 KB | None | 0 0
  1. /*
  2.  * ====================================================================
  3.  * Licensed to the Apache Software Foundation (ASF) under one
  4.  * or more contributor license agreements.  See the NOTICE file
  5.  * distributed with this work for additional information
  6.  * regarding copyright ownership.  The ASF licenses this file
  7.  * to you under the Apache License, Version 2.0 (the
  8.  * "License"); you may not use this file except in compliance
  9.  * with the License.  You may obtain a copy of the License at
  10.  *
  11.  *   http://www.apache.org/licenses/LICENSE-2.0
  12.  *
  13.  * Unless required by applicable law or agreed to in writing,
  14.  * software distributed under the License is distributed on an
  15.  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16.  * KIND, either express or implied.  See the License for the
  17.  * specific language governing permissions and limitations
  18.  * under the License.
  19.  * ====================================================================
  20.  *
  21.  * This software consists of voluntary contributions made by many
  22.  * individuals on behalf of the Apache Software Foundation.  For more
  23.  * information on the Apache Software Foundation, please see
  24.  * <http://www.apache.org/>.
  25.  *
  26.  */
  27. package org.apache.hc.core5.http.examples;
  28.  
  29. import java.io.IOException;
  30. import java.io.InterruptedIOException;
  31. import java.net.InetSocketAddress;
  32. import java.nio.ByteBuffer;
  33. import java.nio.CharBuffer;
  34. import java.nio.charset.StandardCharsets;
  35. import java.util.Arrays;
  36. import java.util.Collections;
  37. import java.util.HashSet;
  38. import java.util.Iterator;
  39. import java.util.List;
  40. import java.util.Locale;
  41. import java.util.Set;
  42. import java.util.concurrent.TimeUnit;
  43. import java.util.concurrent.atomic.AtomicLong;
  44.  
  45. import org.apache.hc.core5.concurrent.FutureCallback;
  46. import org.apache.hc.core5.function.Supplier;
  47. import org.apache.hc.core5.http.ConnectionClosedException;
  48. import org.apache.hc.core5.http.ContentType;
  49. import org.apache.hc.core5.http.EntityDetails;
  50. import org.apache.hc.core5.http.Header;
  51. import org.apache.hc.core5.http.HeaderElements;
  52. import org.apache.hc.core5.http.HttpConnection;
  53. import org.apache.hc.core5.http.HttpException;
  54. import org.apache.hc.core5.http.HttpHeaders;
  55. import org.apache.hc.core5.http.HttpHost;
  56. import org.apache.hc.core5.http.HttpRequest;
  57. import org.apache.hc.core5.http.HttpResponse;
  58. import org.apache.hc.core5.http.HttpStatus;
  59. import org.apache.hc.core5.http.impl.BasicEntityDetails;
  60. import org.apache.hc.core5.http.impl.Http1StreamListener;
  61. import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
  62. import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
  63. import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
  64. import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
  65. import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
  66. import org.apache.hc.core5.http.message.BasicHttpRequest;
  67. import org.apache.hc.core5.http.message.BasicHttpResponse;
  68. import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
  69. import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
  70. import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
  71. import org.apache.hc.core5.http.nio.CapacityChannel;
  72. import org.apache.hc.core5.http.nio.DataStreamChannel;
  73. import org.apache.hc.core5.http.nio.RequestChannel;
  74. import org.apache.hc.core5.http.nio.ResponseChannel;
  75. import org.apache.hc.core5.http.protocol.HttpContext;
  76. import org.apache.hc.core5.io.CloseMode;
  77. import org.apache.hc.core5.pool.ConnPoolListener;
  78. import org.apache.hc.core5.pool.ConnPoolStats;
  79. import org.apache.hc.core5.pool.PoolStats;
  80. import org.apache.hc.core5.reactor.IOReactorConfig;
  81. import org.apache.hc.core5.util.TimeValue;
  82. import org.apache.hc.core5.util.Timeout;
  83.  
  84. /**
  85.  * Example of asynchronous embedded  HTTP/1.1 reverse proxy with full content streaming.
  86.  */
  87. public class AsyncReverseProxyExampleNoLogging {
  88.  
  89.     static void println(String msg) {
  90.         System.out.println(msg);
  91.     }
  92.  
  93.     public static void main(final String[] args) throws Exception {
  94.         if (args.length < 1) {
  95.             System.out.println("Usage: <hostname[:port]> [listener port]");
  96.             System.exit(1);
  97.         }
  98.         // Target host
  99.         final HttpHost targetHost = HttpHost.create(args[0]);
  100.         int port = 8080;
  101.         if (args.length > 1) {
  102.             port = Integer.parseInt(args[1]);
  103.         }
  104.  
  105.         println("Reverse proxy to " + targetHost);
  106.  
  107.         final IOReactorConfig config = IOReactorConfig.custom()
  108.             .setSoTimeout(10, TimeUnit.SECONDS)
  109.             .setIoThreadCount(1)
  110.             .setSelectInterval(TimeValue.ofMilliseconds(10000))
  111.             .build();
  112.  
  113.         final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
  114.                 .setIOReactorConfig(config)
  115.                 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
  116.  
  117.                     @Override
  118.                     public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
  119.                         final StringBuilder buf = new StringBuilder();
  120.                         buf.append("[proxy->origin] connection leased ").append(route);
  121.                         //println(buf.toString());
  122.                     }
  123.  
  124.                     @Override
  125.                     public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
  126.                         final StringBuilder buf = new StringBuilder();
  127.                         buf.append("[proxy->origin] connection released ").append(route);
  128.                         final PoolStats totals = connPoolStats.getTotalStats();
  129.                         buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
  130.                         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
  131.                         buf.append(" of ").append(totals.getMax());
  132.                         //println(buf.toString());
  133.                     }
  134.  
  135.                 })
  136.                 .setStreamListener(new Http1StreamListener() {
  137.  
  138.                     @Override
  139.                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
  140.                         // empty
  141.                     }
  142.  
  143.                     @Override
  144.                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
  145.                         // empty
  146.                     }
  147.  
  148.                     @Override
  149.                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
  150.                         //println("[proxy<-origin] connection " + connection +
  151.                                 //(keepAlive ? " kept alive" : " cannot be kept alive"));
  152.                     }
  153.  
  154.                 })
  155.                 .setMaxTotal(100)
  156.                 .setDefaultMaxPerRoute(20)
  157.                 .create();
  158.  
  159.         final IOReactorConfig config2 = IOReactorConfig.custom()
  160.                         .setSoTimeout(10, TimeUnit.SECONDS)
  161.                         .setIoThreadCount(1)
  162.                         .setSelectInterval(TimeValue.ofMilliseconds(1))
  163.                         .build();
  164.  
  165.         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
  166.                 .setIOReactorConfig(config2)
  167.                 .setStreamListener(new Http1StreamListener() {
  168.  
  169.                     @Override
  170.                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
  171.                         // empty
  172.                     }
  173.  
  174.                     @Override
  175.                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
  176.                         // empty
  177.                     }
  178.  
  179.                     @Override
  180.                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
  181.                         //println("[client<-proxy] connection " + connection +
  182.                                 //(keepAlive ? " kept alive" : " cannot be kept alive"));
  183.                     }
  184.  
  185.                 })
  186.                 .register("*", new Supplier<AsyncServerExchangeHandler>() {
  187.  
  188.                     @Override
  189.                     public AsyncServerExchangeHandler get() {
  190.                         return new IncomingExchangeHandler(targetHost, requester);
  191.                     }
  192.  
  193.                 })
  194.                 .create();
  195.  
  196.         Runtime.getRuntime().addShutdownHook(new Thread() {
  197.             @Override
  198.             public void run() {
  199.                 //println("Reverse proxy shutting down");
  200.                 server.close(CloseMode.GRACEFUL);
  201.                 requester.close(CloseMode.GRACEFUL);
  202.             }
  203.         });
  204.  
  205.         requester.start();
  206.         server.start();
  207.         server.listen(new InetSocketAddress(port));
  208.         println("Listening on port " + port);
  209.  
  210.         server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
  211.     }
  212.  
  213.     private static class ProxyBuffer extends ExpandableBuffer {
  214.  
  215.         ProxyBuffer(final int bufferSize) {
  216.             super(bufferSize);
  217.         }
  218.  
  219.         void put(final ByteBuffer src) {
  220.             setInputMode();
  221.             final int requiredCapacity = buffer().position() + src.remaining();
  222.             ensureCapacity(requiredCapacity);
  223.             buffer().put(src);
  224.         }
  225.  
  226.         int write(final DataStreamChannel channel) throws IOException {
  227.             setOutputMode();
  228.             if (buffer().hasRemaining()) {
  229.                 return channel.write(buffer());
  230.             }
  231.             return 0;
  232.         }
  233.  
  234.     }
  235.  
  236.     private static final AtomicLong COUNT = new AtomicLong(0);
  237.  
  238.     private static class ProxyExchangeState {
  239.  
  240.         final String id;
  241.  
  242.         HttpRequest request;
  243.         EntityDetails requestEntityDetails;
  244.         DataStreamChannel requestDataChannel;
  245.         CapacityChannel requestCapacityChannel;
  246.         ProxyBuffer inBuf;
  247.         boolean inputEnd;
  248.  
  249.         HttpResponse response;
  250.         EntityDetails responseEntityDetails;
  251.         ResponseChannel responseMessageChannel;
  252.         DataStreamChannel responseDataChannel;
  253.         CapacityChannel responseCapacityChannel;
  254.         ProxyBuffer outBuf;
  255.         boolean outputEnd;
  256.  
  257.         AsyncClientEndpoint clientEndpoint;
  258.  
  259.         ProxyExchangeState() {
  260.             this.id = String.format("%08X", COUNT.getAndIncrement());
  261.         }
  262.  
  263.     }
  264.  
  265.     private static final int INIT_BUFFER_SIZE = 4096;
  266.  
  267.     private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
  268.  
  269.         private final HttpHost targetHost;
  270.         private final HttpAsyncRequester requester;
  271.         private final ProxyExchangeState exchangeState;
  272.  
  273.         IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
  274.             super();
  275.             this.targetHost = targetHost;
  276.             this.requester = requester;
  277.             this.exchangeState = new ProxyExchangeState();
  278.         }
  279.  
  280.         @Override
  281.         public void handleRequest(
  282.                 final HttpRequest incomingRequest,
  283.                 final EntityDetails entityDetails,
  284.                 final ResponseChannel responseChannel,
  285.                 final HttpContext httpContext) throws HttpException, IOException {
  286.  
  287.             synchronized (exchangeState) {
  288.                 //println("[client->proxy] " + exchangeState.id + " " +
  289.                         //incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
  290.                 exchangeState.request = incomingRequest;
  291.                 exchangeState.requestEntityDetails = entityDetails;
  292.                 exchangeState.inputEnd = entityDetails == null;
  293.                 exchangeState.responseMessageChannel = responseChannel;
  294.  
  295.                 if (entityDetails != null) {
  296.                     final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
  297.                     if (h != null && "100-continue".equalsIgnoreCase(h.getValue())) {
  298.                         responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
  299.                     }
  300.                 }
  301.             }
  302.  
  303.             //println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
  304.  
  305.             requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
  306.  
  307.                 @Override
  308.                 public void completed(final AsyncClientEndpoint clientEndpoint) {
  309.                     //println("[proxy->origin] " + exchangeState.id + " connection leased");
  310.                     synchronized (exchangeState) {
  311.                         exchangeState.clientEndpoint = clientEndpoint;
  312.                     }
  313.                     clientEndpoint.execute(new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState), httpContext);
  314.                 }
  315.  
  316.                 @Override
  317.                 public void failed(final Exception cause) {
  318.                     final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
  319.                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
  320.                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
  321.                     final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
  322.                                     ContentType.TEXT_PLAIN);
  323.                     synchronized (exchangeState) {
  324.                         exchangeState.response = outgoingResponse;
  325.                         exchangeState.responseEntityDetails = exEntityDetails;
  326.                         exchangeState.outBuf = new ProxyBuffer(1024);
  327.                         exchangeState.outBuf.put(msg);
  328.                         exchangeState.outputEnd = true;
  329.                     }
  330.                     //println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
  331.  
  332.                     try {
  333.                         responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
  334.                     } catch (HttpException | IOException ignore) {
  335.                         // ignore
  336.                     }
  337.                 }
  338.  
  339.                 @Override
  340.                 public void cancelled() {
  341.                     failed(new InterruptedIOException());
  342.                 }
  343.  
  344.             });
  345.  
  346.         }
  347.  
  348.         @Override
  349.         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
  350.             synchronized (exchangeState) {
  351.                 exchangeState.requestCapacityChannel = capacityChannel;
  352.                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
  353.                 if (capacity > 0) {
  354.                     //println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
  355.                     capacityChannel.update(capacity);
  356.                 }
  357.             }
  358.         }
  359.  
  360.         @Override
  361.         public void consume(final ByteBuffer src) throws IOException {
  362.             synchronized (exchangeState) {
  363.                 //println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
  364.                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
  365.                 if (dataChannel != null && exchangeState.inBuf != null) {
  366.                     if (exchangeState.inBuf.hasData()) {
  367.                         final int bytesWritten = exchangeState.inBuf.write(dataChannel);
  368.                         //println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
  369.                     }
  370.                     if (!exchangeState.inBuf.hasData()) {
  371.                         final int bytesWritten = dataChannel.write(src);
  372.                         //println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
  373.                     }
  374.                 }
  375.                 if (src.hasRemaining()) {
  376.                     if (exchangeState.inBuf == null) {
  377.                         exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
  378.                     }
  379.                     exchangeState.inBuf.put(src);
  380.                 }
  381.                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
  382.                 //println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
  383.                 if (dataChannel != null) {
  384.                     dataChannel.requestOutput();
  385.                 }
  386.             }
  387.         }
  388.  
  389.         @Override
  390.         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
  391.             synchronized (exchangeState) {
  392.                 //println("[client->proxy] " + exchangeState.id + " end of input");
  393.                 exchangeState.inputEnd = true;
  394.                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
  395.                 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
  396.                     //println("[proxy->origin] " + exchangeState.id + " end of output");
  397.                     dataChannel.endStream();
  398.                 }
  399.             }
  400.         }
  401.  
  402.         @Override
  403.         public int available() {
  404.             synchronized (exchangeState) {
  405.                 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
  406.                 //println("[client<-proxy] " + exchangeState.id + " output available: " + available);
  407.                 return available;
  408.             }
  409.         }
  410.  
  411.         @Override
  412.         public void produce(final DataStreamChannel channel) throws IOException {
  413.             synchronized (exchangeState) {
  414.                 //println("[client<-proxy] " + exchangeState.id + " produce output");
  415.                 exchangeState.responseDataChannel = channel;
  416.  
  417.                 if (exchangeState.outBuf != null) {
  418.                     if (exchangeState.outBuf.hasData()) {
  419.                         final int bytesWritten = exchangeState.outBuf.write(channel);
  420.                         //println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
  421.                     }
  422.                     if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
  423.                         channel.endStream();
  424.                         //println("[client<-proxy] " + exchangeState.id + " end of output");
  425.                     }
  426.                     if (!exchangeState.outputEnd) {
  427.                         final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
  428.                         if (capacityChannel != null) {
  429.                             final int capacity = exchangeState.outBuf.capacity();
  430.                             if (capacity > 0) {
  431.                                 //println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
  432.                                 capacityChannel.update(capacity);
  433.                             }
  434.                         }
  435.                     }
  436.                 }
  437.             }
  438.         }
  439.  
  440.         @Override
  441.         public void failed(final Exception cause) {
  442.             //println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
  443.             if (!(cause instanceof ConnectionClosedException)) {
  444.                 cause.printStackTrace(System.out);
  445.             }
  446.             synchronized (exchangeState) {
  447.                 if (exchangeState.clientEndpoint != null) {
  448.                     exchangeState.clientEndpoint.releaseAndDiscard();
  449.                 }
  450.             }
  451.         }
  452.  
  453.         @Override
  454.         public void releaseResources() {
  455.             synchronized (exchangeState) {
  456.                 exchangeState.responseMessageChannel = null;
  457.                 exchangeState.responseDataChannel = null;
  458.                 exchangeState.requestCapacityChannel = null;
  459.             }
  460.         }
  461.  
  462.     }
  463.  
  464.     private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
  465.  
  466.         private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
  467.                         HttpHeaders.HOST.toLowerCase(Locale.ROOT),
  468.                         HttpHeaders.CONTENT_LENGTH.toLowerCase(Locale.ROOT),
  469.                         HttpHeaders.TRANSFER_ENCODING.toLowerCase(Locale.ROOT),
  470.                         HttpHeaders.CONNECTION.toLowerCase(Locale.ROOT),
  471.                         "Keep-Alive".toLowerCase(Locale.ROOT),
  472.                         "Proxy-Authenticate".toLowerCase(Locale.ROOT),
  473.                         HttpHeaders.TE.toLowerCase(Locale.ROOT),
  474.                         HttpHeaders.TRAILER.toLowerCase(Locale.ROOT),
  475.                         HttpHeaders.UPGRADE.toLowerCase(Locale.ROOT))));
  476.  
  477.         private final HttpHost targetHost;
  478.         private final AsyncClientEndpoint clientEndpoint;
  479.         private final ProxyExchangeState exchangeState;
  480.  
  481.         OutgoingExchangeHandler(
  482.                 final HttpHost targetHost,
  483.                 final AsyncClientEndpoint clientEndpoint,
  484.                 final ProxyExchangeState exchangeState) {
  485.             this.targetHost = targetHost;
  486.             this.clientEndpoint = clientEndpoint;
  487.             this.exchangeState = exchangeState;
  488.         }
  489.  
  490.         @Override
  491.         public void produceRequest(
  492.                 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
  493.             synchronized (exchangeState) {
  494.                 final HttpRequest incomingRequest = exchangeState.request;
  495.                 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
  496.                 final HttpRequest outgoingRequest = new BasicHttpRequest(
  497.                         incomingRequest.getMethod(),
  498.                         targetHost,
  499.                         incomingRequest.getPath());
  500.                 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
  501.                     final Header header = it.next();
  502.                     if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
  503.                         outgoingRequest.addHeader(header);
  504.                     }
  505.                 }
  506.  
  507.                 //println("[proxy->origin] " + exchangeState.id + " " +
  508.                         //outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
  509.  
  510.                 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
  511.             }
  512.         }
  513.  
  514.         @Override
  515.         public int available() {
  516.             synchronized (exchangeState) {
  517.                 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
  518.                 //println("[proxy->origin] " + exchangeState.id + " output available: " + available);
  519.                 return available;
  520.             }
  521.         }
  522.  
  523.         @Override
  524.         public void produce(final DataStreamChannel channel) throws IOException {
  525.             synchronized (exchangeState) {
  526.                 //println("[proxy->origin] " + exchangeState.id + " produce output");
  527.                 exchangeState.requestDataChannel = channel;
  528.                 if (exchangeState.inBuf != null) {
  529.                     if (exchangeState.inBuf.hasData()) {
  530.                         final int bytesWritten = exchangeState.inBuf.write(channel);
  531.                         //println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
  532.                     }
  533.                     if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
  534.                         channel.endStream();
  535.                         //println("[proxy->origin] " + exchangeState.id + " end of output");
  536.                     }
  537.                     if (!exchangeState.inputEnd) {
  538.                         final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
  539.                         if (capacityChannel != null) {
  540.                             final int capacity = exchangeState.inBuf.capacity();
  541.                             if (capacity > 0) {
  542.                                 //println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
  543.                                 capacityChannel.update(capacity);
  544.                             }
  545.                         }
  546.                     }
  547.                 }
  548.             }
  549.         }
  550.  
  551.         @Override
  552.         public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
  553.             // ignore
  554.         }
  555.  
  556.         @Override
  557.         public void consumeResponse(
  558.                 final HttpResponse incomingResponse,
  559.                 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
  560.             synchronized (exchangeState) {
  561.                 //println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
  562.                 if (entityDetails == null) {
  563.                     //println("[proxy<-origin] " + exchangeState.id + " end of input");
  564.                 }
  565.  
  566.                 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
  567.                 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
  568.                     final Header header = it.next();
  569.                     if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
  570.                         outgoingResponse.addHeader(header);
  571.                     }
  572.                 }
  573.  
  574.                 exchangeState.response = outgoingResponse;
  575.                 exchangeState.responseEntityDetails = entityDetails;
  576.                 exchangeState.outputEnd = entityDetails == null;
  577.  
  578.                 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
  579.                 if (responseChannel != null) {
  580.                     // responseChannel can be null under load.
  581.                     responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
  582.                 }
  583.  
  584.                 //println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
  585.                 if (entityDetails == null) {
  586.                     //println("[client<-proxy] " + exchangeState.id + " end of output");
  587.                     clientEndpoint.releaseAndReuse();
  588.                 }
  589.             }
  590.         }
  591.  
  592.         @Override
  593.         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
  594.             synchronized (exchangeState) {
  595.                 exchangeState.responseCapacityChannel = capacityChannel;
  596.                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
  597.                 if (capacity > 0) {
  598.                     //println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
  599.                     capacityChannel.update(capacity);
  600.                 }
  601.             }
  602.         }
  603.  
  604.         @Override
  605.         public void consume(final ByteBuffer src) throws IOException {
  606.             synchronized (exchangeState) {
  607.                 //println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
  608.                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
  609.                 if (dataChannel != null && exchangeState.outBuf != null) {
  610.                     if (exchangeState.outBuf.hasData()) {
  611.                         final int bytesWritten = exchangeState.outBuf.write(dataChannel);
  612.                         //println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
  613.                     }
  614.                     if (!exchangeState.outBuf.hasData()) {
  615.                         final int bytesWritten = dataChannel.write(src);
  616.                         //println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
  617.                     }
  618.                 }
  619.                 if (src.hasRemaining()) {
  620.                     if (exchangeState.outBuf == null) {
  621.                         exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
  622.                     }
  623.                     exchangeState.outBuf.put(src);
  624.                 }
  625.                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
  626.                 //println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
  627.                 if (dataChannel != null) {
  628.                     dataChannel.requestOutput();
  629.                 }
  630.             }
  631.         }
  632.  
  633.         @Override
  634.         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
  635.             synchronized (exchangeState) {
  636.                 //println("[proxy<-origin] " + exchangeState.id + " end of input");
  637.                 exchangeState.outputEnd = true;
  638.                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
  639.                 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
  640.                     //println("[client<-proxy] " + exchangeState.id + " end of output");
  641.                     dataChannel.endStream();
  642.                     clientEndpoint.releaseAndReuse();
  643.                 }
  644.             }
  645.         }
  646.  
  647.         @Override
  648.         public void cancel() {
  649.             clientEndpoint.releaseAndDiscard();
  650.         }
  651.  
  652.         @Override
  653.         public void failed(final Exception cause) {
  654.             //println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
  655.             if (!(cause instanceof ConnectionClosedException)) {
  656.                 cause.printStackTrace(System.out);
  657.             }
  658.             synchronized (exchangeState) {
  659.                 if (exchangeState.response == null) {
  660.                     final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
  661.                     final HttpResponse outgoingResponse = new BasicHttpResponse(status);
  662.                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
  663.                     exchangeState.response = outgoingResponse;
  664.  
  665.                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
  666.                     final int contentLen = msg.remaining();
  667.                     exchangeState.outBuf = new ProxyBuffer(1024);
  668.                     exchangeState.outBuf.put(msg);
  669.                     exchangeState.outputEnd = true;
  670.  
  671.                     //println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
  672.  
  673.                     try {
  674.                         final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
  675.                         exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
  676.                     } catch (HttpException | IOException ignore) {
  677.                         // ignore
  678.                     }
  679.                 } else {
  680.                     exchangeState.outputEnd = true;
  681.                 }
  682.                 clientEndpoint.releaseAndDiscard();
  683.             }
  684.         }
  685.  
  686.         @Override
  687.         public void releaseResources() {
  688.             synchronized (exchangeState) {
  689.                 exchangeState.requestDataChannel = null;
  690.                 exchangeState.responseCapacityChannel = null;
  691.                 clientEndpoint.releaseAndDiscard();
  692.             }
  693.         }
  694.  
  695.     }
  696.  
  697. }
Advertisement
Add Comment
Please, Sign In to add comment