Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.movingimage24.download;
- import com.google.api.client.repackaged.com.google.common.base.Strings;
- import io.vertx.core.AbstractVerticle;
- import io.vertx.core.Future;
- import io.vertx.core.Handler;
- import io.vertx.core.eventbus.Message;
- import io.vertx.core.file.AsyncFile;
- import io.vertx.core.file.OpenOptions;
- import io.vertx.core.http.HttpClient;
- import io.vertx.core.http.HttpClientOptions;
- import io.vertx.core.http.HttpClientRequest;
- import io.vertx.core.http.HttpClientResponse;
- import io.vertx.core.json.JsonObject;
- import io.vertx.core.logging.Logger;
- import io.vertx.core.logging.LoggerFactory;
- import io.vertx.core.streams.Pump;
- import java.io.File;
- import java.net.URL;
- import java.util.UUID;
- /**
- * Created by joan on 31/07/15.
- */
- public class DownloadVerticle extends AbstractVerticle {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
- private String downloadDir;
- private ZookeeperUtils zookeeperUtils;
- public enum ADRESS_IN {
- DOWNLOAD(DownloadVerticle.class.getCanonicalName() + "download");
- private String adress;
- ADRESS_IN(String adress) {
- this.adress = adress;
- }
- public String getAdress() {
- return adress;
- }
- }
- public enum MESSAGE_RESPONSE_CODES {
- DOWNLOAD_FAILED_FILE_CREATION(1),
- DOWNLOAD_FAILED_FILE_OPEN(2),
- DOWNLOAD_FAILED_NOT_200(3),
- DOWNLOAD_FAILED_EXCEPTION_DURING_DOWNLOAD(4);
- private int code;
- MESSAGE_RESPONSE_CODES(int code) {
- this.code = code;
- }
- public int getCode() {
- return code;
- }
- }
- @Override
- public void start(Future<Void> startFuture) {
- if (!isConfigValid()) {
- startFuture.fail("Invalid config");
- return;
- }
- try {
- zookeeperUtils = new ZookeeperUtils(config().getJsonObject("zookeeper"));
- } catch (Exception e) {
- String message = "Error while starting zookeeper connection";
- logger.error(message, e);
- startFuture.fail(message);
- return;
- }
- downloadDir = config().getString("downloadDir");
- vertx.eventBus().consumer(ADRESS_IN.DOWNLOAD.getAdress(), getDownloadHandler());
- startFuture.complete();
- }
- private boolean isConfigValid() {
- JsonObject config = config();
- String downloadDir = config.getString("downloadDir");
- if (Strings.isNullOrEmpty(downloadDir)) {
- logger.info("Download dir was null or empty");
- return false;
- }
- // blocking is no problem, because it only happens on startup
- if (!vertx.fileSystem().existsBlocking(downloadDir)) {
- logger.info(String.format("Download directory '%s' does not exists", downloadDir));
- return false;
- }
- //TODO validate zookeeper conf
- return true;
- }
- private Handler<Message<String>> getDownloadHandler() {
- return message -> {
- //message.reply("/opt/downloadDir/ent.mp4");
- createFile(message);
- };
- }
- private void createFile(Message<String> message) {
- final String filePath = downloadDir + File.separator + UUID.randomUUID().toString();
- // create the file
- vertx.fileSystem().createFile(filePath, fileCreationResponse -> {
- if (fileCreationResponse.succeeded()) {
- // open the file
- OpenOptions openOptions = new OpenOptions();
- openOptions.setWrite(true);
- vertx.fileSystem().open(filePath, openOptions, fileOpenResponse -> {
- if (fileOpenResponse.succeeded()) {
- AsyncFile asyncFile = fileOpenResponse.result();
- String url = message.body();
- try {
- URL resolvedUrl = zookeeperUtils.resolveUrl(url);
- HttpClientOptions options = new HttpClientOptions()
- .setDefaultHost(resolvedUrl.getHost());
- logger.info(String.format("Using this resolved url as download url: %s", resolvedUrl.toString()));
- if (resolvedUrl.getPort() != -1) {
- options.setDefaultPort(resolvedUrl.getPort());
- }
- HttpClient client = vertx.createHttpClient(options);
- HttpClientRequest httpClientRequest = client.get(resolvedUrl.getFile(), getHttpClientResponseHandler(asyncFile, filePath, message));
- httpClientRequest.putHeader("Accept", "*/*");
- httpClientRequest.exceptionHandler(getExceptionHandler(url, message, asyncFile));
- httpClientRequest.end();
- } catch (Exception e) {
- String errorMessage = String.format("Error while resolving service url: %s", url);
- logger.error(errorMessage, e);
- message.fail(2, errorMessage);
- }
- } else {
- String errorMessage = String.format("Could not open file at : %s", filePath);
- message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_FILE_OPEN.getCode(), errorMessage);
- }
- });
- } else {
- String errorMessage = String.format("Could not create file at : %s", filePath);
- message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_FILE_CREATION.getCode(), errorMessage);
- }
- });
- }
- private Handler<Throwable> getExceptionHandler(String url, Message<String> message, AsyncFile asyncFile) {
- return e -> {
- String errorMessage = String.format("Exception during download file: '%s' ", url);
- logger.error(errorMessage, e.getCause());
- message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_EXCEPTION_DURING_DOWNLOAD.getCode(), errorMessage);
- asyncFile.close();
- };
- }
- private Handler<HttpClientResponse> getHttpClientResponseHandler(final AsyncFile asyncFile, final String filePath, Message<String> message) {
- return httpClientResponse -> {
- int statusCode = httpClientResponse.statusCode();
- httpClientResponse.pause();
- if (statusCode != 200) {
- String errorMessage = String.format("Could not download file. Status code was not 200, got: %s", statusCode);
- logger.error(errorMessage);
- message.fail(MESSAGE_RESPONSE_CODES.DOWNLOAD_FAILED_NOT_200.getCode(), errorMessage);
- asyncFile.close();
- return;
- }
- httpClientResponse.endHandler(httpEndHandler -> {
- message.reply(filePath);
- });
- Pump pump = Pump.factory.pump(httpClientResponse, asyncFile);
- pump.start();
- httpClientResponse.resume();
- };
- }
- private Handler<Void> getHttpEndHandler(String filePath, Message<String> message) {
- return endHandler -> {
- message.reply(filePath);
- };
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement