Advertisement
Guest User

Untitled

a guest
Nov 19th, 2019
140
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.40 KB | None | 0 0
  1. package de.hpi.ddm.actors;
  2.  
  3. // custom imports
  4. import akka.Done;
  5. import akka.stream.javadsl.*;
  6. import com.google.common.primitives.Bytes;
  7. import java.util.List;
  8.  
  9.  
  10. import java.io.Serializable;
  11. import java.util.Arrays;
  12. import java.util.concurrent.CompletionStage;
  13.  
  14. import akka.NotUsed;
  15. import akka.actor.AbstractLoggingActor;
  16. import akka.actor.ActorRef;
  17. import akka.actor.ActorSelection;
  18. import akka.actor.Props;
  19. import akka.serialization.Serialization;
  20. import akka.serialization.SerializationExtension;
  21. import akka.serialization.Serializers;
  22. import lombok.AllArgsConstructor;
  23. import lombok.Data;
  24. import lombok.NoArgsConstructor;
  25.  
  26. public class LargeMessageProxy extends AbstractLoggingActor {
  27.  
  28.     ////////////////////////
  29.     // Actor Construction //
  30.     ////////////////////////
  31.  
  32.     public static final String DEFAULT_NAME = "largeMessageProxy";
  33.    
  34.     public static Props props() {
  35.         return Props.create(LargeMessageProxy.class);
  36.     }
  37.  
  38.     ////////////////////
  39.     // Actor Messages //
  40.     ////////////////////
  41.    
  42.     @Data @NoArgsConstructor @AllArgsConstructor
  43.     public static class LargeMessage<T> implements Serializable {
  44.         private static final long serialVersionUID = 2940665245810221108L;
  45.         private T message;
  46.         private ActorRef receiver;
  47.     }
  48.  
  49.     @Data   @NoArgsConstructor  @AllArgsConstructor
  50.     public static class BytesMessage<T> implements Serializable {
  51.         private static final long serialVersionUID = 4057807743872319842L;
  52.         private T bytes;
  53.         private ActorRef sender;
  54.         private ActorRef receiver;
  55.         private Integer serializerID;
  56.         private String manifest;
  57.         private RunnableGraph runnable;
  58.  
  59.         public T getBytes() {
  60.             return bytes;
  61.         }
  62.  
  63.         public ActorRef getReceiver() {
  64.             return receiver;
  65.         }
  66.  
  67.         public ActorRef getSender() {
  68.             return sender;
  69.         }
  70.  
  71.         public Integer getSerializerID() {
  72.             return serializerID;
  73.         }
  74.  
  75.         public String getManifest() {
  76.             return manifest;
  77.         }
  78.  
  79.         public RunnableGraph getRunnable(){
  80.             return runnable;
  81.         }
  82.  
  83.     }
  84.    
  85.     /////////////////
  86.     // Actor State //
  87.     /////////////////
  88.    
  89.     /////////////////////
  90.     // Actor Lifecycle //
  91.     /////////////////////
  92.  
  93.     ////////////////////
  94.     // Actor Behavior //
  95.     ////////////////////
  96.    
  97.     @Override
  98.     public Receive createReceive() {
  99.         return receiveBuilder()
  100.                 .match(LargeMessage.class, this::handle)
  101.                 .match(BytesMessage.class, this::handle)
  102.                 .matchAny(object -> this.log().info("Received unknown message: \"{}\"", object.toString()))
  103.                 .build();
  104.     }
  105.  
  106.     private void handle(LargeMessage<?> message) {
  107.         ActorRef receiver = message.getReceiver();
  108.         ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
  109.        
  110.         // This will definitely fail in a distributed setting if the serialized message is large!
  111.         // Solution options:
  112.         // 1. Serialize the object and send its bytes batch-wise (make sure to use artery's side channel then).
  113.         // 2. Serialize the object and send its bytes via Akka streaming.
  114.         // 3. Send the object via Akka's http client-server component.
  115.         // 4. Other ideas ...
  116.  
  117.         // Here we serialize our data
  118.         Serialization serialization = SerializationExtension.get(getContext().getSystem());
  119.         byte[] bytes = serialization.serialize(message).get();
  120.         String bytesAsString = new String(bytes);
  121. //      List<Byte> iterableByteArray = Bytes.asList(bytes); // this one can be an option to make a byte array iterable
  122.         int serializerId = serialization.findSerializerFor(message).identifier();
  123.         String manifest = Serializers.manifestFor(serialization.findSerializerFor(message), message);
  124.  
  125.         final Source<String, NotUsed> source = Source.from(Arrays.asList(bytesAsString));
  126.         final Flow<String, String, NotUsed> flow = Flow.fromFunction((String x) ->x);
  127.         final Sink<String, CompletionStage<Done>> sink = Sink.foreach(x -> System.out.println(x));
  128.         final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
  129.  
  130.         receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver(), serializerId, manifest, runnable), this.self());
  131.     }
  132.  
  133.     private void handle(BytesMessage<?> message) {
  134.         // Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
  135.         Integer serializerId = message.serializerID;
  136.         String manifest = message.manifest;
  137.         RunnableGraph runnable = message.runnable;
  138.    
  139.         runnable.run();
  140.         message.getReceiver().tell(message.getBytes(), message.getSender());
  141.     }
  142. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement