Currently I am working on a Delphi XE3 client/server application to transfer files (with the Indy FTP components). The client part monitors a folder, gets a list of the files inside, uploads them to the server and deletes the originals. The uploading is done by a separate thread, which processes files one by one. The number of files can range from 0 to a few thousand and their sizes also vary a lot.

It is a Firemonkey app compiled for both OSX and Windows, so I had to use TThread instead of OmniThreadLibrary, which I would have preferred. My customer reports that the application randomly freezes. I could not reproduce it, but since I don't have much experience with TThread, I might have introduced a deadlock condition somewhere. I have read quite a lot of examples, but I'm still not sure about some of the multithreading specifics.

The app structure is simple:
A timer in the main thread checks the folder and gets information about each file into a record, which goes into a generic TList. This list keeps information about the names of the files, size, the progress, whether the file is completely uploaded or has to be retried. All that is displayed in a grid with progress bars, etc. This list is accessed only by the main thread. After that the items from the list are sent to the thread by calling the AddFile method (code below). The thread stores all files in a thread-safe queue like this one http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
When the file is uploaded the uploader thread notifies the main thread with a call to Synchronize.
The main thread periodically calls the Uploader.GetProgress method to check the current file progress and display it. This function is not actually thread-safe, but could it cause a deadlock, or only wrong data returned?

What would be a safe and efficient way to do the progress check?

So is this approach OK, or have I missed something? How would you do this?
For example, I thought of making a new thread just to read the folder contents. This means that the TList I use has to be made thread-safe, but it has to be accessed all the time to refresh the displayed info in the GUI grid. Wouldn't all the synchronization just slow down the GUI?

I have posted the simplified code below in case someone wants to look at it. If not, I would be happy to hear some opinions on what I should use in general. The main goals are to work on both OSX and Windows; to be able to display information about all the files and the progress of the current one; and to be responsive regardless of the number and size of the files.

That's the code of the uploader thread. I have removed some of it for easier reading:

type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue<TFileInfo>;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue<TFileInfo>.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if successful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

And parts of the main thread that interact with the uploader:

......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList<TFileInfo>;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count > 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
      FileInfo.ID := FUniqueID;
      Inc(FUniqueID);
      FileInfo.Path := NewFiles[I];
      FileInfo.Size := GetFileSizeByName(NewFiles[I]);
      FileInfo.UploadedSize := 0;
      FileInfo.Status := fsToBeQueued;
      FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;
VGeorgiev
  • I'm not sure and haven't tested, but did you try adding a TIdAntiFreeze and checking whether the behaviour is the same? (FMX.IdAntiFreeze) – Whiler Dec 01 '12 at 17:37
  • TIdAntiFreeze is designed to prevent freezing of the GUI when you use an Indy component from the main thread (e.g. dropped on the form). I use it in a separate thread so I don't see how it would help. At least as far as I know... – VGeorgiev Dec 01 '12 at 17:43
  • At a first look, your error handling looks wrong to me. For example, in the Execute method, if the ConnectFTP call fails, you _eat_ the exception (after notifying about the error), and you still issue calls to UploadFile. IMHO you have to _clean_ that, and let the thread die with a FatalException or properly handle the exception inside the Execute method, for example, retrying the connection a number of times, maybe depending on the type of error. On the other hand, if you have a list in the main thread, I fail to see why you need a queue in the individual threads. – jachguate Dec 01 '12 at 19:28
  • Also, why does the UploadFile _"function"_ return a string, and always an empty string? mmmmm.. – jachguate Dec 01 '12 at 19:30
  • Actually there is a ConnectFTP call also in the UploadFile method, but I have missed it with the copy/paste. I edited the post to reflect it. The thread stays alive and keeps retrying. I used to return the error message from UploadFile as a string and later changed it with the exception handling. It should be boolean now, but that doesn't change the function much. – VGeorgiev Dec 01 '12 at 19:56
  • As for having a queue, it just seemed like a more flexible approach. I could probably feed the files from the list to the thread one at a time, but wouldn't it need the same synchronisation: e.g. send a file, wait for notification that it's finished, poll for progress in the meantime... – VGeorgiev Dec 01 '12 at 20:00
  • BTW, accessing fUploader.Uploading is not thread-safe, and that's not the only such access – Sir Rufo Dec 01 '12 at 20:30
  • [madExcept](http://madshi.net/) can detect a frozen main thread; it then writes a stack trace of all running threads – mjn Dec 01 '12 at 21:35
  • Thanks. I actually tried madExcept already, but my client tests on a Mac and madExcept is not available there unfortunately. – VGeorgiev Dec 02 '12 at 15:17
  • I can't see the need for `fQueue.Peek()`. Just dequeue the item and, if something fails, put it back into the queue again. If you are using anything older than XE2 update 4, be sure to upgrade; there were severe problems with `TMonitor` before that. And try Delphi's own `TThreadedQueue` to see if there is a difference. – LU RD Apr 03 '13 at 15:30

3 Answers

1

This may not be the problem, but TFileInfo is a record.

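This means that when it is passed as a (non const/var) parameter, it gets copied. The copy is not atomic, so it can cause issues with managed fields such as the strings in the record if another thread modifies the record while it is being copied, because the string references are then read and updated without any synchronization.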

One thing to try would be to make it a class and pass an instance as the parameter (i.e. a Pointer to the data on the heap).

Something else to watch out for is shared Int64 values (e.g. your size values) on threaded 32-bit systems.

Updating or reading these is not done atomically and you don't have any specific protection, so it is possible for a read of the value to get mismatched upper and lower 32 bits due to threading (e.g. read upper 32 bits, write upper 32 bits, write lower 32 bits, read lower 32 bits, with the reads and writes in different threads). This is probably not causing the problems you are seeing and, unless you are working with file transfers of > 4GB, is unlikely to ever cause you any issues.
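If you do want to rule out torn reads, here is a minimal sketch of one way to share an Int64 progress counter between threads. It assumes the TInterlocked class from System.SyncObjs; TProgressWorker, FUploadedSize and GetUploadedSize are hypothetical names for illustration and are not part of the question's code.

```pascal
program ProgressDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

type
  // Hypothetical stand-in for the uploader: only the progress counter is modelled
  TProgressWorker = class(TThread)
  private
    FUploadedSize: Int64;
  protected
    procedure Execute; override;
  public
    function GetUploadedSize: Int64;
  end;

procedure TProgressWorker.Execute;
var
  I: Integer;
begin
  // Simulates the OnWork callback: each update is written as one atomic 64-bit store
  for I := 1 to 1000 do
    TInterlocked.Exchange(FUploadedSize, Int64(I) * 5000000);
end;

function TProgressWorker.GetUploadedSize: Int64;
begin
  // Atomic 64-bit read, safe to call from the main thread even on 32-bit targets
  Result := TInterlocked.Read(FUploadedSize);
end;

var
  Worker: TProgressWorker;
begin
  Worker := TProgressWorker.Create(False);
  try
    Worker.WaitFor;
    Writeln('Uploaded bytes: ', Worker.GetUploadedSize);
  finally
    Worker.Free;
  end;
end.
```

This only protects the single counter. The Path string inside the record would still need its own protection (for example a critical section around the whole record) if it is read from another thread.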

1

Deadlocks are definitely hard to spot, but this may be the problem. In your code, I didn't see that you added any timeout to the enqueue, peek or dequeue - which means it will take the default of Infinite.

Enqueue contains the line below. Like any synchronization object, it will block until either the Enter completes (it locks the monitor) or the timeout occurs; since you don't pass a timeout, it will wait forever:

function TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
...    
if not TMonitor.Enter(FQueue, Timeout)

I'm also going to assume that you implemented Peek yourself based on Dequeue, only without actually removing the item.

That appears to implement its own timeout - however, you still have the following:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
...
if not TMonitor.Enter(FQueue, Timeout)

Here the timeout is Infinite. So if the uploader thread is inside the Peek method, waiting with an infinite timeout for the queue to be signaled, then a second thread cannot Enqueue anything: it blocks waiting for Peek to complete, which never happens.

Here is a snippet of the comment from TMonitor

Enter locks the monitor object with an optional timeout (in ms) value. 
Enter without a timeout will wait until the lock is obtained. 
If the procedure returns it can be assumed that the lock was acquired. 
Enter with a timeout will return a boolean status indicating whether or 
not the lock was obtained (True) or the attempt timed out prior to 
acquire the lock (False). Calling Enter with an INFINITE timeout 
is the same as calling Enter without a timeout.

Since the implementation uses Infinite by default, and no spin count is set on the monitor, the call will block the thread until it can acquire the FQueue object.
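The timeout behaviour described in that comment can be checked in isolation. The following is a small sketch, assuming only TMonitor.Enter(AObject, Timeout) and TMonitor.Exit from the System unit; TContender and its members are hypothetical names made up for the demonstration. The main thread holds the lock for the contender's whole lifetime, so the timed Enter should give up and return False instead of blocking.

```pascal
program MonitorTimeoutDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

type
  // Hypothetical thread that tries to take a lock already held by the main thread
  TContender = class(TThread)
  private
    FLock: TObject;
    FGotLock: Boolean;
  protected
    procedure Execute; override;
  public
    constructor Create(ALock: TObject);
    property GotLock: Boolean read FGotLock;
  end;

constructor TContender.Create(ALock: TObject);
begin
  FLock := ALock;
  inherited Create(False);
end;

procedure TContender.Execute;
begin
  // Enter with a timeout (in ms) returns False if the lock could not be obtained
  FGotLock := TMonitor.Enter(FLock, 100);
  if FGotLock then
    TMonitor.Exit(FLock);
end;

var
  Lock: TObject;
  Contender: TContender;
begin
  Lock := TObject.Create;
  try
    TMonitor.Enter(Lock);  // the main thread takes the lock first
    try
      Contender := TContender.Create(Lock);
      try
        Contender.WaitFor;  // the lock stays held until the contender has finished
        Writeln('Contender acquired lock: ', Contender.GotLock);
      finally
        Contender.Free;
      end;
    finally
      TMonitor.Exit(Lock);
    end;
  finally
    Lock.Free;
  end;
end.
```

With the two-argument Enter the caller can decide what to do when the lock is busy (retry, sleep, or bail out), which is the idea behind passing a timeout to the queue methods.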

My suggestion would be to change your code as follows:

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while true do
    case fQueue.Peek(fCurrentFile, 10) of
      wrSignaled:
        try
          if UploadFile(fCurrentFile) = '' then
          begin
            fQueue.Dequeue(Temp);  // Delete the item from the queue if successful
            SignalComplete;
          end;
        except
          on E: Exception do
            SignalError(E.Message);
        end;
      wrTimeout: Sleep(10);
      wrIOCompletion,
      wrAbandoned,
      wrError: Break;  // leaves the while loop
    end; //case

This way, peek won't hold the lock on FQueue indefinitely, leaving a window for the Enqueue to acquire it and add the file from the main (UI) thread.

SilverKnight
  • Thanks for the detailed answer. I agree that the two TMonitor.Enter() lines could cause a deadlock. The TMonitor.Enter() in TSimpleThreadedQueue.Peek/Dequeue is followed by a TMonitor.Wait(). If I understand it correctly, the Wait releases the lock temporarily and allows the other thread to place a lock in the Enqueue method, therefore it should not cause a deadlock. The Wait then tries to place a lock again. The deadlock that I had happened very rarely, while if it was the case that you describe it would happen almost every time because the thread starts before there is any data in queue. – VGeorgiev Mar 23 '13 at 11:57
  • Hummm.. Looking at the source for TMonitor.Enter, I don't think that is the case if you don't set a SpinCount - most of the code for the spin is skipped where SpinCount=0 Where eventually you get to this line: Result := MonitorSupport.WaitOrSignalObject(nil, GetEvent, Timeout) = WAIT_OBJECT_0; – SilverKnight Mar 27 '13 at 18:56
  • I believe that is the case - however, from my reading and trying to understand what the monitor does, it spins for a specific amount of time (which is supposed to be very short) - when that becomes longer, then you have the possibility of deadlock - take a look at this Wiki article on SpinLock -- http://en.wikipedia.org/wiki/Spinlock – SilverKnight Apr 03 '13 at 19:56
1

This might be a long shot, but here is another possibility [the former answer may be more likely]. It is something I just ran across again, although I had known about it before: the use of Synchronize may be causing the deadlock. Here is a blog post about why this happens: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

The pertinent point from the article:

Thread A calls Synchronize(MethodA)

Thread B calls Synchronize(MethodB)

Then, inside the context of the Main Thread:

Main thread calls CheckSynchronize() while processing messages

CheckSynchronize is implemented to batch-process all waiting calls(*). So it picks up the queue of waiting calls (containing MethodA and MethodB) and loops through them one by one.

MethodA executes in the main thread's context. Assume MethodA calls ThreadB.WaitFor

WaitFor calls CheckSynchronize to process any waiting calls to Synchronize

In theory, this should then process ThreadB's Synchronize(MethodB), allowing Thread B to complete. However, MethodB is already a possession of the first CheckSynchronize call, so it never gets called.

DEADLOCK!

Embarcadero QC article describing the problem in more detail.

The posted code does call Application.ProcessMessages (in ProcessFolder), although I don't see a WaitFor that would be called during a Synchronize. Even so, it could still be a problem if, at the point one Synchronize is being processed, another thread calls Synchronize as well while the main thread has already synchronized and is blocking.

This didn't click with me at first, because I tend to avoid Synchronize calls like the plague and usually design UI updates from threads using other methods, such as message passing and thread-safe lists with message notification.
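As an illustration of the non-blocking style, here is a minimal console sketch that swaps Synchronize for TThread.Queue, which posts the call to the main thread and returns immediately. This is one possible alternative, not the code from the question: TDemoUploader, the trimmed-down TFileInfo, the free-standing OnCompleteFile and CompletedCount are all hypothetical stand-ins. The important detail is that the record is copied into a local variable before queuing, so the main thread never reads fCurrentFile while the worker is changing it.

```pascal
program QueueNotifyDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

type
  // Hypothetical, trimmed-down version of the question's TFileInfo
  TFileInfo = record
    ID: Integer;
    Path: string;
  end;

  TDemoUploader = class(TThread)
  private
    fCurrentFile: TFileInfo;
    procedure SignalComplete;
  protected
    procedure Execute; override;
  end;

var
  CompletedCount: Integer = 0;  // only touched by the main thread

// Stand-in for the form's OnCompleteFile handler; runs in the main thread
procedure OnCompleteFile(const aFileInfo: TFileInfo);
begin
  Inc(CompletedCount);
  Writeln('Completed: ', aFileInfo.Path);
end;

procedure TDemoUploader.SignalComplete;
var
  Snapshot: TFileInfo;
begin
  Snapshot := fCurrentFile;  // private copy captured by the anonymous method
  // Queue does not wait for the main thread, unlike Synchronize
  TThread.Queue(nil,
    procedure
    begin
      OnCompleteFile(Snapshot);
    end);
end;

procedure TDemoUploader.Execute;
var
  I: Integer;
begin
  for I := 1 to 3 do
  begin
    fCurrentFile.ID := I;
    fCurrentFile.Path := Format('file%d.dat', [I]);
    SignalComplete;  // a real uploader would call this after FTP.Put succeeds
  end;
end;

var
  Uploader: TDemoUploader;
begin
  Uploader := TDemoUploader.Create(False);
  try
    Uploader.WaitFor;
    CheckSynchronize;  // flush queued calls; a GUI message loop does this by itself
    Writeln('Files completed: ', CompletedCount);
  finally
    Uploader.Free;
  end;
end.
```

Because the worker never waits on the main thread, this kind of notification cannot take part in a Synchronize/WaitFor deadlock. The trade-off is that a queued call may run after the uploader has moved on to the next file, or even after it has finished, so the handler should rely only on the data passed to it.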

SilverKnight
  • Thanks again for going into so much detail on this. And sorry for the delayed replies, I'm traveling these days... What you described here also occurred to me and I thought Synchronize was the problem. I used it because there is no SendMessage/PostMessage on OSX, or at least I don't know of an alternative. So Synchronize was an easy solution at that moment. Some time ago I rewrote a lot of the code and I don't have this freeze any more, but I don't know where the problem was. It could have been related to the Indy TCP components I used, because they were not very stable on OSX... – VGeorgiev Apr 03 '13 at 09:25
  • Not a problem. I was looking into something else and came across this post that didn't have an answer. I used it as a learning exercise to see what the monitor class would do (I've never used it). I'm always interested in different techniques that could improve my threaded code (mostly in lowering CPU usage, but also in different communication methods). It has been an interesting dive into the class, and hopefully someone else will benefit from the discussion as well. – SilverKnight Apr 03 '13 at 20:04