1

I'm new to the world of Reactive Extensions and I'm still trying to learn.

I'm developing an app with a datagrid that displays certain running Windows processes and their memory usage. The memory usage of each process should be updated frequently i.e. every 200ms.

Requirements

  • When checkbox is checked
    • the datagrid should be filled with processes and the memory usage is updated be using a timer with an interval of 200 ms.
    • monitor (all should be done on a background thread)

-- if a process has exited, it should be removed from the source.

-- if a process starts, it should be added to the source

-- a file for changes

  • When checkbox is unchecked
    • all the monitor activity should be stopped
    • the datagrid is cleared

Any help would be greatly appreciated! Notes:

  • In the past I tried several approaches like using a ObservableConcurrentDictionary as a resource and a timer for periodically updating the resource, but I ran into troubles (concurrency, locking, etc), so I would like to have a solution based on Rx/ReactiveUI
  • Do to technical limitation I can use only .NET Framework 4.0, Reactive-core.Net40

Update

ViewModel

private ReactiveList<IProcessModel> _processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
public ReactiveList<IProcessModel> Processes { get { return _processes; } }

public MainViewModel(IMonitorService monitorService)
{
   this.WhenAnyValue(vm => vm.ShowProcessesIsChecked).Subscribe((b) => DoShowProcesses(b));
}


private void DoShowProcesses(bool checkboxChecked)
{
    IDisposable timer;
    Processes.Clear();
    if (checkboxChecked)
    {
        //checkbox checked
        lock (Processes)
            Processes.AddRange(_monitorService.GetProcesses());
        timer = Observable.Timer(TimeSpan.FromMilliseconds(200.0))
            .Select(x =>
        {
            lock (Processes)
            {
                foreach (var process in Processes) //throws the 'Collection was modified; enumeration operation may not execute.'
                    process.UpdateMemory(); 

                return Processes.Where(p => p.ProcessObject.HasExited).ToList();
            }
        }).
        ObserveOnDispatcher()
        .Subscribe(processesExited =>
        {
            if (processesExited.Count() > 0)
            {
                lock (Processes)
                    Processes.RemoveAll(processesExited); //remove all processes that have exited
            }

        });
    }
    else
    {
        if (timer != null)
            timer.Dispose();
    }
}

I started a new thread

Original

ViewModel

public class MainViewModel : ReactiveObject
{
    public ReactiveList<IProcessModel> Processes { get; private set; }
    IMonitorService _monitorService;

    public MainViewModel(IMonitorService monitorService)
    {
        _monitorService = monitorService;

        Processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(value => value == true) //checkbox checked
            .ObserveOn(Scheduler.Default) //raise notifications on thread-pool thread to keep UI responsive
            .Select((isChecked) =>
            {
                return monitorService.GetProcesses();
            })
            .ObserveOn(SynchronizationContext.Current)
            .Subscribe(processes => {
                Processes.AddRange(processes); }
            );
        //start the MonitorService with MonitorService.Start(Processes)
        //start a timer with an interval of 200ms --> at interval
        //- do UpdateMemory() foreach IProcessModel in Processes
        //- if ProcessObject.HasExited --> remove it from the collection source
        ;
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(value => value == false) //checkbox unchecked
            .Subscribe((isChecked) =>
            {
                monitorService.Stop(); //this stops monitoring for starting processes and clears the Processes
            });
    }

    private bool _showProcessesIsChecked;

    public bool ShowProcessesIsChecked
    {
        get { return _showProcessesIsChecked; }
        set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
    }
}

Model

public class ProcessModel : ProcessModelBase, IProcessModel
{

    public ProcessModel(Process process)
    {
        ProcessObject = process;
    }      

    public void UpdateMemory()
    {
        try
        {
            if (!ProcessObject.HasExited)
            {
                long mem = ProcessObject.PagedMemorySize64;
                ProcessObject.Refresh();
                if (mem != ProcessObject.PagedMemorySize64)
                    OnPropertyChanged(nameof(ProcessObject));
            }
        }
        catch (Exception)
        {
            //log it
        }
    }
}

Service

public class MonitorService : IMonitorService
{
    ManagementEventWatcher managementEventWatcher;
    ReactiveList<IProcessModel> _processes;

