Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package de.hpi.ddm.actors;
- import java.io.Serializable;
- import java.util.Arrays;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.CompletionStage;
- import akka.Done;
- import akka.NotUsed;
- import akka.actor.*;
- import akka.serialization.Serialization;
- import akka.serialization.SerializationExtension;
- import akka.serialization.Serializers;
- import akka.stream.ActorMaterializer;
- import akka.stream.Materializer;
- import akka.stream.SourceRef;
- import akka.stream.impl.StreamSupervisor;
- import akka.stream.javadsl.*;
- import akka.util.ByteString;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- public class LargeMessageProxy extends AbstractLoggingActor {
- ////////////////////////
- // Actor Construction //
- ////////////////////////
- public static final String DEFAULT_NAME = "largeMessageProxy";
- public static Props props() {
- return Props.create(LargeMessageProxy.class);
- }
- ////////////////////
- // Actor Messages //
- ////////////////////
- @Data @NoArgsConstructor @AllArgsConstructor
- public static class LargeMessage<T> implements Serializable {
- private static final long serialVersionUID = 2940665245810221108L;
- private T message;
- private ActorRef receiver;
- }
- @Data @NoArgsConstructor @AllArgsConstructor
- public static class BytesMessage<T> implements Serializable {
- private static final long serialVersionUID = 4057807743872319842L;
- private T bytes;
- private ActorRef sender;
- private ActorRef receiver;
- private Integer serializerId;
- private String manifest;
- private CompletionStage<SourceRef<ByteString>> stage;
- public String getManifest(){
- return manifest;
- }
- public Integer getSerializerId(){
- return serializerId;
- }
- public CompletionStage<SourceRef<ByteString>> getStage(){
- return stage;
- }
- }
- /////////////////
- // Actor State //
- /////////////////
- /////////////////////
- // Actor Lifecycle //
- /////////////////////
- ////////////////////
- // Actor Behavior //
- ////////////////////
- @Override
- public Receive createReceive() {
- return receiveBuilder()
- .match(LargeMessage.class, this::handle)
- .match(BytesMessage.class, this::handle)
- .matchAny(object -> this.log().info("Received unknown message: \"{}\"", object.toString()))
- .build();
- }
- private void handle(LargeMessage<?> message) {
- ActorRef receiver = message.getReceiver();
- ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
- // Just serializing the data
- Serialization serialization = SerializationExtension.get(getContext().getSystem());
- byte[] bytes = serialization.serialize(message).get();
- int serializerId = serialization.findSerializerFor(message).identifier();
- String manifest = Serializers.manifestFor(serialization.findSerializerFor(message), message);
- ByteString byteString = ByteString.fromArray(bytes);
- // Akka Streaming
- Source<ByteString, NotUsed> source = Source.from(Arrays.asList(byteString));
- Materializer mat = ActorMaterializer.create(getContext().getSystem());
- CompletionStage<SourceRef<ByteString>> stage = source.runWith(StreamRefs.sourceRef(), mat);
- receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver(), serializerId, manifest, stage), this.self());
- }
- private void handle(BytesMessage<?> message) {
- // Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
- // Sink<ByteString, NotUsed> sink = Sink.actorRef(getContext().getSelf(), "complete");
- CompletionStage<SourceRef<ByteString>> stage = message.getStage();
- CompletionStage<Void> future = stage.thenApply(SourceRef::getSource).thenRun(() -> ActorMaterializer.create(getContext().getSystem()));
- message.getReceiver().tell(message.getBytes(), message.getSender());
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement