Advertisement
mdrkb

LargeMessageProxy.java

Nov 19th, 2019
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.67 KB | None | 0 0
  1. package de.hpi.ddm.actors;
  2.  
  3. import java.io.*;
  4. import java.util.Arrays;
  5. import java.util.List;
  6. import java.util.concurrent.CompletionStage;
  7.  
  8. import akka.Done;
  9. import akka.NotUsed;
  10. import akka.actor.AbstractLoggingActor;
  11. import akka.actor.ActorRef;
  12. import akka.actor.ActorSelection;
  13. import akka.actor.Props;
  14. import akka.serialization.Serialization;
  15. import akka.serialization.SerializationExtension;
  16. import akka.serialization.Serializers;
  17. import akka.stream.ActorMaterializer;
  18. import akka.stream.Materializer;
  19. import akka.stream.SourceRef;
  20. import akka.stream.javadsl.Sink;
  21. import akka.stream.javadsl.Source;
  22. import akka.stream.javadsl.StreamRefs;
  23. import akka.util.ByteString;
  24. import de.hpi.ddm.structures.Chunker;
  25. import lombok.AllArgsConstructor;
  26. import lombok.Data;
  27. import lombok.NoArgsConstructor;
  28.  
  29. public class LargeMessageProxy extends AbstractLoggingActor {
  30.  
  31.     ////////////////////////
  32.     // Actor Construction //
  33.     ////////////////////////
  34.  
  35.     public static final String DEFAULT_NAME = "largeMessageProxy";
  36.  
  37.     private ByteString completeByteString = ByteString.empty();
  38.  
  39.     public static Props props() {
  40.         return Props.create(LargeMessageProxy.class);
  41.     }
  42.  
  43.     ////////////////////
  44.     // Actor Messages //
  45.     ////////////////////
  46.  
  47.     @Data
  48.     @NoArgsConstructor
  49.     @AllArgsConstructor
  50.     public static class LargeMessage<T> implements Serializable {
  51.         private static final long serialVersionUID = 2940665245810221108L;
  52.         private T message;
  53.         private ActorRef receiver;
  54.  
  55.         public T getMessage() {
  56.             return message;
  57.         }
  58.  
  59.         public ActorRef getReceiver() {
  60.             return receiver;
  61.         }
  62.     }
  63.  
  64.     @Data
  65.     @NoArgsConstructor
  66.     @AllArgsConstructor
  67.     public static class BytesMessage<T> implements Serializable {
  68.         private static final long serialVersionUID = 4057807743872319842L;
  69.         private T bytes;
  70.         private ActorRef sender;
  71.         private ActorRef receiver;
  72.         private Integer serializerID;
  73.         private String manifest;
  74.  
  75.         public T getBytes() {
  76.             return bytes;
  77.         }
  78.  
  79.         public ActorRef getReceiver() {
  80.             return receiver;
  81.         }
  82.  
  83.         public ActorRef getSender() {
  84.             return sender;
  85.         }
  86.  
  87.         public Integer getSerializerID() {
  88.             return serializerID;
  89.         }
  90.  
  91.         public String getManifest() {
  92.             return manifest;
  93.         }
  94.     }
  95.  
  96.     @Data
  97.     @NoArgsConstructor
  98.     @AllArgsConstructor
  99.     public static class TestMesssage implements Serializable {
  100.         private static final long serialVersionUID = 2237807743872319840L;
  101.         private byte[] bytes;
  102.         private int serializerID;
  103.         private String manifest;
  104.     }
  105.  
  106.     /////////////////
  107.     // Actor State //
  108.     /////////////////
  109.  
  110.     /////////////////////
  111.     // Actor Lifecycle //
  112.     /////////////////////
  113.  
  114.     ////////////////////
  115.     // Actor Behavior //
  116.     ////////////////////
  117.  
  118.     @Override
  119.     public Receive createReceive() {
  120.         return receiveBuilder()
  121.                 .match(LargeMessage.class, this::handle)
  122.                 .match(BytesMessage.class, this::handle)
  123.                 .match(ByteString.class, this::handle)
  124.                 .match(CompletionStage.class, this::handle)
  125.                 .matchEquals("complete", completed -> {
  126.                     complete();
  127.                     System.out.println("ByteString sent successfully.");
  128.                 })
  129.                 .matchAny(object -> {
  130.                     this.log().info("Received unknown message: \"{}\"", object.toString());
  131.                 })
  132.                 .build();
  133.     }
  134.  
  135.     private void handle(CompletionStage completionStage) {
  136.         System.out.println("Completion stage");
  137.         Materializer materializer = ActorMaterializer.create(getContext().getSystem());
  138.         completionStage.thenRun(() -> {
  139.  
  140.         });
  141. //        completionStage.thenApply(o -> {
  142. //            System.out.println(o.toString());
  143. //            return 0;
  144. //        });
  145.     }
  146.  
  147.     private void complete() throws IOException {
  148.         byte[] byteArrayData = completeByteString.toArray();
  149.         ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayData);
  150.         ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
  151. //        MyClass object = (MyClass) objectInputStream.readObject();
  152. //        objectInputStream.close();
  153.         System.out.println("Got message from sender");
  154.     }
  155.  
  156.  
  157.     private void handle(ByteString byteString) {
  158.         completeByteString = completeByteString.concat(byteString);
  159.     }
  160.  
  161.     private void handle(LargeMessage<?> message) throws IOException {
  162.         ActorRef receiver = message.getReceiver();
  163.         ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
  164.  
  165.         // Serialization using https://doc.akka.io/docs/akka/current/serialization.html#programmatic
  166.         Serialization serialization = SerializationExtension.get(getContext().getSystem());
  167.         byte[] bytes = serialization.serialize(message).get();
  168.         int serializerId = serialization.findSerializerFor(message).identifier();
  169.         String manifest = Serializers.manifestFor(serialization.findSerializerFor(message), message);
  170.  
  171.         TestMesssage testMessage = new TestMesssage(bytes, serializerId, manifest);
  172.  
  173.         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  174.         try {
  175.             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
  176.             objectOutputStream.writeObject(testMessage);
  177.             objectOutputStream.flush();
  178.             objectOutputStream.close();
  179.             byteArrayOutputStream.close();
  180.         } catch (IOException ex) {
  181.             System.out.println("IOException is caught");
  182.         }
  183.  
  184.         byte[] byteArrayData = byteArrayOutputStream.toByteArray();
  185.         ByteString byteString = ByteString.fromArray(byteArrayData);
  186.  
  187.         Materializer mat = ActorMaterializer.create(getContext().getSystem());
  188.         Source<List<byte[]>, NotUsed> source = Source.single(byteArrayData).grouped(100);
  189.  
  190. //
  191.         CompletionStage<SourceRef<List<byte[]>>> stage = source.runWith(StreamRefs.sourceRef(), mat);
  192. //
  193. //        receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver(), serializerId, manifest, stage), this.self());
  194.         receiverProxy.tell(stage, this.self());
  195.  
  196.         System.out.println("ByteString is sent!");
  197.  
  198.         // This will definitely fail in a distributed setting if the serialized message is large!
  199.         // Solution options:
  200.         // 1. Serialize the object and send its bytes batch-wise (make sure to use artery's side channel then).
  201.         // 2. Serialize the object and send its bytes via Akka streaming.
  202.         // 3. Send the object via Akka's http client-server component.
  203.         // 4. Other ideas ...
  204.         // receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver()), this.self());
  205.     }
  206.  
  207.     private void handle(BytesMessage<?> message) {
  208.         // Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
  209.         // Serialization serialization = SerializationExtension.get(getContext().getSystem());
  210.         // LargeMessage largeMessage = (LargeMessage) serialization.deserialize((byte[]) message.getBytes(), message.getSerializerID(), message.getManifest()).get();
  211.         System.out.println("BytesMessage is received!");
  212.         message.getReceiver().tell(message.getBytes(), message.getSender());
  213.     }
  214. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement