I'm writing an application that uses Socket very intensively, so I really need to use every core we have on our big server. I found the question "how to using ThreadPool to run socket thread parallel?" here on Stack Overflow, but it has only one answer, which points to this MSDN sample.

But I think that sample only shows how to make the code concurrent, not parallel. Someone asked "How cpu intensive is opening a socket", and it looks like it is very intensive. Someone else says that TPL doesn't help ("TPL TaskFactory.FromAsync vs Tasks with blocking methods"), and another person shows how to do it with TaskFactory.FromAsync ("Is there a pattern for wrapping existing BeginXXX/EndXXX async methods into async tasks?").

How can I keep socket operations parallel and performant? Socket problems such as disconnections, half-open sockets and message boundaries are already a headache with the normal async approach. How should I deal with them when sockets are combined with TPL and Task?

Community
dhj
  • someone did it here: http://www.cachelog.net/using-reactive-extensions-rx-tpl-for-socket-programming/ –  Mar 01 '12 at 17:59
  • I hope this helps you.
    [Non-blocking server: with Tasks](http://www.albahari.com/nutshell/cs4ch23.aspx)
    [Blocking server: with Tasks](http://books.google.com.br/books?id=IzU3B7mjI90C&lpg=PA418&ots=DwXsEocwiu&dq=parallel%20tpl%20socket&pg=PA418#v=onepage&q=parallel%20tpl%20socket&f=false) Bye
    –  Apr 29 '11 at 19:12

2 Answers


See this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace skttool
{
    public class StateObject
    {
        public Socket workSocket = null;
        public const int BufferSize = 1024;
        public byte[] buffer = new byte[BufferSize];
        public int bytesRead = 0;
        public StringBuilder sb = new StringBuilder();
    }

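    public class tool
    {
        //-------------------------------------------------
        private ManualResetEvent evtConnectionDone = new ManualResetEvent(false);
        private Socket skttool = null;
        private volatile bool running = false;
        //-------------------------------------------------
        // toolConfig is not shown here; it must expose addressPort and maxQtdSockets.
        toolConfig _cfg;
        public tool(toolConfig cfg)
        {
            _cfg = cfg;
        }
        //-------------------------------------------------
        public void socketListeningSet()
        {
            IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort);
            // Assign the field. Declaring a local with the same name here
            // would leave the field null for the accept loop.
            skttool = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            skttool.Bind(localEndPoint);
            skttool.Listen(_cfg.maxQtdSockets);
        }
        //-------------------------------------------------
        public void start()
        {
            running = true;
            Task T1 = Task.Factory.StartNew(socketListeningSet);
            // The accept loop blocks for the lifetime of the server, so mark it
            // LongRunning and start it only if the listener was set up correctly.
            T1.ContinueWith(prev =>
            {
                while (running)
                {
                    evtConnectionDone.Reset();
                    // BeginAccept/EndAccept match the (callback, state) overload of FromAsync.
                    Task<Socket> acceptChunk = Task<Socket>.Factory.FromAsync(
                                                                       skttool.BeginAccept,
                                                                       skttool.EndAccept,
                                                                       null);
                    acceptChunk.ContinueWith(accept);
                    evtConnectionDone.WaitOne();
                }
            }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.LongRunning);
        }
        //-------------------------------------------------
        void accept(Task<Socket> acceptChunk)
        {
            // Release the accept loop first, even when this accept failed.
            evtConnectionDone.Set();
            if (acceptChunk.IsFaulted)
            {
                // Reading Exception marks the fault as observed.
                Console.Error.WriteLine(acceptChunk.Exception.GetBaseException().Message);
                return;
            }
            // One StateObject per connection; a shared field would be
            // overwritten by the next accepted client.
            StateObject state = new StateObject();
            state.workSocket = acceptChunk.Result;
            beginRead(state);
        }
        //-------------------------------------------------
        void beginRead(StateObject state)
        {
            // BeginReceive takes four arguments before the callback, more than the
            // delegate overloads of FromAsync accept, so wrap the IAsyncResult instead.
            Task<int> readChunk = Task<int>.Factory.FromAsync(
                state.workSocket.BeginReceive(state.buffer, state.bytesRead,
                                              state.buffer.Length - state.bytesRead,
                                              SocketFlags.None, null, null),
                state.workSocket.EndReceive);
            readChunk.ContinueWith(t => read(t, state));
        }
        //-------------------------------------------------
        void read(Task<int> readChunk, StateObject state)
        {
            if (readChunk.IsFaulted)
            {
                // For example a connection reset: log it and drop the client.
                Console.Error.WriteLine(readChunk.Exception.GetBaseException().Message);
                state.workSocket.Close();
                return;
            }
            state.bytesRead += readChunk.Result;
            // Keep reading until the buffer is full or the peer closes its side (0 bytes).
            if (readChunk.Result > 0 && state.bytesRead < state.buffer.Length)
            {
                beginRead(state);
                return;
            }
            // Process only the bytes that were actually received.
            byte[] data = new byte[state.bytesRead];
            Buffer.BlockCopy(state.buffer, 0, data, 0, state.bytesRead);
            data = doTask(data);
            Task<int> sendChunk = Task<int>.Factory.FromAsync(
                state.workSocket.BeginSend(data, 0, data.Length, SocketFlags.None, null, null),
                state.workSocket.EndSend);
            sendChunk.ContinueWith(t => send(t, state));
        }
        //-------------------------------------------------
        void send(Task<int> sendChunk, StateObject state)
        {
            if (sendChunk.IsFaulted)
            {
                Console.Error.WriteLine(sendChunk.Exception.GetBaseException().Message);
            }
            try
            {
                state.workSocket.Shutdown(SocketShutdown.Both);
            }
            catch (SocketException)
            {
                // The peer may already have dropped the connection.
            }
            state.workSocket.Close();
        }
        //-------------------------------------------------
        byte[] doTask(byte[] data)
        {
            // Array.Reverse works in place and returns void.
            Array.Reverse(data);
            return data;
        }
        //-------------------------------------------------
    }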
}
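
The sample above reads until its buffer is full or the peer closes, so it does not address the message boundaries mentioned in the question. One common approach is a length prefix. The following is only a minimal sketch under stated assumptions: the 4-byte big-endian header, the `Framing` class, its method names and the `MaxMessageSize` limit are all hypothetical and not part of the original sample, and it assumes C# 5 or later for `async`/`await`. It works on any `Stream`, for example a `NetworkStream` wrapped around an accepted socket.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class Framing
{
    // Hypothetical upper bound that protects against corrupt or hostile headers.
    const int MaxMessageSize = 1 << 20;

    // Reads exactly count bytes; a single read may return fewer, so loop.
    static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int n = await Task<int>.Factory.FromAsync(
                stream.BeginRead, stream.EndRead, buffer, offset, count - offset, null);
            // Zero bytes means the peer closed the connection.
            if (n == 0) throw new EndOfStreamException("Connection closed mid-message.");
            offset += n;
        }
        return buffer;
    }

    // Reads one message: a 4-byte big-endian length followed by the payload.
    public static async Task<byte[]> ReadMessageAsync(Stream stream)
    {
        byte[] header = await ReadExactAsync(stream, 4);
        int length = (header[0] << 24) | (header[1] << 16) | (header[2] << 8) | header[3];
        if (length < 0 || length > MaxMessageSize)
            throw new InvalidDataException("Invalid message length: " + length);
        return await ReadExactAsync(stream, length);
    }

    // Builds the bytes to send for one payload.
    public static byte[] Frame(byte[] payload)
    {
        byte[] framed = new byte[4 + payload.Length];
        framed[0] = (byte)(payload.Length >> 24);
        framed[1] = (byte)(payload.Length >> 16);
        framed[2] = (byte)(payload.Length >> 8);
        framed[3] = (byte)payload.Length;
        Buffer.BlockCopy(payload, 0, framed, 4, payload.Length);
        return framed;
    }
}
```

With this shape, a disconnection in the middle of a message surfaces as a faulted task carrying `EndOfStreamException` instead of a silently truncated buffer, so the caller can close the socket in one place.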
hd5ye
  • Hi hd5ye, very good sample. But there are several errors which make it very confusing. Can you make this a working sample?: 1) Constructor contains a second declaration of skttool, 2) recursive call to read doesn't work. 3) The FromAsync calls are not correct. Especially the double declaration of the skttool took me an hour of searching. – markwilde Dec 20 '12 at 07:13

See this link about TPL and Traditional .NET Asynchronous Programming. It doesn't answer the question directly, but it may help you: it covers the Asynchronous Programming Model (APM) and the Event-based Asynchronous Pattern (EAP).
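
As a rough illustration of the two patterns, here is a minimal sketch of how each can be exposed as a Task. The `TaskWrappers` class and its method names are hypothetical. `Stream.BeginRead`/`EndRead` stands in for any APM pair and `BackgroundWorker` for any EAP component, both chosen only because they need no network.

```csharp
using System;
using System.ComponentModel;
using System.IO;
using System.Threading.Tasks;

static class TaskWrappers
{
    // APM: hand the BeginXXX/EndXXX pair to FromAsync; the task completes
    // with the value returned by EndRead.
    public static Task<int> ReadAsyncApm(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    // EAP: complete a TaskCompletionSource from the "Completed" event.
    public static Task<object> RunAsyncEap(Func<object> work)
    {
        var tcs = new TaskCompletionSource<object>();
        var worker = new BackgroundWorker();
        worker.DoWork += (sender, e) => e.Result = work();
        worker.RunWorkerCompleted += (sender, e) =>
        {
            // Check Error and Cancelled first; reading Result would throw otherwise.
            if (e.Error != null) tcs.TrySetException(e.Error);
            else if (e.Cancelled) tcs.TrySetCanceled();
            else tcs.TrySetResult(e.Result);
            worker.Dispose();
        };
        worker.RunWorkerAsync();
        return tcs.Task;
    }
}
```

Either task can then be chained with `ContinueWith` like any other, which is what makes the older patterns composable with the rest of the TPL.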

kiresh