Advertisement
Guest User

Untitled

a guest
Nov 21st, 2019
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.26 KB | None | 0 0
  1. package de.hpi.ddm.actors;
  2.  
  3. import java.io.Serializable;
  4. import java.util.Arrays;
  5. import java.util.concurrent.CompletableFuture;
  6. import java.util.concurrent.CompletionStage;
  7.  
  8. import akka.Done;
  9. import akka.NotUsed;
  10. import akka.actor.*;
  11. import akka.serialization.Serialization;
  12. import akka.serialization.SerializationExtension;
  13. import akka.serialization.Serializers;
  14. import akka.stream.ActorMaterializer;
  15. import akka.stream.Materializer;
  16. import akka.stream.SourceRef;
  17. import akka.stream.impl.StreamSupervisor;
  18. import akka.stream.javadsl.*;
  19. import akka.util.ByteString;
  20. import lombok.AllArgsConstructor;
  21. import lombok.Data;
  22. import lombok.NoArgsConstructor;
  23.  
  24. public class LargeMessageProxy extends AbstractLoggingActor {
  25.  
  26.     ////////////////////////
  27.     // Actor Construction //
  28.     ////////////////////////
  29.  
  30.     public static final String DEFAULT_NAME = "largeMessageProxy";
  31.  
  32.     public static Props props() {
  33.         return Props.create(LargeMessageProxy.class);
  34.     }
  35.  
  36.     ////////////////////
  37.     // Actor Messages //
  38.     ////////////////////
  39.  
  40.     @Data @NoArgsConstructor @AllArgsConstructor
  41.     public static class LargeMessage<T> implements Serializable {
  42.         private static final long serialVersionUID = 2940665245810221108L;
  43.         private T message;
  44.         private ActorRef receiver;
  45.     }
  46.  
  47.     @Data @NoArgsConstructor @AllArgsConstructor
  48.     public static class BytesMessage<T> implements Serializable {
  49.         private static final long serialVersionUID = 4057807743872319842L;
  50.         private T bytes;
  51.         private ActorRef sender;
  52.         private ActorRef receiver;
  53.         private Integer serializerId;
  54.         private String manifest;
  55.         private CompletionStage<SourceRef<ByteString>> stage;
  56.  
  57.         public String getManifest(){
  58.             return manifest;
  59.         }
  60.  
  61.         public Integer getSerializerId(){
  62.             return serializerId;
  63.         }
  64.  
  65.         public CompletionStage<SourceRef<ByteString>> getStage(){
  66.             return stage;
  67.         }
  68.  
  69.     }
  70.  
  71.     /////////////////
  72.     // Actor State //
  73.     /////////////////
  74.  
  75.     /////////////////////
  76.     // Actor Lifecycle //
  77.     /////////////////////
  78.  
  79.     ////////////////////
  80.     // Actor Behavior //
  81.     ////////////////////
  82.  
  83.     @Override
  84.     public Receive createReceive() {
  85.         return receiveBuilder()
  86.                 .match(LargeMessage.class, this::handle)
  87.                 .match(BytesMessage.class, this::handle)
  88.                 .matchAny(object -> this.log().info("Received unknown message: \"{}\"", object.toString()))
  89.                 .build();
  90.     }
  91.  
  92.     private void handle(LargeMessage<?> message) {
  93.         ActorRef receiver = message.getReceiver();
  94.         ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
  95.  
  96.         // Just serializing the data
  97.         Serialization serialization = SerializationExtension.get(getContext().getSystem());
  98.         byte[] bytes = serialization.serialize(message).get();
  99.         int serializerId = serialization.findSerializerFor(message).identifier();
  100.         String manifest = Serializers.manifestFor(serialization.findSerializerFor(message), message);
  101.  
  102.         ByteString byteString = ByteString.fromArray(bytes);
  103.  
  104.         // Akka Streaming
  105.         Source<ByteString, NotUsed> source = Source.from(Arrays.asList(byteString));
  106.         Materializer mat = ActorMaterializer.create(getContext().getSystem());
  107.         CompletionStage<SourceRef<ByteString>> stage = source.runWith(StreamRefs.sourceRef(), mat);
  108.  
  109.         receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver(), serializerId, manifest, stage), this.self());
  110.     }
  111.  
  112.  
  113.     private void handle(BytesMessage<?> message) {
  114.         // Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
  115. //        Sink<ByteString, NotUsed> sink = Sink.actorRef(getContext().getSelf(), "complete");
  116.         CompletionStage<SourceRef<ByteString>> stage = message.getStage();
  117.  
  118.         CompletionStage<Void> future = stage.thenApply(SourceRef::getSource).thenRun(() -> ActorMaterializer.create(getContext().getSystem()));
  119.         message.getReceiver().tell(message.getBytes(), message.getSender());
  120.  
  121.  
  122.     }
  123. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement