Advertisement
Guest User

Untitled

a guest
Nov 21st, 2019
109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.42 KB | None | 0 0
  1. package de.hpi.ddm.actors;
  2.  
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.IOException;
  5. import java.io.ObjectOutputStream;
  6. import java.io.Serializable;
  7. import java.util.Arrays;
  8. import java.util.List;
  9.  
  10. import akka.NotUsed;
  11. import akka.actor.AbstractLoggingActor;
  12. import akka.actor.ActorRef;
  13. import akka.actor.ActorSelection;
  14. import akka.actor.Props;
  15. import akka.serialization.Serialization;
  16. import akka.serialization.SerializationExtension;
  17. import akka.serialization.Serializers;
  18. import akka.stream.ActorMaterializer;
  19. import akka.stream.Materializer;
  20. import akka.stream.SourceRef;
  21. import akka.stream.javadsl.Sink;
  22. import akka.stream.javadsl.Source;
  23. import akka.stream.javadsl.StreamRefs;
  24. import de.hpi.ddm.configuration.ConfigurationSingleton;
  25. import lombok.AllArgsConstructor;
  26. import lombok.Data;
  27. import lombok.NoArgsConstructor;
  28. import org.apache.commons.lang3.ArrayUtils;
  29.  
  30. public class LargeMessageProxy extends AbstractLoggingActor {
  31.  
  32.     ////////////////////////
  33.     // Actor Construction //
  34.     ////////////////////////
  35.  
  36.     public static final String DEFAULT_NAME = "largeMessageProxy";
  37.    
  38.     public static Props props() {
  39.         return Props.create(LargeMessageProxy.class);
  40.     }
  41.  
  42.     ////////////////////
  43.     // Actor Messages //
  44.     ////////////////////
  45.    
  46.     @Data @NoArgsConstructor @AllArgsConstructor
  47.     public static class LargeMessage<T> implements Serializable {
  48.         private static final long serialVersionUID = 2940665245810221108L;
  49.         private T message;
  50.         private ActorRef receiver;
  51.     }
  52.  
  53.     @Data @NoArgsConstructor @AllArgsConstructor
  54.     public static class BytesMessage<T> implements Serializable {
  55.         private static final long serialVersionUID = 4057807743872319842L;
  56.         private T bytes;
  57.         private ActorRef sender;
  58.         private ActorRef receiver;
  59.     }
  60.  
  61.     @Data @NoArgsConstructor @AllArgsConstructor
  62.     public static class SerializedByteMessage<T> implements Serializable {
  63.         private static final long serialVersionUID = 1237807743872319842L;
  64.         private byte[] bytes;
  65.         private ActorRef sender;
  66.         private ActorRef receiver;
  67.         private int serializerID;
  68.         private String manifest;
  69.     }
  70.  
  71.     @Data @NoArgsConstructor @AllArgsConstructor
  72.     public static class SourceMessage implements Serializable {
  73.         private static final long serialVersionUID = 6237807743872319842L;
  74.         private SourceRef<List<Byte>> sourceRef;
  75.         private ActorRef sender;
  76.         private ActorRef receiver;
  77.  
  78.         public SourceRef<List<Byte>> getSourceRef(){
  79.             return sourceRef;
  80.         }
  81.  
  82.     }
  83.    
  84.     /////////////////
  85.     // Actor State //
  86.     /////////////////
  87.    
  88.     /////////////////////
  89.     // Actor Lifecycle //
  90.     /////////////////////
  91.  
  92.     ////////////////////
  93.     // Actor Behavior //
  94.     ////////////////////
  95.    
  96.     @Override
  97.     public Receive createReceive() {
  98.         return receiveBuilder()
  99.                 .match(LargeMessage.class, this::handle)
  100.                 .match(BytesMessage.class, this::handle)
  101.                 .match(SourceMessage.class, this::handle)
  102.                 .matchAny(object -> this.log().info("Received unknown message: \"{}\"", object.toString()))
  103.                 .build();
  104.     }
  105.  
  106.     private void handle(LargeMessage<?> message) {
  107.         ActorRef receiver = message.getReceiver();
  108.         ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
  109.  
  110.         // Serializing
  111.         Serialization serialization = SerializationExtension.get(getContext().getSystem());
  112.         byte[] bytes = serialization.serialize(message).get();
  113.         int serializerId = serialization.findSerializerFor(message).identifier();
  114.         String manifest = Serializers.manifestFor(serialization.findSerializerFor(message), message);
  115.  
  116.         // Creating a serialized byte message, that carries the manifest and the serializer id
  117.         SerializedByteMessage serializedBytesMessage = new SerializedByteMessage(bytes, this.sender(), message.getReceiver(), serializerId, manifest);
  118.  
  119.         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  120.         try {
  121.             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
  122.             objectOutputStream.writeObject(serializedBytesMessage);
  123.             objectOutputStream.flush();
  124.             objectOutputStream.close();
  125.             byteArrayOutputStream.close();
  126.         } catch (IOException ex) {
  127.             System.out.println("IOException is caught");
  128.         }
  129.  
  130.         byte[] byteArrayData = byteArrayOutputStream.toByteArray();
  131.  
  132.  
  133.         // Akka Streaming
  134.         Source<List<Byte>, NotUsed> source = Source.from(Arrays.asList(ArrayUtils.toObject(byteArrayData))).grouped(262144); // max size = 262144
  135.         SourceRef<List<Byte>> sourceRef = source.runWith(StreamRefs.sourceRef(), getContext().getSystem());
  136.  
  137.         // Passing the source reference as a customized "Source Message"
  138.         receiverProxy.tell(new SourceMessage(sourceRef, this.sender(), message.getReceiver()), this.self());
  139.     }
  140.  
  141.     private void handle(SourceMessage message){
  142.         // Receiving the customized "Source Message" and receiving the source reference
  143.         SourceRef<List<Byte>> sourceRef = message.getSourceRef();
  144.         sourceRef.getSource().runWith(Sink.ignore(), getContext().getSystem()) // send the way it is
  145.         .whenComplete((result, exception) -> System.out.println("We have to gather the data now!"));
  146.  
  147.         // we have to forward the final object to the final receiver
  148.         message.getReceiver().tell("Deliver here the final assembled data", message.getSender());
  149.     }
  150.  
  151.     private void handle(BytesMessage<?> message) {
  152. //       Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
  153.         message.getReceiver().tell(message.getBytes(), message.getSender());
  154.     }
  155. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement