3

I am running hangfire in a single web application, my application is being run on 2 physical servers but hangfire is in 1 database.

At the moment, i am generating a server for each queue, because each queue i need to run 1 worker at a time and they must be in order. I set them up like this

// core
services.AddHangfire(options =>
{
    options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
    options.UseSimpleAssemblyNameTypeSerializer();
    options.UseRecommendedSerializerSettings();
    options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});

// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
    options.ServerName = "workflow-queue";
    options.WorkerCount = 1;
    options.Queues = new string[] { "workflow-queue" };
    options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "alert-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "alert-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = string.Format("trigger-schedule");
    options.WorkerCount = 1;
    options.Queues = new string[] { "trigger-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "report-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "report-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "maintenance";
    options.WorkerCount = 5;
    options.Queues = new string[] { "maintenance" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});

My problem is that it is generating multiple queues on the servers, with different ports. enter image description here

In my code i am then trying to stop jobs from running if they are queued/retrying, but if the job is being run on a different physical server, it is not found and queued again.

Here is the code to check if its running already

public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
    var disableJob = false;
    var monitoringApi = JobStorage.Current.GetMonitoringApi();

    // get the jobId, method and queue using performContext
    var jobId = context.BackgroundJob.Id;
    var methodInfo = context.BackgroundJob.Job.Method;
    var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(context.BackgroundJob.Job.Method, typeof(QueueAttribute));
    
    // enqueuedJobs
    var enqueuedjobStatesToCheck = new[] { "Processing" };
    var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
    var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));

    if (enqueuedJobsAlready > 0)
        disableJob = true;

    // scheduledJobs
    if (!disableJob)
    {
        // check if there are any scheduledJobs that are processing
        var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
        var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (scheduledJobsAlready > 0)
            disableJob = true;
    }

    // failedJobs
    if (!disableJob)
    {
        var failedJobs = monitoringApi.FailedJobs(0, 1000);
        var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (failedJobsAlready > 0)
            disableJob = true;
    }

    // if runBefore is true, then lets remove the current job running, else it will write a "successful" message in the logs
    if (disableJob)
    {
        // use hangfire delete, for cleanup
        BackgroundJob.Delete(jobId);

        // create our sqlBuilder to remove the entries altogether including the count
        var sqlBuilder = new SqlBuilder()
            .DELETE_FROM("Hangfire.[Job]")
            .WHERE("[Id] = {0};", jobId);

        sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");

        using (var cmd = _context.CreateCommand(sqlBuilder))
            await cmd.ExecuteNonQueryAsync();
        
        return true;
    }

    return false;
}

Each method has something like the following attributes as well

public interface IAlertScheduleService
{
    [Hangfire.Queue("alert-schedule")]
    [Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
    Task RunAllAsync(PerformContext context);
}

Simple implementation of the interface

public class AlertScheduleService : IAlertScheduleService
{
    public Task RunAllAsync(PerformContext context)
    {
        if (IsAlreadyQueuedAsync(context))
            return;

        // guess it isnt queued, so run it here....
    }
}

Here is how i am adding my scheduled jobs

//// our recurring jobs
//// set these to run hourly, so they can play "catch-up" if needed
RecurringJob.AddOrUpdate<IAlertScheduleService>(e => e.RunAllAsync(null), Cron.Hourly(0), queue: "alert-schedule");

Why does this happen? How can i stop it happening?

Gillardo
  • 9,518
  • 18
  • 73
  • 141
  • In which context do you run `IsAlreadyQueuedAsync` ? A ServerFilter ? – jbl Jan 26 '22 at 17:00
  • @jbl I run it in the method that the job is running, I have updated my answer to show this for you – Gillardo Jan 29 '22 at 13:48
  • Currently, it does not generate multiple queues, the servers are sharing queues. For example: your `alert-schedule` queue is shared by 2 servers: `ALERT-SCHEDULE: 17764` and `ALERT-SCHEDULE:3336`. Not sure if what you really want is a separate queue for each server, something like `alert-schedule:17764` queue for server `ALERT-SCHEDULE: 17764` and `alert-schedule:3336` queue for `ALERT-SCHEDULE: 3336` – Khanh TO Jan 30 '22 at 05:01
  • I think what you see in the dashboard is expected behaviour and is not a problem. You have two physical servers (first column) for each of your queues (third column). Also I think you are should rely on filters and distributed locks to achieve your goal. Have a look at the implementation of DisableConcurrentExecution https://github.com/HangfireIO/Hangfire/blob/master/src/Hangfire.Core/DisableConcurrentExecutionAttribute.cs. An implementation of IElectStateFilter should help too : https://github.com/HangfireIO/Hangfire/issues/963 – jbl Jan 31 '22 at 18:15
  • @KhanhTO thanks for the reply. Yes i want only 1 queue, as both servers are running the code and this job for example is scheduled to run every hour. When the hour is up, i only want it to run on one of the servers, i dont want it being queued and executed on both, which it seems to be doing sometimes? – Gillardo Feb 01 '22 at 09:41
  • @jbl thank you for the references. So in the filters, can i check if the job is currently running like i do in the hangfireService code above and if so, set this job to not run? Example would be great if possible, happy to give bounty – Gillardo Feb 01 '22 at 09:46
  • @Gillardo: Not sure about how you enqueue your jobs every hour. If duplicated jobs are enqueued, it should be fixed from enqueuing jobs side. I posted an answer below to address the problem of re-execution of the same enqueued job. – Khanh TO Feb 03 '22 at 06:59
  • @KhanhTO I have added how i am adding my recurring job above at the end. I am assuming this is correct? – Gillardo Feb 03 '22 at 15:41
  • @Gillardo: there seems to be a bug with your currently used hangfire storage implementation. I have added more detail to my answer – Khanh TO Feb 06 '22 at 03:44

2 Answers2

1

Somewhat of a blind shot, preventing a job to be queued if a job is already queued in the same queue. The try-catch logic is quite ugly but I have no better idea right now... Also, really not sure the lock logic always prevents from having two jobs in EnqueudState, but it should help anyway. Maybe mixing with an IApplyStateFilter.

public class DoNotQueueIfAlreadyQueued : IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState is EnqueuedState)
        {
            EnqueuedState es = context.CandidateState as EnqueuedState;
            IDisposable distributedLock = null;
            try
            {
                while (distributedLock == null)
                {
                    try
                    {
                        distributedLock = context.Connection.AcquireDistributedLock($"{nameof(DoNotQueueIfAlreadyQueued)}-{es.Queue}", TimeSpan.FromSeconds(1));
                    }
                    catch { }
                }

                var m = context.Storage.GetMonitoringApi();
                if (m.EnqueuedCount(es.Queue) > 0)
                {
                    context.CandidateState = new DeletedState();
                }
            }
            finally
            {
                distributedLock.Dispose();
            }
        }
    }
}

The filter can be declared as in this answer

jbl
  • 15,179
  • 3
  • 34
  • 101
0

There seems to be a bug with your currently used hangfire storage implementation:

https://github.com/HangfireIO/Hangfire/issues/1025

The current options are:

  1. Switching to HangFire.LiteDB as commented here: https://github.com/HangfireIO/Hangfire/issues/1025#issuecomment-686433594

  2. Implementing your own logic to enqueue a job, but this would take more effort.

  3. Making your job execution idempotent to avoid side effects in case it's executed multiple times.

In either option, you should still apply DisableConcurrentExecution and make your job execution idempotent as explained below, so i think you can just go with below option:

Applying DisableConcurrentExecution is necessary, but it's not enough as there are no reliable automatic failure detectors in distributed systems. That's the nature of distributed systems, we usually have to rely on timeouts to detect failures, but it's not reliable.

Hangfire is designed to run with at-least-once execution semantics. Explained below:

One of your servers may be executing the job, but it's detected as being failed due to various reasons. For example: your current processing server does not send heartbeats in time due to a temporary network issue or due to temporary high load.

When the current processing server is assumed to be failed (but it's not), the job will be scheduled to another server which causes it to be executed more than once.

The solution should be still applying DisableConcurrentExecution attribute as a best effort to prevent multiple executions of the same job, but the main thing is that you need to make the execution of the job idempotent which does not cause side effects in case it's executed multiple times.

Please refer to some quotes from https://docs.hangfire.io/en/latest/background-processing/throttling.html:

Throttlers apply only to different background jobs, and there’s no reliable way to prevent multiple executions of the same background job other than by using transactions in background job method itself. DisableConcurrentExecution may help a bit by narrowing the safety violation surface, but it heavily relies on an active connection, which may be broken (and lock is released) without any notification for our background job.

As there are no reliable automatic failure detectors in distributed systems, it is possible that the same job is being processed on different workers in some corner cases. Unlike OS-based mutexes, mutexes in this package don’t protect from this behavior so develop accordingly.

DisableConcurrentExecution filter may reduce the probability of violation of this safety property, but the only way to guarantee it is to use transactions or CAS-based operations in our background jobs to make them idempotent.

You can also refer to this as Hangfire timeouts behavior seems to be dependent on storage as well: https://github.com/HangfireIO/Hangfire/issues/1960#issuecomment-962884011

Khanh TO
  • 48,509
  • 13
  • 99
  • 115