0

I am running a Parallel.ForEach in C#. I am getting below TaskCanceledException: A task was canceled. Can anyone suggest where I can fix my Task cancellation, or increase my connection timeout?

My code:

private static void Main()
{
    var SQLServerName = "XXXXXXXXXX.database.windows.net";
    var SQLServerAdmin = "XXXXXXXXXX";
    var SQLServerAdminPasword = "XXXXXXXXXX";
    var DatabaseName = "XXXXXXXXXX";

    string SQLStatement = ($"DELETE FROM ResourceStatus WHERE ResourceType = 'RBAC' " +
        $"TRUNCATE TABLE RBACStaging ");
    Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
        DatabaseName, SQLStatement);

    SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();

    builder.DataSource = SQLServerName;
    builder.UserID = SQLServerAdmin;
    builder.Password = SQLServerAdminPasword;
    builder.InitialCatalog = DatabaseName;
    List<string> Subscriptions = new List<string>();

    using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
    {
        connection.Open();
        StringBuilder sb = new StringBuilder();
        sb.Append($"SELECT SubscriptionID from Subscriptions " +
            $"where SOX = 'SR0' and Environment = 'Prod'");
        string sql = sb.ToString();

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            using (SqlDataReader Reader = command.ExecuteReader())
            {
                while (Reader.Read())
                {
                    Subscriptions.Add(Reader.GetString(0).ToString());
                }
                connection.Close();
                SqlConnection.ClearPool(connection);
            }
        }
    }
    Parallel.ForEach(Subscriptions, s =>
    {
        string SubscriptionID = s.Replace(" ", String.Empty);
        RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
    });
}

GetRBACSnapshot method:

public static async Task GetRBACSnapshot(string SubscriptionID)
{
    var ClientID = "de9f7784-93e4-42d0-a68d-7f1457ce4e56";
    var AppKey = "XXXXXXXX";
    var SQLServerName = "XXXXXXXX.database.windows.net";
    var SQLServerAdmin = "XXXXXXXX";
    var SQLServerAdminPasword = "XXXXXXXX";
    var DatabaseName = "XXXXXXXX";
    string TenantID = null;
    string ServiceGroupName = null;
    string TeamGroupName = null;
    string ServiceName = null;
    string ServiceTreeID = null;
    string Level = null;
    string SOX = null;
    string SubscriptionName = null;
    string EnvironmentScope = null;
    string Tenant = null;

    DateTime RawDate = DateTime.Now;
    string RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
    string SQLStatement = null;
    SQLStatement = ($"INSERT INTO ResourceStatus " +
                    $"SELECT 'RBAC', '{SubscriptionID}', 'Started', '{RefreshedAt}'");
    Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
        DatabaseName, SQLStatement);

    SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();

    builder.DataSource = SQLServerName;
    builder.UserID = SQLServerAdmin;
    builder.Password = SQLServerAdminPasword;
    builder.InitialCatalog = DatabaseName;

    using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
    {
        connection.Open();
        StringBuilder sb = new StringBuilder();
        sb.Append($"SELECT ServiceGroupName, TeamGroupName, ServiceName, " +
            $"ServiceTreeID, Level, SOX, SubscriptionName, Environment, Tenant " +
            $"from Subscriptions where SubscriptionID = '{SubscriptionID}'");
        string sql = sb.ToString();

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            using (SqlDataReader Reader = command.ExecuteReader())
            {
                while (Reader.Read())
                {
                    ServiceGroupName = Reader.GetString(0).ToString();
                    TeamGroupName = Reader.GetString(1).ToString();
                    ServiceName = Reader.GetString(2).ToString();
                    ServiceTreeID = Reader.GetString(3).ToString();
                    Level = Reader.GetString(4).ToString();
                    SOX = Reader.GetString(5).ToString();
                    SubscriptionName = Reader.GetString(6).ToString();
                    EnvironmentScope = Reader.GetString(7).ToString();
                    Tenant = Reader.GetString(8).ToString();
                }
            }
        }
        connection.Close();
        SqlConnection.ClearPool(connection);
    }

    // AARE Logic
    if (Tenant.Equals("AME", StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "33e01921-4d64-4f8c-a055-5bdaffd5e33d";
    }
    else if (Tenant.Equals("GME", StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "124edf19-b350-4797-aefc-3206115ffdb3";
    }
    else if (Tenant.Equals("PME", StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "975f013f-7f24-47e8-a7d3-abc4752bf346";
    }
    else if (Tenant.Equals("Corp", StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "72f988bf-86f1-41af-91ab-2d7cd011db47";
    }
    string AzToken = await Helper.GetAccessTokenAsync(TenantID, ClientID, AppKey);
    string MSGraphToken = await Helper.GetGraphAccessToken(TenantID, ClientID, AppKey)
        .ConfigureAwait(true);

    var httpClient = new HttpClient
    {
        BaseAddress = new Uri("https://management.azure.com/subscriptions/")
    };
    string URI = $"{SubscriptionID}/providers/Microsoft.Authorization/roleAssignments?" +
        $"api-version=2018-09-01-preview";
    httpClient.DefaultRequestHeaders.Remove("Authorization");
    httpClient.DefaultRequestHeaders.Add("Authorization", "Bearer " + AzToken);
    HttpResponseMessage response = await httpClient.GetAsync(URI).ConfigureAwait(false);

    var HttpsResponse = await response.Content.ReadAsStringAsync();
    dynamic Result = JsonConvert.DeserializeObject<object>(HttpsResponse);

    if (Result["value"] != null)
    {
        foreach (dynamic item in Result["value"])
        {
            string ObjectID = item.properties.principalId;
            string ObjectType = item.properties.principalType;
            string ObjectCategory = null;

            if (ObjectType.Equals("ServicePrincipal", StringComparison.OrdinalIgnoreCase))
            {
                ObjectCategory = "servicePrincipals";
            }

            else if (ObjectType.Equals("User", StringComparison.OrdinalIgnoreCase))
            {
                ObjectCategory = "users";
            }

            else if (ObjectType.Equals("Group", StringComparison.OrdinalIgnoreCase))
            {
                ObjectCategory = "groups";
            }

            string AccessScope = item.properties.scope;
            string DisplayName = await Helper.GetDisplayName(TenantID, ObjectID,
                MSGraphToken, ObjectCategory);
            string RoleDefinitionIDFullText = item.properties.roleDefinitionId;
            string RoleDefinitionID = RoleDefinitionIDFullText.Substring(
                RoleDefinitionIDFullText.Length - 36);
            string AccessLevel = await Helper.GetRoleDefinitionName(RoleDefinitionID,
                SubscriptionID, AzToken);
            string ProvisionedByObjID = item.properties.createdBy;
            string ProvisionedBy = await Helper.GetDisplayName(TenantID,
                ProvisionedByObjID, MSGraphToken, "Decide");
            string ProvisionedAt = item.properties.createdOn;
            string Compliant = Helper.ComplianceValidation(SOX, EnvironmentScope,
                DisplayName, ObjectType, AccessLevel, ProvisionedBy, AccessScope);
            RawDate = DateTime.Now;
            RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
            string Action = null;
            if (Compliant.Equals("Yes"))
            {
                Action = "No Action Needed";
            }
            else if (Compliant.Equals("No"))
            {
                Action = "Revoke";
            }
            else
            {
                Action = "Yet To Decide";
            }

            Console.WriteLine($"{ServiceGroupName}, {TeamGroupName}, {ServiceName}, " +
                $"{ServiceTreeID}, {Level}, {SOX}, {SubscriptionName}, {SubscriptionID}, " +
                $"{EnvironmentScope}, {Tenant}, {DisplayName}, {ObjectType}, {ObjectID}, " +
                $"{AccessLevel}, {ProvisionedBy}, {ProvisionedAt}, {AccessScope}, " +
                $"{Compliant}, {Action}, {ProvisionedByObjID}, {RefreshedAt}"); ;

            SQLStatement = ($"INSERT INTO RBACStaging Select '{ServiceGroupName}'," +
                $"'{TeamGroupName}','{ServiceName}','{ServiceTreeID}','{Level}','{SOX}'" +
                $",'{SubscriptionName}','{SubscriptionID}','{EnvironmentScope}'," +
                $"'{Tenant}','{DisplayName}','{ObjectType}','{ObjectID}','{AccessLevel}'," +
                $"'{ProvisionedBy}','{ProvisionedAt}','{AccessScope}','{Compliant}'," +
                $"'{Action}','{ProvisionedByObjID}','{RefreshedAt}'");
            try
            {
                Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
                    DatabaseName, SQLStatement);
            }
            catch (System.Data.SqlClient.SqlException sqlException)
            {
                Console.WriteLine(sqlException.Message);
            }
        }
    } // End of IF Condition to check Null value from Rest API Calls.
    RawDate = DateTime.Now;
    RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
    SQLStatement = ($"update ResourceStatus Set Status = 'Completed', " +
        $"LastUpdatedTime = '{RefreshedAt}' where ResourceType = 'RBAC' and " +
        $"SubscriptionID = '{SubscriptionID}'");
    Helper.ExecuteTSQL(SQLServerName, SQLServerAdmin, SQLServerAdminPasword,
        DatabaseName, SQLStatement);
} // End of Method

The exception:

TaskCanceledException: A task was canceled
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IList`1 list, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source, Action`1 body)
   at CPXFundamentals.Program.Main() in C:\Users\venkatag\source\repos\CPXFundamentals\Program.cs:line 47
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Vinny
  • 461
  • 1
  • 5
  • 18
  • As a side note the `N++;` [is not thread-safe](https://stackoverflow.com/questions/4628243/is-the-operator-thread-safe), and you are calling it inside the body of a parallel loop. – Theodor Zoulias Jul 31 '21 at 07:08
  • Regarding your main question, we need to see the `RBACSnapshot.GetRBACSnapshot` method before being able to provide any useful advice. – Theodor Zoulias Jul 31 '21 at 07:10
  • This is really not the way to parallelize async functions anyway, see https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda – Charlieface Aug 01 '21 at 03:00
  • Theodor Zoulias: I added the GETRBACSnapshot Method. Can someone please tell me how to fix the Task Cancelled issue. – Vinny Aug 02 '21 at 23:06
  • I added both my Main Method and GetRBACSnapshot Method – Vinny Aug 02 '21 at 23:08

1 Answers1

2

You may want to consider just catching any TaskCanceledExceptions if you have no access to the implementation of GetRBACSnapshot.

For example you could catch the TaskCanceledException and add that subscription back into a queue to be retried at a later time.

Just be sure not to consume all errors, and solely consume TaskCanceledException. You should avoid unintentionally consuming critical runtime errors.

Something similar to this might work for you:

int N = 1;
Parallel.ForEach(Subscriptions, s =>
{
    string SubscriptionID = s.Replace(" ", String.Empty);
    Console.WriteLine($"Working on {N}. {SubscriptionID}");
    
    try
    {
        RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
        Interlocked.Increment(ref N);
    }
    catch(TaskCanceledException)
    {
        Console.WriteLine($"{SubscriptionID} Timed out");
        // do something else like add it back to a queue to be tried again
        // just make sure any collection you add the subscription to is thread safe
    }
});
DekuDesu
  • 2,224
  • 1
  • 5
  • 19