    public List<IProcessModel> GetProcesses()
    {
        List<IProcessModel> processes = new List<IProcessModel>();

        foreach (var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome")))
            processes.Add(new ProcessModel(process));
        return processes;
    }

    /// <summary>
    /// Starts the manager. Monitor a starting process and changes in log file
    /// </summary>
    /// <param name="processes"></param>
    public void Start(ReactiveList<IProcessModel> processes)
    {
        _processes = processes;

        var qStart = "SELECT * FROM Win32_ProcessStartTrace WHERE ProcessName like 'chrome'";
        managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
        managementEventWatcher.EventArrived += new EventArrivedEventHandler(OnProcessStarted);
        try
        {
            managementEventWatcher.Start();
        }
        catch (Exception)
        {
            //log it
        }
        Task.Factory.StartNew(() => MonitorLogFile());
    }


    public void Stop()
    {
        if (managementEventWatcher != null)
            managementEventWatcher.Stop();
        if (_processes != null)
            _processes.Clear();
    }

    private void MonitorLogFile()
    {
        //this code monitors a log file for changes. It is possible that the IsChecked property of a ProcessModel object is set in the Processes collection
    }


    private void OnProcessStarted(object sender, EventArrivedEventArgs e)
    {

        try
        {
            Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
            _processes.Add(new ProcessModel(process));
        }
        catch (ArgumentException)
        {
            //log it
        }
        catch (InvalidOperationException)
        {
            //log it
        }

    }
}

XAML

<CheckBox Content='Show Processes' IsChecked='{Binding ShowProcessesIsChecked}' />
<DataGrid  ItemsSource="{Binding Processes}">
    <DataGrid.Resources>
      <DataGridTemplateColumn Header='Process'
                              x:Key='dgProcessName'
                              IsReadOnly='True'
                              x:Shared='False'>
        <DataGridTemplateColumn.CellTemplate>
            <DataTemplate>
                <StackPanel Orientation='Horizontal' VerticalAlignment='Center'>
                    <CheckBox IsChecked='{Binding IsChecked, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}' HorizontalAlignment='Stretch' VerticalAlignment='Stretch'> </CheckBox>
                    <TextBlock Text='{Binding ProcessObject.ProcessName}' />
                </StackPanel>
            </DataTemplate>
        </DataGridTemplateColumn.CellTemplate>
          </DataGridTemplateColumn>
          <DataGridTextColumn Header="PID"
                              Binding="{Binding ProcessObject.Id}"
                              IsReadOnly='True'
                              x:Key='dgPID'
                              x:Shared='False' />
          <DataGridTextColumn Header="Commit Size"
                              Binding='{Binding ProcessObject.PagedMemorySize64}'
                              IsReadOnly='True'
                              x:Key='dgCommitSize'
                              x:Shared='False' />
    </DataGrid.Resources>
</DataGrid>
BertAR
  • 425
  • 3
  • 18
  • You really shouldn't be locking on a public variable. That's generally considered bad. – Enigmativity May 04 '18 at 00:12
  • Also, why are you locking? Using Rx can usually eliminate locking, but if you have to then there are some awesome collections that do the locking for you. – Enigmativity May 04 '18 at 00:13

2 Answers2

0

You basically want to use this kind of pattern:

IObservable<bool> checkBoxChecked = /* your checkbox observable here */
IObservable<long> timer = Observable.Interval(TimeSpan.FromMilliseconds(200.0));

IObservable<long> query =
    checkBoxChecked
        .Select(x => x ? timer : Observable.Never<long>().StartWith(-1L))
        .Switch();

IDisposable subscription =
    query
        .Subscribe(n =>
        {
            if (n == -1L)
            {
                // Clear UI
            }
            else
            {
                // Update UI
            }
        });

This switches between running and not running based on the value of the checkbox.

You need to make sure that you are observing on the UI thread, but other than some minor tweaking then this should work fine.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • Thank you! I've updated my code (see "Update"). Unfortunately I still have issues and questions. Can you have look please? Question e.g. does the timer wait until everything is completed before firing again – BertAR May 03 '18 at 14:19
0

I wanted to practice my Rx skills anyways, so I went ahead and created a WPF project and gave it a shot. I got it working so I'll share how I went about it.

  • Remove the process list from MonitorService. This will help isolate the source of list modifications, making our debugging life easier. It also narrows MonitorService's responsibilities down to providing the initial list and emitting changes.

  • We're already "being reactive" so we may as well turn the EventArrived event into an observable, too, using FromEventPattern. Then we can transform those fired events into ProcessModels and push them out to the subscriber(s).

  • I moved the ManagementEventWatcher creation to the constructor, so we don't have to recreate it every time the checkbox is checked. Now the Start/Stop methods are now simply wrappers for the _managementEventWatcher versions.

.

public class MonitorService
{
    ManagementEventWatcher _managementEventWatcher;

    public IObservable<ProcessModel> NewProcessObservable { get; }

    public MonitorService()
    {
        var qStart = "SELECT * FROM Win32_ProcessStartTrace where ProcessName='chrome.exe'";
        _managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
        var eventArrivedObservable = Observable.FromEventPattern<EventArrivedEventHandler, EventArrivedEventArgs>(
            x => _managementEventWatcher.EventArrived += x,
            x => _managementEventWatcher.EventArrived -= x);

        NewProcessObservable = eventArrivedObservable
            .Select(x => GetProcessModel(x.EventArgs))
            .Where(x => x != null);
    }

    public List<ProcessModel> GetProcesses()
    {
        List<ProcessModel> processes = new List<ProcessModel>();

        foreach(var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome")))
            processes.Add(new ProcessModel(process));
        return processes;
    }

    public void Start()
    {
        try
        {
            _managementEventWatcher.Start();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

    public void Stop()
    {
        if(_managementEventWatcher != null)
            _managementEventWatcher.Stop();
    }

    private ProcessModel GetProcessModel(EventArrivedEventArgs e)
    {
        ProcessModel model = null;
        try
        {
            Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
            model = new ProcessModel(process);
        }
        catch(ArgumentException)
        {
            //log it
        }
        catch(InvalidOperationException)
        {
            //log it
        }

        return model;
    }
}

MainViewModel

  • To avoid errors such as 'Collection was modified; enumeration operation may not execute' a simple solution is to iterate backward using a for loop.

  • Immediately preceding each subscribe call, use .ObserveOnDispatcher() so the onNext calls will execute on the UI thread. The cool part is that the only place we're modifying the list, now, is inside subscribe. So, we don't have to use locks and all that.

  • I divided the logic into 3 different subscriptions: Start/stop the monitoring, Update the memory usage and remove processes that have exited (used @Enigmativity's suggested pattern), and add newly started processes to our reactive list. Hopefully, this makes it easy to follow the logic.

.

public class MainViewModel : ReactiveObject
{
    public ReactiveList<ProcessModel> Processes { get; private set; }
    MonitorService _monitorService;

    public MainViewModel(MonitorService monitorService)
    {
        _monitorService = monitorService;

        Processes = new ReactiveList<ProcessModel>() { ChangeTrackingEnabled = true };
        RxApp.SupportsRangeNotifications = false;

        IObservable<bool> checkboxObservable = this.WhenAnyValue(vm => vm.ShowProcessesIsChecked);
        IObservable<long> intervalObservable = Observable.Interval(TimeSpan.FromMilliseconds(200.0));

        // Start/stop the monitoring.
        checkboxObservable
            // Skip the default unchecked state.
            .Skip(1)
            .ObserveOnDispatcher()
            .Subscribe(
                isChecked =>
                {
                    if(isChecked)
                    {
                        Processes.AddRange(monitorService.GetProcesses());
                        monitorService.Start();
                    }
                    else
                    {
                        Processes.Clear();
                        monitorService.Stop();
                    }
                });

        // Update the memory usage and remove processes that have exited.
        checkboxObservable
            .Select(isChecked => isChecked ? intervalObservable : Observable.Never<long>())
            // Switch disposes of the previous internal observable
            // (either intervalObservable or Never) and "switches" to the new one.
            .Switch()
            .ObserveOnDispatcher()
            .Subscribe(
                _ =>
                {
                    // Loop backwards in a normal for-loop to avoid the modification errors.
                    for(int i = Processes.Count - 1; i >= 0; --i)
                    {
                        if(Processes[i].ProcessObject.HasExited)
                        {
                            Processes.RemoveAt(i);
                        }
                        else
                        {
                            Processes[i].UpdateMemory();
                        }
                    }
                });

        // Add newly started processes to our reactive list.
        monitorService.NewProcessObservable
            .Where(_ => ShowProcessesIsChecked)
            .ObserveOnDispatcher()
            .Subscribe(process => Processes.Add(process));
    }

    private bool _showProcessesIsChecked;

    public bool ShowProcessesIsChecked
    {
        get { return _showProcessesIsChecked; }
        set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
    }
}

More info If this is the only page/window in you application, then this is enough. If not, you have a little more work to do, to avoid memory leaks. In the latter case, I recommend Google searching "ReactiveUI WhenActivated" (and maybe even throw wpf in there). You'll find lots of examples that you can use and learn from.

Colt Bauman
  • 564
  • 3
  • 9
  • Appreciate your effort Colt Bauman, I'll have a closer look soon. I'll keep you up-to-date! Many thanks. – BertAR May 04 '18 at 13:57