I have a producer-consumer scenario¹ based on a bounded Channel<T>
.
Channel<Item> channel = Channel.CreateBounded<Item>(10);
The items are coming from an RDBMS, to which the producer connects and fetches them one by one. The peculiarity is that I don't have the luxury of keeping the DB connection alive for the whole lifetime of the producer. I have to close the connection when the channel is full, and reopen the connection when the channel has again space available for a new item. So I implemented the producer like this:
// Producer
while (true)
{
await channel.Writer.WaitToWriteAsync();
connection.Open();
Item item;
while (true)
{
item = connection.GetNextItem(); // This is fast
if (!channel.Writer.TryWrite(item)) break;
}
connection.Close();
await channel.Writer.WriteAsync(item);
}
The producer waits until the channel.Writer.WaitToWriteAsync()
task completes, then opens the DB connection, writes in the channel as many items as it can until one is rejected, closes the DB connection, writes asynchronously the rejected item, and loops back to the waiting.
The consumer is pretty standard:
// Consumer
await foreach (Item item in channel.Reader.ReadAllAsync())
{
// Process the item (this is slow)
}
My problem with this design is that the DB connection is opened and closed too often. Both opening and closing the connection has a non-negligible overhead, so I would like to minimize how frequently it happens. Although the capacity of the channel is 10, I would prefer if the WaitToWriteAsync
task was completing when the channel is half-full (5 items), not immediately when the stored items drop from 10 to 9.
My question is: How can I modify my producer, so that it connects to the database when there are 5 or less items in the channel, and closes the connection when the channel is full with 10 items?
Below is the output from a minimal example that I wrote, that reproduces the undesirable behavior:
19:20:55.811 [4] > Opening connection -->
19:20:55.933 [4] > Produced #1
19:20:55.934 [4] > Produced #2
19:20:55.934 [4] > Produced #3
19:20:55.934 [4] > Produced #4
19:20:55.934 [4] > Produced #5
19:20:55.934 [4] > Produced #6
19:20:55.935 [4] > Produced #7
19:20:55.935 [4] > Produced #8
19:20:55.935 [4] > Produced #9
19:20:55.935 [4] > Produced #10
19:20:55.935 [4] > Produced #11
19:20:55.935 [4] > Closing connection <--
19:20:55.936 [6] > Consuming: 1
19:20:56.037 [4] > Consuming: 2
19:20:56.037 [6] > Opening connection -->
19:20:56.137 [6] > Produced #12
19:20:56.137 [6] > Produced #13
19:20:56.137 [6] > Closing connection <--
19:20:56.137 [4] > Consuming: 3
19:20:56.238 [6] > Consuming: 4
19:20:56.238 [4] > Opening connection -->
19:20:56.338 [4] > Produced #14
19:20:56.338 [4] > Produced #15
19:20:56.338 [4] > Closing connection <--
19:20:56.338 [6] > Consuming: 5
19:20:56.439 [4] > Consuming: 6
19:20:56.439 [6] > Opening connection -->
19:20:56.539 [6] > Produced #16
19:20:56.539 [6] > Produced #17
19:20:56.539 [6] > Closing connection <--
19:20:56.539 [4] > Consuming: 7
19:20:56.644 [6] > Consuming: 8
19:20:56.644 [4] > Opening connection -->
19:20:56.744 [4] > Produced #18
19:20:56.745 [7] > Consuming: 9
19:20:56.745 [4] > Produced #19
19:20:56.745 [4] > Produced #20
19:20:56.745 [4] > Closing connection <--
19:20:56.846 [7] > Consuming: 10
19:20:56.847 [4] > Producer completed
19:20:56.946 [4] > Consuming: 11
19:20:57.046 [4] > Consuming: 12
19:20:57.147 [4] > Consuming: 13
19:20:57.247 [4] > Consuming: 14
19:20:57.347 [4] > Consuming: 15
19:20:57.452 [4] > Consuming: 16
19:20:57.552 [4] > Consuming: 17
19:20:57.653 [4] > Consuming: 18
19:20:57.753 [4] > Consuming: 19
19:20:57.854 [4] > Consuming: 20
19:20:57.955 [1] > Finished
As you can see there is a lot of "Opening/Closing connection" going on.
My question has similarities with this older question:
Given an external producer API that can be stopped and started, efficiently stop the producer when local buffer is full.
The difference is that in my case the producer is a loop, and not the event handler of a service, as in the other question.
¹ This scenario is contrived. It was inspired by a relatively recent GitHub API proposal.
Clarification: The channel should not be drained completely before reconnecting to the DB. That's because opening the connection takes some time, and I don't want the consumer to be idle during this time. So the producer should reconnect when the channel has dropped to 5 items or less, not when it is completely empty.