
Let me rephrase.

I have a method that, given a start string (a path), generates further strings (paths). If one of those paths is a directory, I want to enqueue it back into the input of the same method.
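A Dataflow block's delegate only receives the current item and has no access to the block's input buffer. It can, however, capture a reference to its own block and `Post` new items into it. The sketch below shows this under stated assumptions. A hypothetical in-memory dictionary (`tree`) stands in for the SVN listing, and paths ending in `/` are treated as directories. The `pending` counter is one way to decide when the recursion is finished, because a block that feeds itself never sees its input end on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RecursiveCrawlSketch
{
    // tree: hypothetical stand-in for the SVN listing (directory path -> child paths).
    // Paths ending in "/" are treated as directories, everything else as files.
    public static async Task<List<string>> CrawlAsync(
        IReadOnlyDictionary<string, string[]> tree, string root)
    {
        var files = new List<string>();
        int pending = 0; // directories posted to the crawler but not yet processed

        TransformManyBlock<string, string> crawler = null;
        crawler = new TransformManyBlock<string, string>(dir =>
        {
            var found = new List<string>();
            string[] children;
            if (tree.TryGetValue(dir, out children))
            {
                foreach (var child in children)
                {
                    if (child.EndsWith("/", StringComparison.Ordinal))
                    {
                        // Feed the sub-directory back into this block's own input.
                        Interlocked.Increment(ref pending);
                        crawler.Post(child);
                    }
                    else
                    {
                        found.Add(child); // files go to the block's output
                    }
                }
            }
            // This directory is done; if no other directory is waiting, stop accepting input.
            if (Interlocked.Decrement(ref pending) == 0)
                crawler.Complete();
            return found;
        });

        // Downstream consumer; a real pipeline would parse or blame each file here.
        var collector = new ActionBlock<string>(file => files.Add(file));
        crawler.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        Interlocked.Increment(ref pending);
        crawler.Post(root);
        await collector.Completion;
        return files;
    }
}
```

Child directories are counted before their parent is marked done, so `pending` only reaches zero after the last directory has been processed. This holds even if `MaxDegreeOfParallelism` is raised above its default of 1. The order of the returned files is not guaranteed.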

After processing a path synchronously, I want to take the resulting data and send a copy of it asynchronously into several branches of a pipeline, where each branch must receive every data block. So BroadcastBlock is out of the question (it can't send a blocking signal back to the blocks before it). The JoinBlock that joins the results is relatively straightforward.
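By default, `BroadcastBlock<T>` accepts every item and keeps only the most recent one. A slow or bounded target can therefore miss messages, and nothing upstream is held back. A common workaround is an `ActionBlock<T>` that awaits `SendAsync` on each target in turn. The "Alternate to Dataflow BroadcastBlock with guaranteed delivery" answer referenced in this question uses the same idea. The sketch below is a minimal version under stated assumptions. It also bounds the broadcaster, so that while any target is full, an upstream `SendAsync` to the broadcaster waits as well. `GuaranteedBroadcast.Create` is an illustrative name, not a library API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GuaranteedBroadcast
{
    // Forwards every item to every target, waiting until each target accepts it.
    // Because the broadcaster itself is bounded, back-pressure reaches upstream blocks.
    public static ITargetBlock<T> Create<T>(IReadOnlyList<ITargetBlock<T>> targets, int boundedCapacity = 1)
    {
        var broadcaster = new ActionBlock<T>(async item =>
        {
            foreach (var target in targets)
            {
                // SendAsync completes with false only if the target declined permanently (e.g. it completed).
                if (!await target.SendAsync(item))
                    throw new InvalidOperationException("A target declined the item.");
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Pass completion (or failure) on to every target.
        broadcaster.Completion.ContinueWith(t =>
        {
            foreach (var target in targets)
            {
                if (t.IsFaulted) target.Fault(t.Exception);
                else target.Complete();
            }
        });
        return broadcaster;
    }
}
```

Producers should use `await broadcaster.SendAsync(item)` rather than `Post`, because `Post` returns `false` immediately when a bounded block is full. Targets are fed one after another, so a slow target delays the others. Sending to all targets and awaiting `Task.WhenAll` is an alternative if that matters.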

So, to sum up: is there a block in TPL Dataflow whose delegate can access the block's own input queue, and if so, how? Is there a construct that acts like BroadcastBlock but can block the blocks that come before it?

I tried putting it together with the help of almighty Google:

class subversion
    {
        private static string repo;
        private static string user;
        private static string pw;
        private static DateTime start;
        private static DateTime end;

        private static List<parserObject> output;
        public static List<parserObject> svnOutputList
        {
            get {return output; }
        }

        private static List<string> extension_whitelist;

        public async Task run(string link, string i_user, string i_pw, DateTime i_start, DateTime i_end)
        {
            repo = link;
            user = i_user;
            pw = i_pw;
            start = i_start;
            end = i_end;

            output = new List<parserObject>();
            BufferBlock<string> crawler_que = new BufferBlock<string>();
            BufferBlock<svnFile> parser_que = new BufferBlock<svnFile>();

            // Seed the crawler with the start path; it enqueues sub-directories itself.
            crawler_que.Post(link);
            var svn = crawl(crawler_que, parser_que);

            // One target block per analysis (add more ActionBlocks for other metrics).
            var targets = new List<ITargetBlock<svnFile>>
            {
                new ActionBlock<svnFile>(file => mLoc(file))
            };

            // Guaranteed-delivery broadcast, adapted from the answer linked below:
            // SendAsync waits until each target has actually accepted the file.
            var broadcaster = new ActionBlock<svnFile>(async file =>
            {
                foreach (var target in targets)
                {
                    await target.SendAsync(file);
                }
            });

            parser_que.LinkTo(broadcaster, new DataflowLinkOptions { PropagateCompletion = true });

            await svn;                    // the crawler has finished producing files
            parser_que.Complete();        // so no more input will arrive
            await broadcaster.Completion; // every file has been handed to every target

            foreach (var target in targets)
                target.Complete();
            foreach (var target in targets)
                await target.Completion;  // wait for all analyses to finish
        }

    private static async Task crawl(BufferBlock<string> in_queue, BufferBlock<svnFile> out_queue)
    {
        SvnClient client = new SvnClient();
        client.Authentication.ForceCredentials(user, pw);

        SvnListArgs arg = new SvnListArgs
        {
            Depth = SvnDepth.Children,
            RetrieveEntries = SvnDirEntryItems.AllFieldsV15
        };

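        // crawl is the only consumer and the only producer of in_queue, and it handles one
        // directory at a time, so an empty queue means every directory has been visited.
        // OutputAvailableAsync would wait forever here because nothing calls in_queue.Complete().
        string link;
        while (in_queue.TryReceive(out link))
        {
            string buffer_author = null;
            string prev_author = null;
            System.Collections.ObjectModel.Collection<SvnListEventArgs> contents;
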
            if (client.GetList(new Uri(link), arg, out contents))
            {
                foreach (SvnListEventArgs item in contents)
                {
                    if (item.Entry.NodeKind == SvnNodeKind.Directory)
                    {
                        in_queue.Post(item.Path);
                    }
                    else if (item.Entry.NodeKind == SvnNodeKind.File)
                    {
                        try
                        {
                            int length = item.Name.LastIndexOf(".");
                            if (length <= 0)
                            {
                                continue;
                            }
                            string ext = item.Name.Substring(length);
                            if (extension_whitelist.Contains(ext))
                            {
                                Uri target = new Uri(repo + link);
                                SvnBlameArgs args = new SvnBlameArgs
                                {
                                    Start = start.AddDays(-1),
                                    End = end
                                };
                                try
                                {
                                    svnFile file_instance = new svnFile();
                                    client.Blame(target, args, delegate(object sender3, SvnBlameEventArgs e)
                                    {
                                        if (e.Author != null)
                                        {
                                            buffer_author = e.Author;
                                            prev_author = e.Author;
                                        }
                                        else
                                        {
                                            buffer_author = prev_author;
                                        }
                                        file_instance.lines.Add(new svnLine(buffer_author, e.Line));
                                    });
                                    out_queue.Post(file_instance);
                                }
                                catch (Exception a) { Console.WriteLine("exception:" + a.Message);}
                            }
                        }
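                        catch (Exception a)
                        {
                            // Don't swallow errors silently (e.g. an uninitialized extension_whitelist).
                            Console.WriteLine("exception:" + a.Message);
                        }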
                    }
                }
            }
        }
    }
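    private static Task mLoc(svnFile file)
    {
        // Count how many lines each author contributed to this file.
        List<parserPart> parts = new List<parserPart>();
        foreach (svnLine line in file.lines)
        {
            int find = parts.FindIndex(x => x.uploader_id == line.author);
            if (find >= 0) // FindIndex returns -1 when not found; 0 is a valid index
            {
                parts[find].count += 1;
            }
            else
            {
                parts.Add(new parserPart(line.author));
            }
        }
        parserObject ret = new parserObject(parts, "mLoc");
        // List<T>.Add returns void (nothing to await) and is not thread-safe, so lock instead.
        lock (output)
        {
            output.Add(ret);
        }
        return Task.CompletedTask;
    }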
}

The broadcast code is adapted from this answer: Alternate to Dataflow BroadcastBlock with guaranteed delivery

Asked by Git; edited by Community
  • In #1 you've declared a parameter named `string` that just happens to be of type string. You could have named it `path`, if that's what it is. As for #2, what do you mean? Did you try to create and link two different TransformBlocks, one for GetFile and another for ProcessFile? Eg, a `TransformBlock` and a `TransformBlock`? The question about queues is also confusing - TransformBlocks already buffer their inputs. Please post the *actual* code you tried – Panagiotis Kanavos Mar 22 '16 at 15:59
  • this is almost the code that I used; I tried to mess with it a little bit and now the BroadcastBlock is completely broken =( – Git Mar 22 '16 at 16:34
  • Where did that `input` come from? What is it supposed to contain? The actual *input* to the block is the `path` parameter that isn't actually used anywhere. You *don't* need to access the input buffer yourself, the TransformBlock does this for you. Did you want to post a repo link to the block? – Panagiotis Kanavos Mar 22 '16 at 16:43
  • The input variable is left over from the BufferBlock test, but that doesn't pertain to my problem. My problem is: how do I access the TransformBlock's input queue from the inside, or do I need to put the crawler part in front of the TransformBlock? – Git Mar 23 '16 at 19:25
  • you just vandalized the question, making an answer impossible. You missed the point completely anyway. That `input` **IS THE BLOCK'S INPUT**. You don't need to do *anything* to get to it. If you call *Post(someURL)* on that block, then your lambda will be called with the `input` parameter containing the URL you posted. Whatever gets returned from your lambda will be passed to the next block – Panagiotis Kanavos Mar 24 '16 at 09:16
  • A TPL Dataflow is just like a PowerShell or bash pipeline. You don't need to do anything to read the input or write to the output - just write your function and the library will pass the data between them. You also *shouldn't* try to do everything in a single step, just like with a pipeline. Break it into multiple steps, each of which does *one* job. Eg, step 1 - read the list of entries from one URL and return it. Step 2 - read the file pointed to by one URL and return the content. Step 3 - analyze the blob and return the analysis results – Panagiotis Kanavos Mar 24 '16 at 09:20
  • For steps that read one input but produce multiple outputs you can use the TransformManyBlock to convert the list of results to individual results. No need for downstream blocks to know about queues, lists etc if they only want to process a single file – Panagiotis Kanavos Mar 24 '16 at 09:23
  • Can I put async methods inside the TransformManyBlock? Because I don't see how I would get a speed boost from waiting asynchronously for the whole block to finish, rather than waiting synchronously for the whole method to finish. Do you understand what I mean? – Git Mar 24 '16 at 12:02
  • I know that the input part of the input is the GODDAM INPUT; we are spinning in circles. – Git Mar 24 '16 at 12:05
  • sorry, you really need to understand how pipelines and dataflows work. The code you deleted showed an attempt to treat that string as if it were a list of inputs. As for asynchronous lambdas - all active blocks support asynchronous methods. Besides, *asynchrony* is guaranteed by the *block* not the `async` keyword. Each function is executed in a separate task. Even if you use a method *without* `async` it will *still* run in a separate Task/Thread. The ManyBlock is used to *split* multiple results so that each one can be processed separately, possibly in parallel. – Panagiotis Kanavos Mar 24 '16 at 12:11
  • And finally, the Many block expects a result of IEnumerable, which means you can use an iterator to `yield return` each individual result without waiting to collect them all – Panagiotis Kanavos Mar 24 '16 at 12:11
  • So I can access part of the result before the rest of the same Many block has completed? – Git Mar 24 '16 at 12:28
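The comments above suggest splitting the work into single-purpose steps, with `TransformManyBlock` turning one input into many outputs. The sketch below shows this under stated assumptions. Hypothetical in-memory dictionaries (`listing`, `contents`) stand in for the SVN calls. Step 1 is an iterator that `yield return`s file paths, step 2 uses an `async` lambda in a `TransformBlock` (`Task.Yield` stands in for real I/O), and step 3 counts lines in an `ActionBlock`.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Step 1 helper: an iterator, so each `yield return` becomes one output item
    // of the TransformManyBlock without building a complete list first.
    static IEnumerable<string> ListFiles(IReadOnlyDictionary<string, string[]> listing, string dir)
    {
        string[] entries;
        if (!listing.TryGetValue(dir, out entries)) yield break;
        foreach (var entry in entries)
            yield return dir + entry;
    }

    public static async Task<Dictionary<string, int>> RunAsync(
        IReadOnlyDictionary<string, string[]> listing,
        IReadOnlyDictionary<string, string> contents,
        IEnumerable<string> dirs)
    {
        var lineCounts = new Dictionary<string, int>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Step 1: one directory in, many file paths out.
        var list = new TransformManyBlock<string, string>(dir => ListFiles(listing, dir));

        // Step 2: one path in, one (path, content) pair out; the lambda may be async.
        var read = new TransformBlock<string, KeyValuePair<string, string>>(async path =>
        {
            await Task.Yield(); // placeholder for real asynchronous I/O
            return new KeyValuePair<string, string>(path, contents[path]);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Step 3: analyze one file (here: count its lines).
        var analyze = new ActionBlock<KeyValuePair<string, string>>(file =>
            lineCounts[file.Key] = file.Value.Split('\n').Length);

        list.LinkTo(read, link);
        read.LinkTo(analyze, link);

        foreach (var dir in dirs) list.Post(dir);
        list.Complete();
        await analyze.Completion;
        return lineCounts;
    }
}
```

Whether downstream blocks see each yielded item before the iterator finishes is an implementation detail of `TransformManyBlock`, and this sketch does not depend on it. Step 2 reads several files concurrently, and its outputs keep the input order by default.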
