I want to send data from a stream in one actor to a stream in another actor. In the end this should work with remote actors. With Akka.NET Streams this should be an easy task, but maybe I have misunderstood the concept.

This is part of the SenderActor:

var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
    .Run(materializer);

Note: this.receiver is the IActorRef the data should be sent to.

The ReceiverActor now gets all the ByteString messages, followed by Messages.StreamComplete after the stream has ended.

How can these chunks easily be put back together in the ReceiverActor? Ideally as a Stream again.

In the ReceiverActor I tried to forward all those ByteString messages to a Source that should fill a MemoryStream:

class ReceiverActor : ReceiveActor
{
    private readonly IActorRef streamReceiver;
    private readonly MemoryStream stream;

    public ReceiverActor()
    {
        this.stream = new MemoryStream();
        this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
            .To(StreamConverters.FromOutputStream(() => this.stream, true))
            .Run(Context.Materializer());
        Context.Watch(this.streamReceiver);

        Receive<ByteString>((message) => ReceivedStreamChunk(message));
        Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
        Receive<Terminated>((message) => ReceivedTerminated(message));
        ReceiveAny((message) => ReceivedAnyMessage(message));
    }
    private void ReceivedTerminated(Terminated message)
    {
        Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
    }
    private void ReceivedStreamComplete(Messages.StreamComplete message)
    {
        Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
    }
    private void ReceivedStreamChunk(object message)
    {
        Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
        this.streamReceiver.Forward(message);
    }
    private void ReceivedAnyMessage(object message)
    {
        Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
    }
}

But the MemoryStream is filled asynchronously, and when the streamReceiver terminates it closes the stream, so I cannot get the data.

How can I retrieve the stream properly?


Update: I got it working locally.

Thanks to input from Horusiath in Akka.NET's Gitter channel, I was able to receive the ByteStrings directly; it is not necessary to use Akka.Streams on the receiving side.

Receive<ByteString>((message) => ReceivedStreamChunk(message));

And:

private void ReceivedStreamChunk(ByteString message)
{
    var bytes = message.ToArray();
    this.stream.Write(bytes, 0, bytes.Length);
    Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}
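
As a rough sketch of how the chunks could be handed back as a readable Stream once Messages.StreamComplete arrives, the buffering can be pulled into a small helper. ChunkAssembler and its members are hypothetical names, not part of Akka.NET; the sketch only assumes that chunks arrive in order and that each ByteString has already been converted with ToArray():

```csharp
using System;
using System.IO;

// Hypothetical helper (not an Akka.NET type): collects chunks in arrival order.
public sealed class ChunkAssembler
{
    private readonly MemoryStream buffer = new MemoryStream();

    // Number of bytes collected so far.
    public long Length => buffer.Length;

    // Call from the Receive<ByteString> handler with message.ToArray().
    public void Append(byte[] chunk)
    {
        buffer.Write(chunk, 0, chunk.Length);
    }

    // Call from the Receive<Messages.StreamComplete> handler.
    // Rewinds the buffer so the caller reads from the first byte.
    public Stream Complete()
    {
        buffer.Position = 0;
        return buffer;
    }
}
```

Because the actor owns the MemoryStream and nothing else closes it, it is still readable when the completion message is handled.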

Note that Akka.NET 1.3 will add the methods WriteTo(Stream) and WriteToAsync(Stream, CancellationToken) to ByteString.

Still, this does not work with remote actors: the receiving actor system reports this error (the serializer is Hyperion):

Error [No parameterless constructor defined for this object.]

Actually ByteString has a parameterless constructor but it is protected.

I take it that ByteString is not serializable?

ZoolWay

1 Answer

I got it working with a remote actor by converting the ByteString to byte[], inserting a transformation flow between the source and the sink.

Example:

FileIO.FromFile(...).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(...)
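
A fuller sketch of both ends might look like the following. It assumes the Akka.NET version from the question, where Sink.ActorRef takes the target actor and a completion message (later versions may require an additional failure-message argument). The StreamComplete class here is a hypothetical stand-in for Messages.StreamComplete, and the constructor parameters are illustrative only:

```csharp
using System.IO;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// Hypothetical completion marker, standing in for Messages.StreamComplete.
public sealed class StreamComplete { }

public class SenderActor : ReceiveActor
{
    // receiver and path are illustrative parameters, not from the original post.
    public SenderActor(IActorRef receiver, string path)
    {
        var materializer = Context.Materializer();

        FileIO.FromFile(new FileInfo(path))
            // Convert each ByteString chunk to byte[] so the remote side
            // never has to deserialize a ByteString.
            .Select(chunk => chunk.ToArray())
            .To(Sink.ActorRef<byte[]>(receiver, new StreamComplete()))
            .Run(materializer);
    }
}

public class ReceiverActor : ReceiveActor
{
    private readonly MemoryStream stream = new MemoryStream();

    public ReceiverActor()
    {
        // Chunks now arrive as plain byte arrays.
        Receive<byte[]>(bytes => stream.Write(bytes, 0, bytes.Length));

        // Rewind so the collected data can be read from the start.
        Receive<StreamComplete>(_ => stream.Position = 0);
    }
}
```

Select applied directly to the source is equivalent to the Via(Flow.Create<ByteString>().Select(...)) form above. Note that Sink.ActorRef sends no backpressure signal to the upstream, so a fast sender can outpace a slow or remote receiver.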

Harry13
    I came to this question with the same need: sending a file from one actor to another, remote actor. I also found this answer: https://stackoverflow.com/a/49545111/1060314 Specifically: **In general sources and sinks working with actor refs have not been designed to work over remote connections - they don't cover message retries, which can cause deadlocks in your system if some stream control message won't be passed in.** It seems that StreamRefs in 1.4 solve this. – AJ Venturella Nov 06 '18 at 16:41