I have a simple scenario with two threads: the first thread continuously reads some data and enqueues it into a queue. The second thread first peeks at a single object from that queue and performs some conditional checks. If the checks pass, that object is dequeued and passed on for processing.
I have tried to use ConcurrentQueue<T>, which is a thread-safe implementation of a simple queue, but the problem with this one is that all calls are blocking. This means that while the first thread is enqueuing an object, the second thread can't peek at or dequeue an object.
In my situation I need to enqueue at the end and dequeue from the beginning of the queue at the same time.
The lock statement of C# would also block.
So my question is whether it is possible to perform both of these operations in parallel, in a thread-safe way, without them blocking each other.
These are my first attempts; the following is a similar example of my problem.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Scenario {
    /// <summary>
    /// Console entry point for the producer/consumer sample.
    /// </summary>
    public class Program {
        /// <summary>
        /// Starts the scenario and then waits for a key press before returning.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        public static void Main(string[] args) {
            Scenario scenario = new Scenario();
            scenario.Start();
            Console.ReadKey();
        }

        /// <summary>
        /// Runs one producer loop and one consumer loop that exchange random
        /// integers through a shared queue.
        /// </summary>
        public class Scenario {
            // ConcurrentQueue<T> instead of Queue<T>: the producer enqueues while the
            // consumer peeks/dequeues, and Queue<T> is not safe for that kind of
            // concurrent mutation.
            private readonly ConcurrentQueue<int> someData;

            /// <summary>
            /// Creates the scenario with an empty queue.
            /// </summary>
            public Scenario() {
                someData = new ConcurrentQueue<int>();
            }

            /// <summary>
            /// Starts the producer and the consumer and returns immediately.
            /// </summary>
            public void Start() {
                // Both loops never return, so mark them LongRunning instead of
                // occupying regular thread-pool workers indefinitely.
                Task.Factory.StartNew(firstThread, TaskCreationOptions.LongRunning);
                Task.Factory.StartNew(secondThread, TaskCreationOptions.LongRunning);
            }

            /// <summary>
            /// Producer: endlessly enqueues random values in the range [1, 100)
            /// and logs each one.
            /// </summary>
            private void firstThread() {
                Random random = new Random();
                while (true) {
                    int newData = random.Next(1, 100);
                    someData.Enqueue(newData);
                    Console.WriteLine("Enqueued " + newData);
                }
            }

            /// <summary>
            /// Consumer: endlessly inspects the head of the queue and dequeues it
            /// when it passes the randomized check.
            /// </summary>
            private void secondThread() {
                Random random = new Random();
                while (true) {
                    // TryPeek replaces the separate Count check and Peek call with a
                    // single operation that returns false when the queue is empty.
                    int singleData;
                    if (!someData.TryPeek(out singleData)) {
                        continue;
                    }

                    int someValue = random.Next(1, 100);
                    bool accepted = singleData > someValue || singleData == 1 || singleData == 99;
                    if (!accepted) {
                        // Head stays in the queue and is re-evaluated on the next pass.
                        continue;
                    }

                    // This is the only consumer, so the head cannot change between
                    // TryPeek and TryDequeue; the result is still checked.
                    if (!someData.TryDequeue(out singleData)) {
                        continue;
                    }

                    Console.WriteLine("Dequeued " + singleData);
                    // ... processing ...
                }
            }
        }
    }
}
Second example:
/// <summary>
/// Runs one producer loop and one consumer loop that exchange random integers
/// through a <see cref="ConcurrentQueue{T}"/> and log running totals.
/// </summary>
public class Scenario {
    // Guards the two counters and keeps each console line written as a unit.
    private static readonly object syncRoot = new object();

    private readonly ConcurrentQueue<int> someData;

    // Running totals; only read or written while holding syncRoot.
    private int enqued = 0;
    private int dequed = 0;

    /// <summary>
    /// Creates the scenario with an empty queue.
    /// </summary>
    public Scenario() {
        someData = new ConcurrentQueue<int>();
    }

    /// <summary>
    /// Starts the producer and the consumer and returns immediately.
    /// </summary>
    public void Start() {
        // Both loops never return, so mark them LongRunning instead of
        // occupying regular thread-pool workers indefinitely.
        Task.Factory.StartNew(firstThread, TaskCreationOptions.LongRunning);
        Task.Factory.StartNew(secondThread, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Producer: endlessly enqueues random values in the range [1, 100) and
    /// logs the totals after each enqueue.
    /// </summary>
    private void firstThread() {
        Random random = new Random();
        while (true) {
            int newData = random.Next(1, 100);
            someData.Enqueue(newData);
            // Pre-increment so the line reports the total including this item
            // (post-increment printed the count from before the enqueue).
            lock (syncRoot) { Console.WriteLine($"Enqued {++enqued} Dequed {dequed}"); }
        }
    }

    /// <summary>
    /// Consumer: endlessly inspects the head of the queue and dequeues it when
    /// it passes the randomized check, logging the totals after each dequeue.
    /// </summary>
    private void secondThread() {
        Random random = new Random();
        while (true) {
            if (!someData.TryPeek(out int singleData)) {
                continue;
            }

            int someValue = random.Next(1, 100);
            bool accepted = singleData > someValue || singleData == 1 || singleData == 99;
            if (!accepted) {
                // Head stays in the queue and is re-evaluated on the next pass.
                continue;
            }

            if (!someData.TryDequeue(out singleData)) {
                continue;
            }

            // Pre-increment for the same reason as in the producer.
            lock (syncRoot) { Console.WriteLine($"Enqued {enqued} Dequed {++dequed}"); }
            // ... processing ...
        }
    }
}