I want to sent data from a stream in one actor to a stream in another actor. In the end this should work with remote actors. With Akka.NET Streams this should be an easy task but maybe I have misconcepted it.
This is part of the SenderActor:
var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
.Run(materializer);
Note: this.receiver
is an IActorRef
where the data should be send to.
The ReceiverActor now gets all the ByteString
messages and Messages.StreamCompleted
after the stream ended.
How can this easily be put together in the ReceiverActor? In best case as a Stream
again.
In the ReceiverActor I tried to send all that ByteString
messages to a Source
which should fill a MemoryStream
:
class ReceiverActor : ReceiveActor
{
private readonly IActorRef streamReceiver;
private readonly MemoryStream stream;
public ReceiverActor()
{
this.stream = new MemoryStream();
this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
.To(StreamConverters.FromOutputStream(() => this.stream, true))
.Run(Context.Materializer());
Context.Watch(this.streamReceiver);
Receive<ByteString>((message) => ReceivedStreamChunk(message));
Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
Receive<Terminated>((message) => ReceivedTerminated(message));
ReceiveAny((message) => ReceivedAnyMessage(message));
}
private void ReceivedTerminated(Terminated message)
{
Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
}
private void ReceivedStreamComplete(Messages.StreamComplete message)
{
Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
}
private void ReceivedStreamChunk(object message)
{
Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
this.streamReceiver.Forward(message);
}
private void ReceivedAnyMessage(object message)
{
Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
}
}
But the MemoryStream
is filled async and when the streamReceiver
terminates it closes the stream so I cannot get the data.
How can I retrieve the stream properly?
Update I got it locally working:
Thanks to input from Horusiath in Akka.NET's gitter channel I was able to directly receive the ByteStrings
, not necessary to use Akka.Stream there.
Receive<ByteString>((message) => ReceivedStreamChunk(message));
And:
private void ReceivedStreamChunk(ByteString message)
{
var bytes = message.ToArray();
targetStream.Write(bytes, 0, bytes.Length);
Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}
Note that Akka.NET 1.3 will add the methods WriteTo(Stream)
and WriteToAsync(Stream, CancellationToken)
to ByteString
.
Still this does not work with remote actors as the receiving actor system gets this error (Serializer is Hyperion):
Error [No parameterless constructor defined for this object.]
Actually ByteString
has a parameterless constructor but it is protected
.
I take it that ByteString
is not serializable?