
I have started looking into TPL Dataflow as a solution to a processing workflow.

The gist of the processing workflow is to read input messages from multiple tables, create four corresponding objects from each of them, and persist those to four other tables, so each input message should result in four new messages being created.

I cannot identify a predefined block that helps with the creation of the four objects. At first TransformManyBlock seemed to be what I was looking for, but it returns multiple objects of a single type, whereas I will have four types.
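For reference, a minimal sketch of the constraint (using a hypothetical comma-splitting block): TransformManyBlock<TInput, TOutput> can emit many outputs per input, but they must all be of the single type TOutput.

using System.Threading.Tasks.Dataflow;

// One input produces many outputs, but all outputs share one type (string).
var splitter = new TransformManyBlock<string, string>(line => line.Split(','));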

Example of the Problem

We have two tables containing Employee details from two legacy systems; their entities look like this:

public partial class EmployeeTblA
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Forename { get; set; }
    public string Surname { get; set; }
    public int Age { get; set; }
    public int Number { get; set; }
    public string Street { get; set; }
    public string PostCode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeTblB
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

We want to take the data from the two systems and put it into our shiny new system; to do this we need to convert the entities from the old systems to the entities used in the new system. First we convert the entities from the old systems to a base class, which looks like this:

public class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
}

We then want to create three new objects from the base class that represent the entities of the new system, which look like this:

public partial class EmployeeName
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Name { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAge
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public int Age { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAddress
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

The rough flow of my TPL Dataflow pipeline for the above example:

  1. Read data from the tables in the DB into a TransformBlock<Employee, BaseEmployee>, converting to a common object; this is done twice, once for each of the legacy systems.

  2. Each TransformBlock<Employee, BaseEmployee> is linked to a BatchBlock to group all the input flows.

  3. The BatchBlock is linked to a Block<BaseEmployee, ...> which will take the input and create the new objects from the input data: EmployeeName, EmployeeAge and EmployeeAddress.

  4. The Block<BaseEmployee, ...> will then be linked to ActionBlocks which save the objects to their respective tables in the DB.

I know I can create a custom block, but I cannot figure out how I could use it to provide output to four separate linked ActionBlocks using Dataflow. Can someone please point me in the right direction?

beaver559
  • TBH I am a bit confused by the presentation of the problem. It could help if you could describe the whole process not in terms of Dataflow blocks, but in terms of entities. Could you construct a minimal example of specific entities coming out from the database, being transformed, and finally saved back into the DB? – Theodor Zoulias Feb 17 '21 at 11:16
  • @TheodorZoulias I have added an example please let me know if this paints the problem in enough detail – beaver559 Feb 17 '21 at 11:35
  • Each one of the two legacy systems has a different set of `Employee`s, or each `Employee` exists on both systems, and you have to join the data from both systems to get the unified info of each `Employee`? – Theodor Zoulias Feb 17 '21 at 12:06
  • @TheodorZoulias Different sets of employees – beaver559 Feb 17 '21 at 12:10
  • I just wanted to make sure that the existence of multiple legacy systems is not an essential component of the problem. You could have a single legacy system, and the core problem would be exactly the same, correct? – Theodor Zoulias Feb 17 '21 at 12:13
  • Precisely! We need to do some in the middle processing between the systems no matter if there is 1 or 100 input legacy systems, Dataflow just allows for a high degree of pluggability when we have more than 1 input. – beaver559 Feb 17 '21 at 12:16
  • What you are trying to do reminds me of this question: [Combining dataflow results](https://stackoverflow.com/questions/45581714/combining-dataflow-results). But you probably need something simpler. A custom block may not be absolutely necessary. You could just use an `ActionBlock` that would create one `EmployeeNames` and one `EmployeeAges` for each incoming `BaseEmployee`, and then `Post` or `SendAsync` these entities directly to two other blocks. In other words you could replace the `LinkTo` functionality with manual propagation of messages from block to block (see the sketch after these comments). – Theodor Zoulias Feb 17 '21 at 12:28
  • This is something I was thinking of: after the batch block, exit the dataflow, perhaps create the objects, and then manually post them to an action block? This is what you mean, correct? Additionally I read it is possible to supply a filtering function in the LinkTo method; I was thinking I could use a BroadcastBlock and filtering to make sure each linked ActionBlock received the correct type – beaver559 Feb 17 '21 at 12:30
  • The `BatchBlock` sounds like an extra complication that may not be essential to the problem. First and foremost a `BatchBlock` produces arrays, so you can't link it to an `ActionBlock<BaseEmployee>`. The target must be an `ActionBlock<BaseEmployee[]>`. I don't know if you want to do the "unbatching" there, and propagate single `EmployeeNames` and `EmployeeAges` entities, or you want to propagate arrays of them. Btw `EmployeeNames` and `EmployeeAges` are quite confusing names for classes that contain a single entity per class instance. – Theodor Zoulias Feb 17 '21 at 12:39
  • @TheodorZoulias You were correct in your assumption of the BatchBlock adding extra complications. I thought it was required to aggregate different streams of data, when in reality Dataflow allows me just to link multiple sources to a single input target, which achieved the extra functionality I was looking for. – beaver559 Feb 18 '21 at 12:01
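Building on the manual-propagation idea from the comments above, a minimal sketch, assuming the BaseEmployee, EmployeeName and EmployeeAge classes from the question; the saveNames and saveAges blocks are hypothetical stand-ins for the persistence steps.

using System.Threading.Tasks.Dataflow;

// Hypothetical downstream persistence blocks.
var saveNames = new ActionBlock<EmployeeName>(n => { /* persist n */ });
var saveAges = new ActionBlock<EmployeeAge>(a => { /* persist a */ });

// Instead of LinkTo, an ActionBlock creates the derived entities and
// propagates them to the downstream blocks manually with SendAsync.
var fanOut = new ActionBlock<BaseEmployee>(async e =>
{
    await saveNames.SendAsync(new EmployeeName { System = e.System, LegacyId = e.Id, Name = e.Name });
    await saveAges.SendAsync(new EmployeeAge { System = e.System, LegacyId = e.Id, Age = e.Age });
});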

1 Answer

The BroadcastBlock was the component I ended up going with; I used it to broadcast the base object (BaseMsg in the code below) to the other output streams, splitting out the corresponding objects I needed to create.

Full pipeline shown below:

        _transEmployeeA = new TransformBlock<EmployeeTblA, BaseMsg>((input) =>
        {
            return new BaseMsg()
            {
                Id = input.Id,
                System = input.System,
                Name = string.Concat(input.Forename, " ", input.Surname),
                Age = input.Age,
                Address = string.Concat(input.Number, " ", input.Street),
                Postcode = input.PostCode
            };
        });

        _transEmployeeB = new TransformBlock<EmployeeTblB, BaseMsg>((input) =>
        {
            return new BaseMsg()
            {
                Id = input.Id,
                System = input.System,
                Name = input.Name,
                Age = input.Age,
                Address = input.Address,
                Postcode = input.Postcode
            };
        });

        _broadcastBaseMsg = new BroadcastBlock<BaseMsg>(null);

        _transEmployeeName = new TransformBlock<BaseMsg, EmployeeName>((baseMsg) =>
        {
            return new EmployeeName()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Name = baseMsg.Name
            };
        });

        _transEmployeeAge = new TransformBlock<BaseMsg, EmployeeAge>((baseMsg) =>
        {
            return new EmployeeAge()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Age = baseMsg.Age
            };
        });

        _transEmployeeAddress = new TransformBlock<BaseMsg, EmployeeAddress>((baseMsg) =>
        {
            return new EmployeeAddress()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Address = baseMsg.Address,
                Postcode = baseMsg.Postcode
            };
        });


        _bufferEmployeeName = new BufferBlock<EmployeeName>();
        _bufferEmployeeAge = new BufferBlock<EmployeeAge>();
        _bufferEmployeeAddress = new BufferBlock<EmployeeAddress>();

        _actionEmployeeName = new ActionBlock<EmployeeName>((output) =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeNames.Add(output);
                cxt.SaveChanges();
            }
        });

        _actionEmployeeAge = new ActionBlock<EmployeeAge>((output) =>
        {                
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAges.Add(output);
                cxt.SaveChanges();
            }                
        });

        _actionEmployeeAddress = new ActionBlock<EmployeeAddress>((output) =>
        {                
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAddresses.Add(output);
                cxt.SaveChanges();
            }               
        });

        var linkOpts = new DataflowLinkOptions()
        {
            PropagateCompletion = true
        };

        // Transform Employees and pass to Broadcast
        _transEmployeeA.LinkTo(_broadcastBaseMsg, linkOpts);
        _transEmployeeB.LinkTo(_broadcastBaseMsg, linkOpts);

        // Transform Broadcast to respective outputs
        _broadcastBaseMsg.LinkTo(_transEmployeeName, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAge, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAddress, linkOpts);

        // Add outputs to Buffer
        _transEmployeeName.LinkTo(_bufferEmployeeName, linkOpts);
        _transEmployeeAge.LinkTo(_bufferEmployeeAge, linkOpts);
        _transEmployeeAddress.LinkTo(_bufferEmployeeAddress, linkOpts);

        // Persist outputs to DB
        _bufferEmployeeName.LinkTo(_actionEmployeeName, linkOpts);
        _bufferEmployeeAge.LinkTo(_actionEmployeeAge, linkOpts);
        _bufferEmployeeAddress.LinkTo(_actionEmployeeAddress, linkOpts);
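
For completeness, a minimal sketch of feeding and draining the pipeline, assuming hypothetical employeesA and employeesB collections. One caveat: because linkOpts sets PropagateCompletion on both links into _broadcastBaseMsg, the first transform to complete would complete the BroadcastBlock early; the sketch assumes those two links were made without linkOpts and coordinates completion manually instead.

        // Feed both legacy sources into the pipeline.
        foreach (var a in employeesA) _transEmployeeA.Post(a);
        foreach (var b in employeesB) _transEmployeeB.Post(b);

        _transEmployeeA.Complete();
        _transEmployeeB.Complete();

        // Complete the broadcast only after BOTH sources have drained.
        await Task.WhenAll(_transEmployeeA.Completion, _transEmployeeB.Completion);
        _broadcastBaseMsg.Complete();

        // Wait for the final ActionBlocks to finish persisting.
        await Task.WhenAll(
            _actionEmployeeName.Completion,
            _actionEmployeeAge.Completion,
            _actionEmployeeAddress.Completion);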

Additionally, comments from @TheodorZoulias helped to simplify my usage of TPL Dataflow for this particular pipeline.

beaver559