Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package de.hpi.ddm.actors;
- import java.io.*;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.CompletionStage;
- import akka.Done;
- import akka.NotUsed;
- import akka.actor.AbstractLoggingActor;
- import akka.actor.ActorRef;
- import akka.actor.ActorSelection;
- import akka.actor.Props;
- import akka.serialization.Serialization;
- import akka.serialization.SerializationExtension;
- import akka.serialization.Serializers;
- import akka.stream.ActorMaterializer;
- import akka.stream.Materializer;
- import akka.stream.SourceRef;
- import akka.stream.javadsl.Sink;
- import akka.stream.javadsl.Source;
- import akka.stream.javadsl.StreamRefs;
- import akka.util.ByteString;
- import de.hpi.ddm.structures.Chunker;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- public class LargeMessageProxy extends AbstractLoggingActor {
- ////////////////////////
- // Actor Construction //
- ////////////////////////
- public static final String DEFAULT_NAME = "largeMessageProxy";
- private ByteString completeByteString = ByteString.empty();
- public static Props props() {
- return Props.create(LargeMessageProxy.class);
- }
- ////////////////////
- // Actor Messages //
- ////////////////////
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public static class LargeMessage<T> implements Serializable {
- private static final long serialVersionUID = 2940665245810221108L;
- private T message;
- private ActorRef receiver;
- public T getMessage() {
- return message;
- }
- public ActorRef getReceiver() {
- return receiver;
- }
- }
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public static class BytesMessage<T> implements Serializable {
- private static final long serialVersionUID = 4057807743872319842L;
- private T bytes;
- private ActorRef sender;
- private ActorRef receiver;
- private Integer serializerID;
- private String manifest;
- public T getBytes() {
- return bytes;
- }
- public ActorRef getReceiver() {
- return receiver;
- }
- public ActorRef getSender() {
- return sender;
- }
- public Integer getSerializerID() {
- return serializerID;
- }
- public String getManifest() {
- return manifest;
- }
- }
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public static class TestMesssage implements Serializable {
- private static final long serialVersionUID = 2237807743872319840L;
- private byte[] bytes;
- private int serializerID;
- private String manifest;
- }
- /////////////////
- // Actor State //
- /////////////////
- /////////////////////
- // Actor Lifecycle //
- /////////////////////
- ////////////////////
- // Actor Behavior //
- ////////////////////
- @Override
- public Receive createReceive() {
- return receiveBuilder()
- .match(LargeMessage.class, this::handle)
- .match(BytesMessage.class, this::handle)
- .match(ByteString.class, this::handle)
- .match(CompletionStage.class, this::handle)
- .matchEquals("complete", completed -> {
- complete();
- System.out.println("ByteString sent successfully.");
- })
- .matchAny(object -> {
- this.log().info("Received unknown message: \"{}\"", object.toString());
- })
- .build();
- }
- private void handle(CompletionStage completionStage) {
- System.out.println("Completion stage");
- Materializer materializer = ActorMaterializer.create(getContext().getSystem());
- completionStage.thenRun(() -> {
- });
- // completionStage.thenApply(o -> {
- // System.out.println(o.toString());
- // return 0;
- // });
- }
- private void complete() throws IOException {
- byte[] byteArrayData = completeByteString.toArray();
- ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayData);
- ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
- // MyClass object = (MyClass) objectInputStream.readObject();
- // objectInputStream.close();
- System.out.println("Got message from sender");
- }
- private void handle(ByteString byteString) {
- completeByteString = completeByteString.concat(byteString);
- }
- private void handle(LargeMessage<?> message) throws IOException {
- ActorRef receiver = message.getReceiver();
- ActorSelection receiverProxy = this.context().actorSelection(receiver.path().child(DEFAULT_NAME));
- // Serialization using https://doc.akka.io/docs/akka/current/serialization.html#programmatic
- Serialization serialization = SerializationExtension.get(getContext().getSystem());
- byte[] bytes = serialization.serialize(message).get();
- int serializerId = serialization.findSerializerFor(message).identifier();
- String manifest = Serializers.manifestFor(serialization.findSerializerFor(message), message);
- TestMesssage testMessage = new TestMesssage(bytes, serializerId, manifest);
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- try {
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
- objectOutputStream.writeObject(testMessage);
- objectOutputStream.flush();
- objectOutputStream.close();
- byteArrayOutputStream.close();
- } catch (IOException ex) {
- System.out.println("IOException is caught");
- }
- byte[] byteArrayData = byteArrayOutputStream.toByteArray();
- ByteString byteString = ByteString.fromArray(byteArrayData);
- Materializer mat = ActorMaterializer.create(getContext().getSystem());
- Source<List<byte[]>, NotUsed> source = Source.single(byteArrayData).grouped(100);
- //
- CompletionStage<SourceRef<List<byte[]>>> stage = source.runWith(StreamRefs.sourceRef(), mat);
- //
- // receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver(), serializerId, manifest, stage), this.self());
- receiverProxy.tell(stage, this.self());
- System.out.println("ByteString is sent!");
- // This will definitely fail in a distributed setting if the serialized message is large!
- // Solution options:
- // 1. Serialize the object and send its bytes batch-wise (make sure to use artery's side channel then).
- // 2. Serialize the object and send its bytes via Akka streaming.
- // 3. Send the object via Akka's http client-server component.
- // 4. Other ideas ...
- // receiverProxy.tell(new BytesMessage<>(message.getMessage(), this.sender(), message.getReceiver()), this.self());
- }
- private void handle(BytesMessage<?> message) {
- // Reassemble the message content, deserialize it and/or load the content from some local location before forwarding its content.
- // Serialization serialization = SerializationExtension.get(getContext().getSystem());
- // LargeMessage largeMessage = (LargeMessage) serialization.deserialize((byte[]) message.getBytes(), message.getSerializerID(), message.getManifest()).get();
- System.out.println("BytesMessage is received!");
- message.getReceiver().tell(message.getBytes(), message.getSender());
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement