Advertisement
Guest User

Untitled

a guest
Aug 4th, 2015
191
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.35 KB | None | 0 0
  1. package com.movingimage24.download;
  2.  
  3. import com.google.api.client.repackaged.com.google.common.base.Strings;
  4. import io.vertx.core.AbstractVerticle;
  5. import io.vertx.core.Future;
  6. import io.vertx.core.Handler;
  7. import io.vertx.core.eventbus.Message;
  8. import io.vertx.core.file.AsyncFile;
  9. import io.vertx.core.file.OpenOptions;
  10. import io.vertx.core.http.HttpClient;
  11. import io.vertx.core.http.HttpClientOptions;
  12. import io.vertx.core.http.HttpClientRequest;
  13. import io.vertx.core.http.HttpClientResponse;
  14. import io.vertx.core.json.JsonObject;
  15. import io.vertx.core.logging.Logger;
  16. import io.vertx.core.logging.LoggerFactory;
  17. import io.vertx.core.streams.Pump;
  18.  
  19. import java.io.File;
  20. import java.net.URL;
  21. import java.util.UUID;
  22.  
  23. /**
  24. * Created by joan on 31/07/15.
  25. */
  26. public class DownloadVerticle extends AbstractVerticle {
  27. private Logger logger = LoggerFactory.getLogger(this.getClass());
  28.  
  29. private String downloadDir;
  30. private ZookeeperUtils zookeeperUtils;
  31.  
  32. public enum ADRESS_IN {
  33. DOWNLOAD(DownloadVerticle.class.getCanonicalName() + "download");
  34.  
  35. private String adress;
  36.  
  37. ADRESS_IN(String adress) {
  38. this.adress = adress;
  39. }
  40.  
  41. public String getAdress() {
  42. return adress;
  43. }
  44. }
  45.  
  46. public enum MESSAGE_RESPONSE_CODES {
  47. DOWNLOAD_FAILED_FILE_CREATION(1),
  48. DOWNLOAD_FAILED_FILE_OPEN(2),
  49. DOWNLOAD_FAILED_NOT_200(3),
  50. DOWNLOAD_FAILED_EXCEPTION_DURING_DOWNLOAD(4);
  51.  
  52. private int code;
  53.  
  54. MESSAGE_RESPONSE_CODES(int code) {
  55.  
  56. this.code = code;
  57. }
  58.  
  59. public int getCode() {
  60. return code;
  61. }
  62. }
  63.  
  64. @Override
  65. public void start(Future<Void> startFuture) {
  66. if (!isConfigValid()) {
  67. startFuture.fail("Invalid config");
  68. return;
  69. }
  70.  
  71. try {
  72. zookeeperUtils = new ZookeeperUtils(config().getJsonObject("zookeeper"));
  73. } catch (Exception e) {
  74. String message = "Error while starting zookeeper connection";
  75. logger.error(message, e);
  76. startFuture.fail(message);
  77. return;
  78. }
  79.  
  80. downloadDir = config().getString("downloadDir");
  81. vertx.eventBus().consumer(ADRESS_IN.DOWNLOAD.getAdress(), getDownloadHandler());
  82. startFuture.complete();
  83. }
  84.  
  85. private boolean isConfigValid() {
  86. JsonObject config = config();
  87. String downloadDir = config.getString("downloadDir");
  88. if (Strings.isNullOrEmpty(downloadDir)) {
  89. logger.info("Download dir was null or empty");
  90. return false;
  91. }
  92.  
  93. // blocking is no problem, because it only happens on startup
  94. if (!vertx.fileSystem().existsBlocking(downloadDir)) {
  95. logger.info(String.format("Download directory '%s' does not exists", downloadDir));
  96. return false;
  97. }
  98.  
  99. //TODO validate zookeeper conf
  100.  
  101. return true;
  102. }
  103.  
  104. private Handler<Message<String>> getDownloadHandler() {
  105. return message -> {
  106. //message.reply("/opt/downloadDir/ent.mp4");
  107. createFile(message);
  108. //
  109. // String url = message.body();
  110. //
  111. // HttpClient client = vertx.createHttpClient();
  112. // client.get(url, httpClientResponse -> {
  113. // httpClientResponse.pause();
  114. //// Pump pump = Pump.factory.pump(httpClientResponse);
  115. //
  116. // httpClientResponse.resume();
  117. // });
  118. };
  119. }
  120.  
  121. private void createFile(Message<String> message) {
  122. final String filePath = downloadDir + File.separator + UUID.randomUUID().toString();
  123. // create the file
  124.  
  125. vertx.fileSystem().createFile(filePath, fileCreationResponse -> {
  126. if (fileCreationResponse.succeeded()) {
  127. // open the file
  128. OpenOptions openOptions = new OpenOptions();
  129. openOptions.setWrite(true);
  130. vertx.fileSystem().open(filePath, openOptions, fileOpenResponse -> {
  131. if (fileOpenResponse.succeeded()) {
  132. AsyncFile asyncFile = fileOpenResponse.result();
  133.  
  134. String url = message.body();
  135. try {
  136. URL resolvedUrl = zookeeperUtils.resolveUrl(url);
  137. HttpClientOptions options = new HttpClientOptions()
  138. .setDefaultHost(resolvedUrl.getHost());
  139.  
  140. logger.info(String.format("Using this resolved url as download url: %s", resolvedUrl.toString()));
  141.  
  142. if (resolvedUrl.getPort() != -1) {
  143. options.setDefaultPort(resolvedUrl.getPort());
  144. }
  145.  
  146. HttpClient client = vertx.createHttpClient(options);
  147.  
  148. HttpClientRequest httpClientRequest = client.get(resolvedUrl.getFile(), getHttpClientResponseHandler(asyncFile, filePath, message));
  149. httpClientRequest.putHeader("Accept", "*/*");
  150. httpClientRequest.exceptionHandler(getExceptionHandler(url, message, asyncFile));
  151. httpClientRequest.end();
  152. client.close();
  153. } catch (Exception e) {
  154. String errorMessage = String.format("Error while resolving service url: %s", url);
  155. logger.error(errorMessage, e);
  156. message.fail(2, errorMessage);
  157. }
  158. } else {
  159. String errorMessage = String.format("Could not open file at : %s", filePath);
  160. message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_FILE_OPEN.getCode(), errorMessage);
  161. }
  162. });
  163. } else {
  164. String errorMessage = String.format("Could not create file at : %s", filePath);
  165. message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_FILE_CREATION.getCode(), errorMessage);
  166. }
  167. });
  168. }
  169.  
  170. private Handler<Throwable> getExceptionHandler(String url, Message<String> message, AsyncFile asyncFile) {
  171. return e -> {
  172. String errorMessage = String.format("Exception during download file: '%s' ", url);
  173. logger.error(errorMessage, e.getCause());
  174. message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_EXCEPTION_DURING_DOWNLOAD.getCode(), errorMessage);
  175. asyncFile.close();
  176. };
  177. }
  178.  
  179. private Handler<HttpClientResponse> getHttpClientResponseHandler(final AsyncFile asyncFile, final String filePath, Message<String> message) {
  180. return httpClientResponse -> {
  181. int statusCode = httpClientResponse.statusCode();
  182. httpClientResponse.pause();
  183. if (statusCode != 200) {
  184. String errorMessage = String.format("Could not download file. Status code was not 200, got: %s", statusCode);
  185. logger.error(errorMessage);
  186. message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_NOT_200.getCode(), errorMessage);
  187. asyncFile.close();
  188. return;
  189. }
  190. // httpClientResponse.endHandler(httpEndHandler -> {
  191. // message.reply(filePath);
  192. // asyncFile.flush(flushResp -> {
  193. // asyncFile.close(event -> {
  194. // if (event.succeeded()) {
  195. // logger.info(String.format("Closed file: %s", filePath));
  196. // message.reply(filePath);
  197. // } else {
  198. // logger.info(String.format("Could not close file: %s", filePath));
  199. //
  200. // }
  201. // });
  202. // });
  203. // });
  204.  
  205. httpClientResponse.endHandler(httpEndHandler -> {
  206. message.reply(filePath);
  207. });
  208. Pump pump = Pump.factory.pump(httpClientResponse, asyncFile);
  209. pump.start();
  210. httpClientResponse.resume();
  211. };
  212. }
  213.  
  214. private Handler<Void> getHttpEndHandler(String filePath, Message<String> message) {
  215. return endHandler -> {
  216. message.reply(filePath);
  217. };
  218. }
  219. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement