I need to traverse a collection of disjoint folders. Each folder is associated with a visited time that is configured somewhere inside the folder.

I then sort the folders and process the one with the earliest visited time first. Note that processing is generally slower than traversing, and processing should start while the traversal is still running.

My code targets .NET Framework 4.8.1. Currently, my implementation is as follows:

    public class BySeparateThread
    {
        ConcurrentDictionary<string, DateTime?> _dict = new ConcurrentDictionary<string, DateTime?>();
        private readonly object _lock = new object(); // must be initialized: lock (null) throws ArgumentNullException

        /// <summary>
        /// this will be called by producer thread;
        /// </summary>
        /// <param name="address"></param>
        /// <param name="time"></param>
        public void add(string address,DateTime? time) {
            _dict.TryAdd(address, time);
            
        }

        /// <summary>
        /// called by subscriber thread;
        /// </summary>
        /// <returns></returns>
        public string? next() {
            lock (_lock) {
                var r = _dict.FirstOrDefault();
                
                //return sortedList.FirstOrDefault().Value;


                if (r.Key is null)
                {
                    return r.Key;
                }

                if (r.Value is null)
                {
                    _dict.TryRemove(r.Key, out var _);
                    return r.Key;
                }

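                // Scan the remaining entries for the earliest visited time.
                foreach (var item in _dict.Skip(1))
                {
                    if (item.Value is null)
                    {
                        _dict.TryRemove(item.Key, out var _);
                        return item.Key;
                    }
                    if (item.Value < r.Value)
                    {
                        r = item;
                    }
                }
                // Remove and return the earliest entry found, not the first one enumerated.
                _dict.TryRemove(r.Key, out var _);

                return r.Key;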
            }
        }

        /// <summary>
        /// this will be assigned of false by producer thread;
        /// </summary>
        public bool _notComplete = true;

        /// <summary>
        /// shared configuration for subscribers;
        /// </summary>
        fs.addresses_.disjoint.deV_._bak.Io io; //.io_._CfgX.Create(cancel, git)

        /// <summary>
        /// run this in a separate thread other than <see cref="add(string, DateTime?)"/>
        /// </summary>
        /// <param name="sln"></param>
        /// <returns></returns>
        public async Task _asyn_ofAddress(string sln)
        {
            while (_notComplete)
            {
                var f = next();
                if (f is null )
                {
                    await Task.Delay(30*1000);
                    //await Task.Yield();
                    continue;
                }
                // The degree of concurrency is controlled by a semaphore; for instance, at most 4 are tackled at once:
                new dev.srcs.each.sln_.delvable.Bak_srcsInAddresses(io)._startTask_ofAddress(sln);
            }
        }
    }

My concern is the `while (_notComplete)` loop: whenever the collection is empty, the subscriber just polls and sleeps, doing nothing useful. I think there should be a better way to remove the polling loop, since the collection knows when it becomes non-empty (for example, whenever `add` is called) and could notify the subscriber at that point.
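
A minimal sketch of that notification idea, assuming a lock-protected `SortedSet` with `Monitor.Wait`/`Monitor.Pulse` in place of polling. The names `EarliestFirstBuffer`, `Add`, `CompleteAdding`, and `TryTakeEarliest` are hypothetical and not part of the code above; a folder without a visited time would have to be mapped to some `DateTime` (for example `DateTime.MinValue`) by the caller:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: a buffer whose consumer sleeps until an item arrives.
public sealed class EarliestFirstBuffer
{
    // Order by time first, then by address so that equal times do not collide.
    private static readonly IComparer<(DateTime Time, string Address)> ByTimeThenAddress =
        Comparer<(DateTime Time, string Address)>.Create((a, b) =>
        {
            int c = a.Time.CompareTo(b.Time);
            return c != 0 ? c : string.CompareOrdinal(a.Address, b.Address);
        });

    private readonly object _gate = new object();
    private readonly SortedSet<(DateTime Time, string Address)> _items =
        new SortedSet<(DateTime Time, string Address)>(ByTimeThenAddress);
    private bool _completed;

    // Called by the producer (traversal) thread.
    public void Add(string address, DateTime time)
    {
        lock (_gate)
        {
            if (_completed) throw new InvalidOperationException("Adding is already completed.");
            _items.Add((time, address));
            Monitor.Pulse(_gate); // wake one waiting consumer
        }
    }

    // Called by the producer when the traversal has finished or was cancelled.
    public void CompleteAdding()
    {
        lock (_gate)
        {
            _completed = true;
            Monitor.PulseAll(_gate); // wake every consumer so it can observe completion
        }
    }

    // Blocks until an item is available; returns false once the buffer is completed and drained.
    public bool TryTakeEarliest(out string address)
    {
        lock (_gate)
        {
            while (_items.Count == 0)
            {
                if (_completed)
                {
                    address = null;
                    return false;
                }
                Monitor.Wait(_gate); // releases the lock while sleeping
            }
            var min = _items.Min;
            _items.Remove(min);
            address = min.Address;
            return true;
        }
    }
}
```

With such a buffer, the consumer loop would reduce to `while (buffer.TryTakeEarliest(out var f)) { /* process f */ }`, without `Task.Delay`.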

There are probably better implementations based on a mature framework. I have considered the following, but each time I got stuck on implementation details:

  1. BlockingCollection: I don't know how to keep the collection sorted as items are added while the producer and the subscriber are both running.
  2. Channel: again, I could not come up with a design that fits my need after reading its examples.
  3. Pipeline: I have not fully understood it.
  4. Rx: I tried to implement an observable and an observer. It only gives me a high-level framework; once I got into the details, I ended up with what I'm currently doing, which makes me wonder whether I need Rx here at all.
  5. Dataflow: should I implement my own BufferBlock or ActionBlock? It seems the built-in BufferBlock cannot be customized to sort items before releasing them to the next block.

Sorting buffered Observables seems similar to my problem, but it ends with a solution similar to the one I currently have and am not satisfied with, as stated above.

Could someone give me sample code? Please make it as concrete as you can. As you can see, I have researched the general ideas; what stops me short is the details, which the docs often gloss over.

Theodor Zoulias
NilNul
  • Is this useful? [Concurrent collection with priority](https://stackoverflow.com/questions/23470196/concurrent-collection-with-priority) – Theodor Zoulias Dec 15 '22 at 08:19
  • I guess you won't know if new visits are making their way into the buffer to update your stalest folder, would that trigger a cancellation? – Jodrell Dec 15 '22 at 08:26
  • @Jodrell I did not fully grasp your question. I have a cancellation mechanism in place (not shown here, as it's not the main problem): the traversal can be cancelled, which sets `_notComplete` to false and effectively stops the subscriber from proceeding. – NilNul Dec 15 '22 at 08:32
  • @Theodor Zoulias I now find https://stackoverflow.com/questions/14941027/prioritized-queues-in-task-parallel-library very helpful. – NilNul Dec 15 '22 at 10:19
  • You describe your solution as 1) traverse, 2) sort, 3) process. It seems to me that a simple `List` would suffice; not sure why you need an ordered concurrent collection here. – Stephen Cleary Dec 15 '22 at 10:37
  • @StephenCleary, the traversal is faster than the processing, and I need the processing to run even when the traversal has not finished. – NilNul Dec 15 '22 at 11:09
  • Your question is currently not very "appetizing", because you have devoted too little space for describing the problem, and too much space for describing the candidate solutions. People generally don't enjoy reverse-engineering solutions in order to understand the problem. I would suggest to edit the question and describe in more details what you are trying to do. – Theodor Zoulias Dec 15 '22 at 13:13
  • @TheodorZoulias, the question is really simple. I just cannot find a good solution built solidly on some framework, as I had hoped, and that's partly why I posted a second-best implementation here. – NilNul Dec 15 '22 at 13:35
  • NilNul we can't help you find a good solution to your problem, if we don't have a clear view of what the problem is. If you don't expand the question with a more detailed description of your problem, I don't expect that you'll get any quality answer. I hope that you'll do, but I find it unlikely. – Theodor Zoulias Dec 15 '22 at 14:02
  • @TheodorZoulias, Thank you; I currently settled with my own answer, which seems to work for the moment as I plugged that into one of my projects. It's not very ideal though. I believe there is some simple and better implementation; – NilNul Dec 15 '22 at 15:32

1 Answer

I just found a solution that is better than my current one. I believe there are even better ones, so please do post an answer if you have one; this is just what I could put together with what I know so far.

I found [Prioritized queues in Task Parallel Library](https://stackoverflow.com/questions/14941027/prioritized-queues-in-task-parallel-library), and wrote a similar collection for my case:

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace nilnul.dev.srcs.every.slns._bak
{

    public class BySortedSet : IProducerConsumerCollection<(string, DateTime)>
    {
        private class _Comparer : IComparer<(string, DateTime)>
        {
            public int Compare((string, DateTime) first, (string, DateTime) second)
            {
                var returnValue = first.Item2.CompareTo(second.Item2);
                if (returnValue == 0)
                    returnValue = first.Item1.CompareTo(second.Item1);
                return returnValue;
            }
            // The comparer is stateless, so one shared static instance suffices.
            public static readonly _Comparer Singleton = new _Comparer();
        }

        SortedSet<(string, DateTime)> _dict = new SortedSet<(string, DateTime)>(
            _Comparer.Singleton
        );

        private object _lock=new object();

        public int Count
        {
            get
            {
                lock(_lock){
                    return _dict.Count;
                }
            }
        }

        public object SyncRoot => _lock;

        public bool IsSynchronized => true;

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
            //throw new NotImplementedException();
        }

        public void CopyTo((string, DateTime)[] array, int index)
        {
            lock (_lock)
            {
                foreach (var item in _dict)
                {
                    array[index++] = item;
                }
            }

        }
        public void CopyTo(Array array, int index)
        {
            lock (_lock)
            {
                foreach (var item in _dict)
                {
                    array.SetValue(item, index++);
                }
            }

        }

        public bool TryAdd((string, DateTime) item)
        {
            lock (_lock)
            {
                return _dict.Add(item);
            }
        }

        public bool TryTake(out (string, DateTime) item)
        {
            lock (_lock)
            {
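                // Check emptiness explicitly: Min returns default for an empty set,
                // so comparing the result with default is not a reliable emptiness test.
                if (_dict.Count == 0)
                {
                    item = default;
                    return false;
                }
                item = _dict.Min;
                return _dict.Remove(item);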
            }
        }

        public (string, DateTime)[] ToArray()
        {
            lock (_lock)
            {
                return this._dict.ToArray();

            }
        }

        public IEnumerator<(string, DateTime)> GetEnumerator()
        {
            return ToArray().AsEnumerable().GetEnumerator();
        }

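        /// <summary>
        /// Wraps this sorted collection in a <see cref="BlockingCollection{T}"/>, so that
        /// consumers block until an item is available and always take the earliest one.
        /// </summary>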
        public BlockingCollection<(string, DateTime)> asBlockingCollection() {
            return new BlockingCollection<(string, DateTime)>(
                this
            );
        }
    }
}

Then I can use that like:

        static public void ExampleUse(CancellationToken cancellationToken) {
            var s = new BySortedSet().asBlockingCollection();

            // Traversal (producer) thread:
            s.Add(("", DateTime.MinValue));

            //...
            s.CompleteAdding();

            // Tackler (consumer) thread:
            foreach (var item in s.GetConsumingEnumerable(cancellationToken))
            {
                // Process the item.
                // TODO: the degree of parallelism is controlled by the tackler; or is there a better way, as in Dataflow or Rx?
            }
        }
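
One way to control the degree of parallelism on the consumer side is to start a fixed number of workers that all pull from the same `GetConsumingEnumerable`, which is safe to share between several consumers. This is only a sketch under that assumption; `ParallelTackler`, `RunAsync`, `degree`, and `process` are hypothetical names, not part of the code above:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: consumes a BlockingCollection with a fixed number of workers.
public static class ParallelTackler
{
    // Starts `degree` long-running workers; the returned task completes when the
    // collection is marked complete and every worker has drained its share.
    public static Task RunAsync<T>(
        BlockingCollection<T> source,
        int degree,
        Action<T> process,
        CancellationToken cancellationToken)
    {
        var workers = Enumerable.Range(0, degree)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Each worker blocks here until an item is available or adding is completed.
                foreach (var item in source.GetConsumingEnumerable(cancellationToken))
                {
                    process(item);
                }
            }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default))
            .ToArray(); // force all workers to start now
        return Task.WhenAll(workers);
    }
}
```

With the collection from `ExampleUse`, this could be called as `ParallelTackler.RunAsync(s, 4, item => { /* process */ }, cancellationToken)`. Items are still taken in priority order, but with more than one worker they may finish out of order.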

Thanks!

NilNul
  • @TheodorZoulias, as the question's tags (such as dataflow) imply, and as the problem itself involves more than one thread (traversing and consuming are at least two), the degree of concurrency is generally an option of the consumer logic. I think splitting it off would lose the context, so it is not a good candidate for an independent question; that's why I put it into my own answer. Following your suggestion, I have now moved it into a comment, as it looks somewhat like a follow-up. – NilNul Dec 15 '22 at 13:30
  • (As noted in the comment, I plan to control the degree of parallelism in the tackler (subscriber/consumer). Or is there intrinsic support in BlockingCollection itself, or in another framework such as Dataflow or Rx, that can take my custom BlockingCollection and consume it in parallel? Please give detailed code where possible.) – NilNul Dec 15 '22 at 13:31