Advertisement
Guest User

Untitled

a guest
Nov 22nd, 2019
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.42 KB | None | 0 0
  1. package de.hpi.ddm.actors;
  2.  
  3. import java.io.*;
  4. import java.util.Arrays;
  5. import java.util.List;
  6.  
  7. import akka.NotUsed;
  8. import akka.actor.*;
  9. import akka.serialization.JavaSerializer;
  10. import akka.serialization.Serialization;
  11. import akka.serialization.SerializationExtension;
  12. import akka.serialization.Serializers;
  13. import akka.stream.SourceRef;
  14. import akka.stream.javadsl.Sink;
  15. import akka.stream.javadsl.Source;
  16. import akka.stream.javadsl.StreamRefs;
  17. import com.esotericsoftware.kryo.Kryo;
  18. import com.esotericsoftware.kryo.io.Input;
  19. import com.esotericsoftware.kryo.io.Output;
  20. import lombok.AllArgsConstructor;
  21. import lombok.Data;
  22. import lombok.NoArgsConstructor;
  23. import org.apache.commons.lang3.ArrayUtils;
  24.  
  25. public class LargeMessageProxy extends AbstractLoggingActor {
  26.  
  27.     ////////////////////////
  28.     // Actor Construction //
  29.     ////////////////////////
  30.  
  31.     public static final String DEFAULT_NAME = "largeMessageProxy";
  32.  
  33.     public static Props props() {
  34.         return Props.create(LargeMessageProxy.class);
  35.     }
  36.  
  37.     ////////////////////
  38.     // Actor Messages //
  39.     ////////////////////
  40.  
  41.     @Data
  42.     @NoArgsConstructor
  43.     @AllArgsConstructor
  44.     public static class LargeMessage<T> implements Serializable {
  45.         private static final long serialVersionUID = 2940665245810221108L;
  46.         private T message;
  47.         private ActorRef receiver;
  48.     }
  49.  
  50.     @Data
  51.     @NoArgsConstructor
  52.     @AllArgsConstructor
  53.     public static class BytesMessage<T> implements Serializable {
  54.         private static final long serialVersionUID = 4057807743872319842L;
  55.         private T bytes;
  56.         private ActorRef sender;
  57.         private ActorRef receiver;
  58.     }
  59.  
  60.     @Data
  61.     @NoArgsConstructor
  62.     @AllArgsConstructor
  63.     public static class SerializedByteMessage<T> implements Serializable {
  64.         private static final long serialVersionUID = 1234507743872319842L;
  65.         private byte[] bytes;
  66.         private ActorRef sender;
  67.         private ActorRef receiver;
  68.     }
  69.  
  70.     @Data
  71.     @NoArgsConstructor
  72.     @AllArgsConstructor
  73.     public static class SourceMessage implements Serializable {
  74.         private static final long serialVersionUID = 2432507743872319842L;
  75.         private SourceRef<List<Byte>> sourceRef;
  76.         private int length;
  77.         private ActorRef sender;
  78.         private ActorRef receiver;
  79.  
  80.         public SourceRef<List<Byte>> getSourceRef() {
  81.             return sourceRef;
  82.         }
  83.  
  84.         public ActorRef getReceiver() {
  85.             return receiver;
  86.         }
  87.  
  88.         public ActorRef getSender() {
  89.             return sender;
  90.         }
  91.  
  92.  
  93.     }
  94.  
  95.     /////////////////
  96.     // Actor State //
  97.     /////////////////
  98.  
  99.     /////////////////////
  100.     // Actor Lifecycle //
  101.     /////////////////////
  102.  
  103.     ////////////////////
  104.     // Actor Behavior //
  105.     ////////////////////
  106.  
  107.     @Override
  108.     public Receive createReceive() {
  109.         return receiveBuilder()
  110.                 .match(LargeMessage.class, this::handle)
  111.                 .match(BytesMessage.class, this::handle)
  112.                 .match(SourceMessage.class, this::handle)
  113.                 .match(SerializedByteMessage.class, this::deserializer)
  114.                 .matchAny(object -> this.log().info("Received unknown message: \"{}\"", object.toString()))
  115.                 .build();
  116.     }
  117.     private void deserializer(SerializedByteMessage<?> message){
  118.         try {
  119.             ByteArrayInputStream bai = new ByteArrayInputStream(message.bytes);
  120.             Kryo kryo = new Kryo();
  121.             Input input = new Input(bai);
  122.             Object messageObject = kryo.readClassAndObject(input);
  123.             input.close();
  124.             bai.close();
  125.             // Finally, we send the deserialize object to its destination
  126.             message.receiver.tell(messageObject, message.sender);
  127.         } catch (Exception e) {
  128.             e.printStackTrace();
  129.         }
  130.     }
  131.  
  132.     private void handle(LargeMessage<?> message) {
  133.         ActorRef receiver = message.getReceiver();
  134.         ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
  135.  
  136.         // Serializing the message
  137.         ByteArrayOutputStream bos = new ByteArrayOutputStream();
  138.         Kryo kryo = new Kryo();
  139.         Output output = new Output(bos);
  140.         kryo.writeClassAndObject(output, message.getMessage());
  141.         output.close();
  142.         byte[] byteArrayData = bos.toByteArray();
  143.  
  144.         // Akka Streaming
  145.         Source<List<Byte>, NotUsed> source = Source.from(Arrays.asList(ArrayUtils.toObject(byteArrayData))).grouped(262144); // max size = 262144
  146.         SourceRef<List<Byte>> sourceRef = source.runWith(StreamRefs.sourceRef(), this.context().system());
  147.  
  148.         // Passing the source reference as a customized "SourceMessage"
  149.         receiverProxy.tell(new SourceMessage(sourceRef, byteArrayData.length, this.sender(), message.getReceiver()), this.self());
  150.     }
  151.  
  152.     private void handle(SourceMessage message) {
  153.         // Receiving the customized "SourceMessage" and retrieving the source reference
  154.         SourceRef<List<Byte>> sourceRef = message.getSourceRef();
  155.         byte[] bytes = new byte[message.getLength()];
  156.         sourceRef.getSource().runWith(Sink.seq(), this.context().system()).whenComplete((data, exception) -> {
  157.             int index = 0;
  158.             for (List<Byte> list : data) {
  159.                 for (Byte abyte : list) {
  160.                     bytes[index] = abyte;
  161.                     index++;
  162.                 }
  163.             }
  164.  
  165.             ActorRef receiver = message.getReceiver();
  166.             ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
  167.             SerializedByteMessage serializedByteMessage = new SerializedByteMessage(bytes, message.getReceiver(), message.getSender());
  168.             receiverProxy.tell(serializedByteMessage, message.getSender());
  169.  
  170.         });
  171.     }
  172.  
  173.     private void handle(BytesMessage<?> message) {
  174.         // Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
  175.         message.getReceiver().tell(message.getBytes(), message.getSender());
  176.     }
  177. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement