I'm developing a C# application that runs a series of tasks based on files the user uploads; a run can take anywhere from a few seconds to a few days. I plan to implement some kind of logging system so that work can resume if the process is interrupted. What is this log-for-interrupt/resume technique called, and where can I learn more about existing implementations?

edit:

Useful things I've found so far:

A journalled file system would use the same basic procedure, with a few additional steps. Something like:

  • Journal entry: Moving file from A to B
    • Physically copy old file to new location
    • Update directory entry on new drive
    • Remove directory entry from old drive
    • Free space on old drive
  • Journal entry: Done moving file from A to B

from https://serverfault.com/questions/173176/what-is-a-journaling-file-system
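
The same begin/done pattern could be applied to application-level tasks. Below is a minimal sketch, not an existing library: `StepJournal`, the `BEGIN`/`DONE` line format, and the one-entry-per-line text file are all assumptions made for illustration. Each step writes an intent entry, does its work, then writes a completion entry; on restart, any step with a `BEGIN` but no `DONE` was interrupted.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper for illustration; not part of the code in this question.
public static class StepJournal
{
    // Append one line and push it to disk before returning.
    private static void Append(string journalPath, string line)
    {
        using (var fs = new FileStream(journalPath, FileMode.Append, FileAccess.Write))
        using (var writer = new StreamWriter(fs))
        {
            writer.WriteLine(line);
            writer.Flush();
            fs.Flush(true); // also flush the OS buffers
        }
    }

    // Record intent, do the work, then record completion.
    // If work() throws or the process dies, no DONE entry is written.
    public static void Run(string journalPath, string stepId, Action work)
    {
        Append(journalPath, "BEGIN " + stepId);
        work();
        Append(journalPath, "DONE " + stepId);
    }

    // Steps with a BEGIN entry but no matching DONE entry were interrupted.
    public static List<string> FindInterrupted(string journalPath)
    {
        var open = new List<string>();
        if (!File.Exists(journalPath))
            return open;

        foreach (string line in File.ReadLines(journalPath))
        {
            if (line.StartsWith("BEGIN ", StringComparison.Ordinal))
                open.Add(line.Substring(6));
            else if (line.StartsWith("DONE ", StringComparison.Ordinal))
                open.Remove(line.Substring(5));
        }
        return open;
    }
}
```

On startup, the application would call `StepJournal.FindInterrupted(path)` and redo (or roll back) each step it returns. This assumes step ids contain no line breaks and that redoing a step is safe.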


Checkpointing: The process of writing journaled metadata and data to their fixed-locations is known as checkpointing. Checkpointing is triggered when various thresholds are crossed, e.g., when file system buffer space is low, when there is little free space left in the journal, or when a timer expires.

Crash Recovery: Crash recovery is straightforward in ext3 (as it is in many journaling file systems); a basic form of redo logging is used. Because new updates (whether to data or just metadata) are written to the log, the process of restoring in-place file system structures is easy. During recovery, the file system scans the log for committed complete transactions; incomplete transactions are discarded. Each update in a completed transaction is simply replayed into the fixed-place ext2 structures.

from http://research.cs.wisc.edu/adsl/Publications/sba-usenix05.pdf (page 5)
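
The redo-logging recovery described in that passage can be sketched in a few lines. This is only an illustration of the idea, not how ext3 stores its journal: the `RedoLog` class, the `BEGIN`/`UPDATE`/`COMMIT` text format, and the dictionary standing in for the "fixed-place structures" are all assumptions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of redo-log recovery over a simple text log.
// Assumed line formats: "BEGIN <tx>", "UPDATE <tx> <key> <value>", "COMMIT <tx>".
public static class RedoLog
{
    // Replays every committed transaction into 'state'; uncommitted ones are ignored.
    public static void Recover(IEnumerable<string> logLines, IDictionary<string, string> state)
    {
        // Updates seen so far, grouped by transaction id.
        var pending = new Dictionary<string, List<KeyValuePair<string, string>>>();

        foreach (string line in logLines)
        {
            string[] parts = line.Split(new[] { ' ' }, 4);
            if (parts.Length < 2)
                continue; // malformed line
            string txId = parts[1];

            switch (parts[0])
            {
                case "BEGIN":
                    pending[txId] = new List<KeyValuePair<string, string>>();
                    break;
                case "UPDATE":
                    if (parts.Length == 4 && pending.ContainsKey(txId))
                        pending[txId].Add(new KeyValuePair<string, string>(parts[2], parts[3]));
                    break;
                case "COMMIT":
                    List<KeyValuePair<string, string>> updates;
                    if (pending.TryGetValue(txId, out updates))
                    {
                        // The transaction is complete: replay its updates in order.
                        foreach (var u in updates)
                            state[u.Key] = u.Value;
                        pending.Remove(txId);
                    }
                    break;
            }
        }
        // Anything still in 'pending' never committed and is discarded.
    }
}
```

Because only whole committed transactions are replayed, a crash in the middle of a transaction leaves the recovered state as if that transaction had never started.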


edit 2:

I don't feel like I know much more about fault tolerance than when I first posted this question, but here's an outline of what I implemented.

Main first attempts to load any saved job manager state from a file; if that fails, it creates a new, empty manager.

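public static void Main(string[] args)
{
    try
    {
        // resume from the journal if one exists and passes validation
        _jxman = JobManager<Job>.Load(Properties.Settings.Default.JobJournalFilename);
    }
    catch
    {
        // no usable journal (missing, unreadable, or failed checksum): start fresh
        _jxman = new JobManager<Job>(Properties.Settings.Default.JobJournalFilename);
    }
    ...
    _jxman.Start();
    ...
}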

The JobManager class looks like this:

public sealed class JobManager<T> : WorkGroupBase<JobBase>, IWorkSerializable where T : IJob
{
        #region Fields

        /// <summary>
        /// Hash goes here in file
        /// </summary>
        private const string _hashHeading = "SHA-256";

        /// <summary>
        /// Flag to know whether to update the journal
        /// </summary>
        private bool _isDirty = false;

        /// <summary>
        /// Last time the journal was written to disk
        /// </summary>
        private DateTime _lastSaveTime = DateTime.MinValue;

        /// <summary>
        /// Minimum time to wait before writing journal to disk again
        /// </summary>
        private TimeSpan _minTimeToSave = new TimeSpan(0,0,60);

        /// <summary>
        /// Threading object for lock
        /// </summary>
        private object _lock = new object();

        /// <summary>
        /// Thread to monitor status
        /// </summary>
        private Thread _watchDirtyFlag;

        #endregion

        #region Properties

        /// <summary>
        /// journal file to track changes
        /// </summary>
        public string Filename
        {
                get;
                private set;
        }

        #endregion

        #region Constructors

        /// <summary>
        /// default constructor
        /// </summary>
        /// <param name="filename">Path to filename to write journal file</param>
        public JobManager(string filename) : base()
        {
                ConstructorHelper();

                Filename = filename;
        }

        /// <summary>
        /// Parses XML element to recreate the item
        /// </summary>
        /// <param name="xe">XML element used to create object</param>
        public JobManager(XElement xe)
                : base(xe)
        {
                // Checksum validation before doing anything else.
                // Will throw exception on failure.
                ValidateChecksum(xe);

                ConstructorHelper();

                string myName = "JobManager";

                XElement myself;
                try
                {
                        myself = xe.DescendantsAndSelf(myName).First();
                }
                catch
                {
                        throw new ArgumentException("Attempting to instantiate object, but no relevant information was found in the XML element");
                }

                Filename = myself.FirstElementValue("Filename");

                // Load up all the jobs
                XElement[] wq = myself.Descendants("WorkQueue").Elements().ToArray();

                foreach (XElement x in wq)
                {
                        try
                        {
                                IJob blarg = (IJob)Activator.CreateInstance(typeof(T), x);
                                if (blarg != null)
                                        WorkQueue.Enqueue((JobBase)blarg);
                        }
                        catch
                        { }
                }
        }

        /// <summary>
        /// Helper for common constructing 
        /// </summary>
        private void ConstructorHelper()
        {
                // need to wait for the base constructor to finish before attempting to
                // hook events there
                base.QueueChanged += new EventHandler(JobManager_QueueChanged);
                base.HookQueueChangedEvents();

                _watchDirtyFlag = new Thread(WatchDirtyFlag);
                _watchDirtyFlag.Start();
        }

        #endregion

        #region Methods

        /// <summary>
        /// Saves the state of the JobManager to Filename using XML
        /// </summary>
        public void Save()
        {
                // 'using' disposes the writer even if the write throws
                using (TextWriter writer = new StreamWriter(Filename))
                {
                        writer.Write(this.ToXElement());
                }
        }

        /// <summary>
        /// Loads the filename and attempts to parse it as XML to 
        /// create a JobManager. Pass the type of job to manage.
        /// </summary>
        /// <param name="filename">File storing the JobManager as XML</param>
        /// <returns>JobManager with values loaded from file</returns>
        public static JobManager<T> Load(string filename)
        {
                if (string.IsNullOrEmpty(filename))
                        throw new ArgumentException("Can not load JobManager: Filename not set");

                string text;
                // 'using' disposes the reader even if reading throws
                using (TextReader reader = new StreamReader(filename))
                {
                        text = reader.ReadToEnd();
                }

                XElement loadFrom;
                try
                {
                        loadFrom = XElement.Parse(text);
                }
                catch
                {
                        // unparseable journal: pass an element with no checksum so
                        // that validation fails and the caller can start fresh
                        loadFrom = new XElement("empty");
                }

                JobManager<T> output = new JobManager<T>(loadFrom);
                output.Filename = filename;
                return output;
        }

        /// <summary>
        /// Converts the item to an XML element
        /// </summary>
        /// <returns></returns>
        new public XElement ToXElement()
        {
                XElement bxe = base.ToXElement();

                //string myName = this.GetType().Name;
                string myName = "JobManager";

                XElement wq = new XElement("WorkQueue");
                foreach (IWorkSerializable t in WorkQueue.ToArray())
                {
                        XElement addee = t.ToXElement();

                        wq.Add(addee);
                }

                bxe.Add(wq);

                XElement xe = new XElement(myName,
                        bxe,
                        new XElement("Filename", Filename)
                        );

                xe.Add(
                        new XElement(_hashHeading, Generic.ComputeSha256Hash(xe.ToString()))
                        );

                return xe;
        }

        /// <summary>
        /// Validates the checksum for the current xelement. Throws exceptions on failure
        /// </summary>
        /// <param name="xe">XML tree of the item to validate</param>
        private void ValidateChecksum(XElement xe)
        {
                XElement checksum;
                try
                {
                        checksum = xe.DescendantsAndSelf(_hashHeading).First();
                }
                catch (Exception ex)
                {
                        throw new Exception("Unable to find checksum node", ex);
                }

                XElement withoutChecksum = new XElement(xe);
                withoutChecksum.Elements(_hashHeading).Remove();

                string computedChecksum = Generic.ComputeSha256Hash(withoutChecksum.ToString());

                if (computedChecksum != checksum.Value)
                        throw new Exception("Checksum from XML element and checksum from contents of XML element do not match: \n" + xe.Value);


        }

        /// <summary>
        /// This thread will watch the dirty flag, which is set every time the 
        /// queues are changed. Every _minTimeToSave the flag is checked, and
        /// if the flag is set, Save() is called.
        /// </summary>
        private void WatchDirtyFlag()
        {
                while (true)
                {
                        // sleep between checks; this also keeps saves at least
                        // _minTimeToSave apart
                        Thread.Sleep(_minTimeToSave);

                        bool needsSave;
                        lock (_lock)
                        {
                                // read and clear the flag before saving, so a change
                                // made while Save() is running is not lost
                                needsSave = _isDirty;
                                _isDirty = false;
                        }

                        if (needsSave)
                        {
                                this.Save();
                                _lastSaveTime = DateTime.Now;
                        }
                }
        }

        #endregion

        #region Event Handlers

        /// <summary>
        /// updates flag when any underlying queue changes
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void JobManager_QueueChanged(object sender, EventArgs e)
        {
                lock (_lock)
                {
                        _isDirty = true;
                }
        }

        #endregion
}

Things to note:

  • This is incomplete code, in case anyone attempts to copy this (missing base class and things)
  • Regular (binary) and XML serialization never quite worked correctly, so I implemented custom serialization which saves the object as XML. This is the ToXElement() method, and the constructor which takes an XElement argument.
  • There's a checksum (SHA-256) included at the top level (JobManager) of serialization. When a new object is instantiated from an XElement, the checksum for the saved serialization object is compared to the checksum in the file.
  • There's a static method .Load(file) which returns a new JobManager object by reading a file and attempting to deserialize the contents.
  • Not shown here is the custom ConcurrentQueue class. It wraps the .NET ConcurrentQueue<T> class and adds an event that fires when the queue changes.
  • This JobManager class derives from a base class that holds the aforementioned ConcurrentQueue; those queue-change events are hooked in the constructor helper.
  • When the event fires, the JobManager sets the _isDirty flag.
  • The JobManager starts a thread on instantiation, which monitors the _isDirty flag. Most of the time is spent sleeping, but if at least _minTimeToSave has elapsed, the contents of the JobManager are serialized to disk. This should keep the JobManager from writing to disk too frequently.
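
Since `Generic.ComputeSha256Hash` and the surrounding helpers are not shown, here is a minimal sketch of how the checksum step described above could work. The class name `XmlChecksum` and the `Seal`/`IsValid` methods are hypothetical; only the idea (hash the XML text, store the hash in a `SHA-256` child element, recompute on load) comes from the notes above.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;
using System.Xml.Linq;

// Hypothetical helper; stands in for the Generic.ComputeSha256Hash call above.
public static class XmlChecksum
{
    private const string HashElementName = "SHA-256";

    // Hex-encoded SHA-256 of the UTF-8 bytes of 'text'.
    public static string ComputeSha256Hash(string text)
    {
        using (SHA256 sha = SHA256.Create())
        {
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(text));
            return BitConverter.ToString(hash).Replace("-", "");
        }
    }

    // Returns a copy of 'content' with a checksum child element appended.
    public static XElement Seal(XElement content)
    {
        var copy = new XElement(content);
        copy.Add(new XElement(HashElementName, ComputeSha256Hash(content.ToString())));
        return copy;
    }

    // Recomputes the hash over everything except the checksum element.
    public static bool IsValid(XElement sealedElement)
    {
        XElement stored = sealedElement.Element(HashElementName);
        if (stored == null)
            return false;

        var copy = new XElement(sealedElement);
        copy.Elements(HashElementName).Remove();
        return ComputeSha256Hash(copy.ToString()) == stored.Value;
    }
}
```

The check depends on `ToString()` producing the same text before saving and after reloading, which holds when the file is re-parsed with default options (insignificant whitespace discarded). A checksum like this detects a truncated or corrupted journal; it does not prevent one.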

Noted concerns not addressed:

  • Are threads really the correct solution to watch the _isDirty flag?
  • The JobManager (a single thread) manages Jobs (one at a time, but a different thread) which contain Tasks; there is no class-to-base-class synchronization to lock states while serializing
  • Old completed jobs are serialized to disk and then reloaded
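
On the first concern, one possible alternative to a dedicated watcher thread is a `System.Threading.Timer` that checks the flag at a fixed interval. This is only a sketch under assumptions: `ThrottledSaver` is a hypothetical class, and the save action is passed in as a delegate rather than being tied to JobManager.

```csharp
using System;
using System.Threading;

// Hypothetical replacement for the dirty-flag watcher thread.
public sealed class ThrottledSaver : IDisposable
{
    private readonly Action _save;
    private readonly Timer _timer;
    private readonly object _saveLock = new object();
    private int _dirty; // 0 = clean, 1 = dirty

    public ThrottledSaver(Action save, TimeSpan interval)
    {
        _save = save;
        // The callback runs on a thread-pool thread once per interval.
        _timer = new Timer(OnTick, null, interval, interval);
    }

    // Call this from the queue-changed event handler.
    public void MarkDirty()
    {
        Interlocked.Exchange(ref _dirty, 1);
    }

    private void OnTick(object state)
    {
        // Atomically read and clear the flag, so a change made while
        // saving sets it again and is picked up on the next tick.
        if (Interlocked.Exchange(ref _dirty, 0) == 1)
        {
            lock (_saveLock) // prevent overlapping saves
            {
                _save();
            }
        }
    }

    // Saves immediately if there are unsaved changes.
    public void Flush()
    {
        OnTick(null);
    }

    public void Dispose()
    {
        _timer.Dispose();
        Flush(); // write any last changes before shutting down
    }
}
```

With this shape, no thread sits in a sleep loop, and disposing the saver on shutdown writes any pending changes. An exception thrown by the save delegate inside the timer callback is not handled here and would need its own policy.
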
BurnsBA

1 Answer

I am not aware of an existing system for this purpose, but serialization is the key to this sort of implementation. You just need to design your objects to support serialization.

  • If a task is interrupted, you can serialize the state of the object in any format (binary or XML) to the filesystem.

  • To resume the task, you just need to deserialize the objects and you are back in business.
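
As a rough illustration of those two steps, here is a minimal sketch using `XmlSerializer`. The `JobState` class, its properties, and the `JobStateStore` helper are made up for the example; real job objects would need to expose whatever state is required to resume.

```csharp
using System.IO;
using System.Xml.Serialization;

// Hypothetical state object. XmlSerializer requires a public type with a
// public parameterless constructor and public read/write members.
public class JobState
{
    public string InputFile { get; set; }
    public int LastCompletedStep { get; set; }
}

// Hypothetical helper that saves and restores a JobState as XML.
public static class JobStateStore
{
    // Serialize the state so it survives an interruption.
    public static void Save(JobState state, string path)
    {
        var serializer = new XmlSerializer(typeof(JobState));
        using (FileStream stream = File.Create(path))
        {
            serializer.Serialize(stream, state);
        }
    }

    // Deserialize the state to resume where the job left off.
    public static JobState Load(string path)
    {
        var serializer = new XmlSerializer(typeof(JobState));
        using (FileStream stream = File.OpenRead(path))
        {
            return (JobState)serializer.Deserialize(stream);
        }
    }
}
```

After a restart, the job would call `JobStateStore.Load(path)` and continue from `LastCompletedStep + 1`.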

To learn more about serialization, see the following references:

  1. What is [Serializable] and when should I use it?
  2. .NET Serialization (using BinaryFormater, SoapFormatter and XmlSerializer)
Furqan Safdar