Advertisement
Guest User

Untitled

a guest
Aug 4th, 2015
189
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.33 KB | None | 0 0
  1. package com.movingimage24.download;
  2.  
  3. import com.google.api.client.repackaged.com.google.common.base.Strings;
  4. import io.vertx.core.AbstractVerticle;
  5. import io.vertx.core.Future;
  6. import io.vertx.core.Handler;
  7. import io.vertx.core.eventbus.Message;
  8. import io.vertx.core.file.AsyncFile;
  9. import io.vertx.core.file.OpenOptions;
  10. import io.vertx.core.http.HttpClient;
  11. import io.vertx.core.http.HttpClientOptions;
  12. import io.vertx.core.http.HttpClientRequest;
  13. import io.vertx.core.http.HttpClientResponse;
  14. import io.vertx.core.json.JsonObject;
  15. import io.vertx.core.logging.Logger;
  16. import io.vertx.core.logging.LoggerFactory;
  17. import io.vertx.core.streams.Pump;
  18.  
  19. import java.io.File;
  20. import java.net.URL;
  21. import java.util.UUID;
  22.  
  23. /**
  24.  * Created by joan on 31/07/15.
  25.  */
  26. public class DownloadVerticle extends AbstractVerticle {
  27.     private Logger logger = LoggerFactory.getLogger(this.getClass());
  28.  
  29.     private String downloadDir;
  30.     private ZookeeperUtils zookeeperUtils;
  31.  
  32.     public enum ADRESS_IN {
  33.         DOWNLOAD(DownloadVerticle.class.getCanonicalName() + "download");
  34.  
  35.         private String adress;
  36.  
  37.         ADRESS_IN(String adress) {
  38.             this.adress = adress;
  39.         }
  40.  
  41.         public String getAdress() {
  42.             return adress;
  43.         }
  44.     }
  45.  
  46.     public enum MESSAGE_RESPONSE_CODES {
  47.         DOWNLOAD_FAILED_FILE_CREATION(1),
  48.         DOWNLOAD_FAILED_FILE_OPEN(2),
  49.         DOWNLOAD_FAILED_NOT_200(3),
  50.         DOWNLOAD_FAILED_EXCEPTION_DURING_DOWNLOAD(4);
  51.  
  52.         private int code;
  53.  
  54.         MESSAGE_RESPONSE_CODES(int code) {
  55.  
  56.             this.code = code;
  57.         }
  58.  
  59.         public int getCode() {
  60.             return code;
  61.         }
  62.     }
  63.  
  64.     @Override
  65.     public void start(Future<Void> startFuture) {
  66.         if (!isConfigValid()) {
  67.             startFuture.fail("Invalid config");
  68.             return;
  69.         }
  70.  
  71.         try {
  72.             zookeeperUtils = new ZookeeperUtils(config().getJsonObject("zookeeper"));
  73.         } catch (Exception e) {
  74.             String message = "Error while starting zookeeper connection";
  75.             logger.error(message, e);
  76.             startFuture.fail(message);
  77.             return;
  78.         }
  79.  
  80.         downloadDir = config().getString("downloadDir");
  81.         vertx.eventBus().consumer(ADRESS_IN.DOWNLOAD.getAdress(), getDownloadHandler());
  82.         startFuture.complete();
  83.     }
  84.  
  85.     private boolean isConfigValid() {
  86.         JsonObject config = config();
  87.         String downloadDir = config.getString("downloadDir");
  88.         if (Strings.isNullOrEmpty(downloadDir)) {
  89.             logger.info("Download dir was null or empty");
  90.             return false;
  91.         }
  92.  
  93.         // blocking is no problem, because it only happens on startup
  94.         if (!vertx.fileSystem().existsBlocking(downloadDir)) {
  95.             logger.info(String.format("Download directory '%s' does not exists", downloadDir));
  96.             return false;
  97.         }
  98.  
  99.         //TODO validate zookeeper conf
  100.  
  101.         return true;
  102.     }
  103.  
  104.     private Handler<Message<String>> getDownloadHandler() {
  105.         return message -> {
  106.             //message.reply("/opt/downloadDir/ent.mp4");
  107.             createFile(message);
  108.         };
  109.     }
  110.  
  111.     private void createFile(Message<String> message) {
  112.         final String filePath = downloadDir + File.separator + UUID.randomUUID().toString();
  113.         // create the file
  114.  
  115.         vertx.fileSystem().createFile(filePath, fileCreationResponse -> {
  116.             if (fileCreationResponse.succeeded()) {
  117.                 // open the file
  118.                 OpenOptions openOptions = new OpenOptions();
  119.                 openOptions.setWrite(true);
  120.                 vertx.fileSystem().open(filePath, openOptions, fileOpenResponse -> {
  121.                     if (fileOpenResponse.succeeded()) {
  122.                         AsyncFile asyncFile = fileOpenResponse.result();
  123.  
  124.                         String url = message.body();
  125.                         try {
  126.                             URL resolvedUrl = zookeeperUtils.resolveUrl(url);
  127.                             HttpClientOptions options = new HttpClientOptions()
  128.                                     .setDefaultHost(resolvedUrl.getHost());
  129.  
  130.                             logger.info(String.format("Using this resolved url as download url: %s", resolvedUrl.toString()));
  131.  
  132.                             if (resolvedUrl.getPort() != -1) {
  133.                                 options.setDefaultPort(resolvedUrl.getPort());
  134.                             }
  135.  
  136.                             HttpClient client = vertx.createHttpClient(options);
  137.  
  138.                             HttpClientRequest httpClientRequest = client.get(resolvedUrl.getFile(), getHttpClientResponseHandler(asyncFile, filePath, message));
  139.                             httpClientRequest.putHeader("Accept", "*/*");
  140.                             httpClientRequest.exceptionHandler(getExceptionHandler(url, message, asyncFile));
  141.                             httpClientRequest.end();
  142.                         } catch (Exception e) {
  143.                             String errorMessage = String.format("Error while resolving service url: %s", url);
  144.                             logger.error(errorMessage, e);
  145.                             message.fail(2, errorMessage);
  146.                         }
  147.                     } else {
  148.                         String errorMessage = String.format("Could not open file at : %s", filePath);
  149.                         message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_FILE_OPEN.getCode(), errorMessage);
  150.                     }
  151.                 });
  152.             } else {
  153.                 String errorMessage = String.format("Could not create file at : %s", filePath);
  154.                 message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_FILE_CREATION.getCode(), errorMessage);
  155.             }
  156.         });
  157.     }
  158.  
  159.     private Handler<Throwable> getExceptionHandler(String url, Message<String> message, AsyncFile asyncFile) {
  160.         return e -> {
  161.             String errorMessage = String.format("Exception during download file: '%s' ", url);
  162.             logger.error(errorMessage, e.getCause());
  163.             message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_EXCEPTION_DURING_DOWNLOAD.getCode(), errorMessage);
  164.             asyncFile.close();
  165.         };
  166.     }
  167.  
  168.     private Handler<HttpClientResponse> getHttpClientResponseHandler(final AsyncFile asyncFile, final String filePath, Message<String> message) {
  169.         return httpClientResponse -> {
  170.             int statusCode = httpClientResponse.statusCode();
  171.             httpClientResponse.pause();
  172.             if (statusCode != 200) {
  173.                 String errorMessage = String.format("Could not download file. Status code was not 200, got: %s", statusCode);
  174.                 logger.error(errorMessage);
  175.                 message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_NOT_200.getCode(), errorMessage);
  176.                 asyncFile.close();
  177.                 return;
  178.             }
  179.  
  180.             httpClientResponse.endHandler(httpEndHandler -> {
  181.                 message.reply(filePath);
  182.             });
  183.             Pump pump = Pump.factory.pump(httpClientResponse, asyncFile);
  184.             pump.start();
  185.             httpClientResponse.resume();
  186.         };
  187.     }
  188.  
  189.     private Handler<Void> getHttpEndHandler(String filePath, Message<String> message) {
  190.         return endHandler -> {
  191.             message.reply(filePath);
  192.         };
  193.     }
  194. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement