I'm building a tiny multi-threading download program using boost::asio::ip::tcp. I need each thread to deal with a part of the data. I know the problem can be solved by adding `Range: bytes=xx-xx` to the request header, but I don't want the program to connect to the server so many times. Is there any solution?
-
Or just tell me any alternative. – Soha May 14 '20 at 00:48
-
You can't *seek* through a TCP stream, because it's really just a one-directional stream of bytes. Once a byte has been read from the stream there's no way of getting it back, and it's (as you can imagine) impossible to go forward and read possibly future bytes (because they haven't been sent yet). You *can* (with adequate protection) let multiple threads each read a random amount of bytes from the stream, though. – Some programmer dude May 14 '20 at 00:51
-
With that said, what is the real problem you try to solve? Why do you want to "read randomly ... using multi-threading"? Please take some time to read about [the XY problem](http://xyproblem.info/), as your question is a good example of one. Always ask about the actual underlying problem directly instead, and tell us what you thought up for a solution (like the random threaded reading). Also please take some time to read [the help pages](http://stackoverflow.com/help), read [ask], as well as [this question checklist](https://codeblog.jonskeet.uk/2012/11/24/stack-overflow-question-checklist/). – Some programmer dude May 14 '20 at 00:55
-
Sorry about the issue. I'm building a tiny multi-threading download program. I need each thread to deal with a part of the data. I know I can add `Range: bytes=xx-xx` to the header to solve the problem, but I don't want the program to connect to the server so many times. Is there any solution? – Soha May 14 '20 at 01:29
-
It's not really possible. As I said, TCP is just a stream of bytes, which must be read sequentially. Even if you have multiple threads reading from the stream, each one has to read its sequence in turn, and then you have to put it all back together in the right order afterward (which, together with the thread synchronization, will make the program more complex and slower). In short: use only a single thread per connection to read all data. Perhaps create a multi-threaded protocol which can handle multiple simultaneous connections, each reading part of the original data. – Some programmer dude May 14 '20 at 01:33
-
Oh...That sounds complicated. Thanks anyway. – Soha May 14 '20 at 01:42
1 Answer
Just read it and dispatch to the worker threads when appropriate.
Having no clue what kind of chunks you want to separately handle, let's assume you read all of the prime numbers from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, then do some work on all the primes on separate threads.
What Is The Work?
Here's some lazy prime job:
void handle_batch(std::vector<size_t> params) {
    if (!params.empty()) {
        std::cout
            << "Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
            << std::endl;
    }
}
Yeah, we just print a description of the job params and their sum. We can doodle a bit on it to make it more lifelike, like making it take some time, and being aware that we are on worker threads, so we want to synchronize access to the console.
void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx;
    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);

        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}
Okay, that's enough gore for the unimportant bits.
The Plan
Well, there's a slight complication with my chosen approach, because not only is that site using https (ugh), it is also serving up ZIP files (ugh). And we're using C++ (ugh?).
At least we can do the whole SSL connect business synchronously in not too much code. We want the reading to be asynchronous, though, because that way we can demonstrate that
- you can do a lot of intermixed IO on just the main thread using Boost Asio
- same goes for Boost Process, to launch `zcat` as a child process to unzip the primes content (we'll assume a UNIX-like system with `zcat` installed)
- which means we'll be asynchronously writing to that child process's stdin
- and also asynchronously reading from its stdout
- spawning off batch jobs along the way as soon as they're ready
This should be a pretty good model for your workload, because the workers take more time than the IO; meanwhile, we do many IO tasks on a single thread without blocking.
Let's Get The Data
As said, we will use a single thread for IO, and a thread pool for the batch workers:
int main() {
net::io_context io; // main thread does all io
net::thread_pool pool(6); // worker threads
There. That's a start. Now, we want to have a SSL connection, and request that ZIP. Here it is:
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0}; // for download content

    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    { // synchronously write request
        std::string host = "www.mathsisfun.com";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/primes-to-100k.zip"));
        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }
Yup, that already did the reading of the response headers¹. Of course we cheated because we need three helpers:
making an SSL context
auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}
connecting over SSL
void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    s.handshake(stream::handshake_type::client);
}
making the HTTP request
auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}
Not bad, for C++.
Pipe It Into zcat
Now we start with the asynchrony: let's have a "pump" or "loop" that sends all the response data into a pipe:
    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);

    std::function<void(error_code, size_t)> receive_zip;
`receive_zip` is what we call our loop. It's a self-chaining asynchronous operation: every time it is called, it pumps some data into the pipe and calls one more `async_read` for the HTTP response:
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
        (error_code ec, size_t /*ignore_this*/)
    {
        auto& res = response_reader.get();
        auto& body = res.body();

        if (body.data) {
            auto n = sizeof(buf) - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && !(ec == http::error::need_buffer);
        done += response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();

            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };
This slightly complicated looking reading of a buffered response is almost literally from the documentation here.
Now, all we have to do is prime the pump:
    // kick off receive loop
    receive_zip(error_code{}, 0);
Intermezzo, Unzip
This is not the interesting part, so let's go: we launch a subprocess `zcat` and want a second pipe to read its output from:
    process::async_pipe zcat_output(io);

    process::child zcat(
        process::search_path("zcat"),
        process::std_in < pipe_to_zcat,
        process::std_out > zcat_output,
        process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
        }), io);
End of intermission :)
(We even threw in error reporting because, why not?)
Ah, The Good Stuff: Primes On Tap!
Now, we have another async read loop, this time to read back the uncompressed primes. This is where we will assemble batch jobs to be handled on the worker pool.
    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
Like `receive_zip` before, `receive_primes` is our loop driver; the `sb` buffer is just a buffer which makes it easy to read using `std::istream` as you'd normally do from `std::cin`.
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
        {
            std::istream is(&sb);

            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));
        }

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };
Because `async_read_until` may read partial lines, we count the number (`n`) of full lines in the buffer and pack them into a vector. After we make sure to eat the pending newline, we ... post the batch job, finally:
post(pool, std::bind(handle_batch, std::move(batch)));
We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.
Again, priming the pump:
    // kick off handler loop as well:
    receive_primes(error_code{}, 0);
PUTTING IT ALL TOGETHER
Well. Prepare for the anticlimax. With all the async chains set up, all we need to do is... wait.
    io.run();
    pool.join();
} // end of main
The `io.run()` keeps running both pumps and awaits the child process, all on the main thread, as we like.
The `pool.join()` waits for all batch jobs to be completed before stopping the thread pool. If you leave out that line, you might not run all the tasks, because the destructor of `thread_pool` calls `stop()` before it calls `join()`.
Toy around with the buffer size (512 bytes in my example) to see how large batches become. Note that those 512 bytes are compressed bytes.
"UNLIVE" DEMO
Sadly no online compiler that I know of supports external network access, so you'll have to run this one yourself. For convenience, here's a full listing, and sample output from a run on my computer:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <algorithm>
#include <functional>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>

void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx;
    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);

        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}

namespace net     = boost::asio;
namespace ssl     = net::ssl;
namespace beast   = boost::beast;
namespace http    = beast::http;
namespace process = boost::process;

using boost::system::error_code;
using boost::system::system_error;
using net::ip::tcp;
using stream = ssl::stream<tcp::socket>;

auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}

void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    s.handshake(stream::handshake_type::client);
}

auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}

int main() {
    net::io_context io; // main thread does all io
    net::thread_pool pool(6); // worker threads

    // outside for lifetime
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0}; // for download content

    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    { // synchronously write request
        std::string host = "www.mathsisfun.com";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/primes-to-100k.zip"));
        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }

    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);

    std::function<void(error_code, size_t)> receive_zip;
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip](error_code ec, size_t /*ignore_this*/) {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) {
            auto n = sizeof(buf) - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && !(ec == http::error::need_buffer);
        done += response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();

            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };

    // kick off receive loop
    receive_zip(error_code{}, 0);

    process::async_pipe zcat_output(io);
    process::child zcat(
        process::search_path("zcat"),
        process::std_in < pipe_to_zcat,
        process::std_out > zcat_output,
        process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
        }), io);

    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
        {
            std::istream is(&sb);

            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));
        }

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };

    // kick off handler loop as well:
    receive_primes(error_code{}, 0);

    io.run();
    pool.join();
}
Output:
GET https://www.mathsisfun.com/includes/primes-to-100k.zip HTTP/1.1
User-Agent: test
Host: www.mathsisfun.com
receive_zip: Success
Child process exited with 0 (Success)
receive_primes: End of file
Thread #11 Batch n:95 Range [599..1237] Sum:86587
Thread #58 Batch n:170 Range [1249..2549] Sum:320714
Thread #34 Batch n:170 Range [2551..3919] Sum:549880
Thread #54 Batch n:170 Range [3923..5407] Sum:790922
Thread #30 Batch n:170 Range [5413..6863] Sum:1040712
Thread #60 Batch n:108 Range [2..593] Sum:28697
Thread #58 Batch n:170 Range [8429..9923] Sum:1560462
Thread #11 Batch n:170 Range [6869..8423] Sum:1298732
Thread #30 Batch n:146 Range [12703..14087] Sum:1956410
Thread #34 Batch n:147 Range [9929..11329] Sum:1563023
Thread #54 Batch n:146 Range [11351..12697] Sum:1758964
Thread #60 Batch n:146 Range [14107..15473] Sum:2164462
Thread #11 Batch n:146 Range [16943..18313] Sum:2576764
Thread #34 Batch n:146 Range [19861..21313] Sum:3003048
Thread #30 Batch n:146 Range [18329..19853] Sum:2790654
Thread #58 Batch n:146 Range [15493..16937] Sum:2365198
Thread #60 Batch n:146 Range [22721..24109] Sum:3422310
Thread #54 Batch n:146 Range [21317..22717] Sum:3212180
Thread #30 Batch n:146 Range [27179..28661] Sum:4081540
Thread #11 Batch n:146 Range [24113..25693] Sum:3640476
Thread #34 Batch n:146 Range [25703..27143] Sum:3859484
Thread #60 Batch n:146 Range [30223..31741] Sum:4525378
Thread #54 Batch n:146 Range [31751..33211] Sum:4746372
Thread #58 Batch n:146 Range [28663..30211] Sum:4297314
Thread #30 Batch n:146 Range [33223..34693] Sum:4958972
Thread #34 Batch n:146 Range [36307..37799] Sum:5408028
Thread #11 Batch n:146 Range [34703..36299] Sum:5184000
Thread #54 Batch n:146 Range [39371..40973] Sum:5865356
Thread #60 Batch n:146 Range [37811..39367] Sum:5637612
Thread #58 Batch n:146 Range [40993..42433] Sum:6091022
Thread #34 Batch n:146 Range [44029..45613] Sum:6541984
Thread #54 Batch n:146 Range [47287..48817] Sum:7013764
Thread #30 Batch n:146 Range [42437..44027] Sum:6308156
Thread #11 Batch n:146 Range [45631..47279] Sum:6780582
Thread #58 Batch n:146 Range [50341..51913] Sum:7470486
Thread #34 Batch n:146 Range [51929..53569] Sum:7701048
Thread #60 Batch n:146 Range [48821..50333] Sum:7239008
Thread #54 Batch n:146 Range [53591..55147] Sum:7934798
Thread #11 Batch n:146 Range [56713..58211] Sum:8388956
Thread #58 Batch n:146 Range [58217..59771] Sum:8617316
Thread #30 Batch n:146 Range [55163..56711] Sum:8169020
Thread #60 Batch n:146 Range [61519..63197] Sum:9100594
Thread #34 Batch n:146 Range [59779..61511] Sum:8856806
Thread #54 Batch n:146 Range [63199..64849] Sum:9339328
Thread #11 Batch n:146 Range [64853..66457] Sum:9580694
Thread #58 Batch n:146 Range [66463..67979] Sum:9816826
Thread #30 Batch n:146 Range [67987..69779] Sum:10057662
Thread #54 Batch n:146 Range [72931..74573] Sum:10770902
Thread #34 Batch n:146 Range [71347..72923] Sum:10529702
Thread #60 Batch n:146 Range [69809..71341] Sum:10304156
Thread #11 Batch n:146 Range [74587..76231] Sum:11008056
Thread #58 Batch n:146 Range [76243..77801] Sum:11251048
Thread #30 Batch n:146 Range [77813..79561] Sum:11491034
Thread #34 Batch n:146 Range [81119..82729] Sum:11963076
Thread #60 Batch n:146 Range [82757..84449] Sum:12207776
Thread #58 Batch n:146 Range [86183..87767] Sum:12700772
Thread #54 Batch n:146 Range [79579..81101] Sum:11732042
Thread #11 Batch n:146 Range [84457..86179] Sum:12455242
Thread #30 Batch n:146 Range [87793..89527] Sum:12951322
Thread #34 Batch n:146 Range [89533..91153] Sum:13187046
Thread #54 Batch n:146 Range [94441..96013] Sum:13904802
Thread #30 Batch n:146 Range [97829..99487] Sum:14403556
Thread #58 Batch n:146 Range [92779..94439] Sum:13665032
Thread #60 Batch n:146 Range [91159..92767] Sum:13431876
Thread #11 Batch n:146 Range [96017..97813] Sum:14148718
Thread #34 Batch n:46 Range [99497..99991] Sum:4588078
¹ Which you could print by uncommenting that line. Note that Boost 1.70 doesn't have the streaming implemented, and Boost 1.72 has a bug regarding `boost::process::async_pipe`, so you need Boost 1.73 to actually print the headers like that.

-
Thanks a lot. I've just encountered another trouble which is about https (boost::asio::ip::tcp::iostream seems not to support https). I had no idea how to solve it until I saw your answer. – Soha May 15 '20 at 07:32
-
Actually, you are using multi-processing rather than multi-threading, which brought me a lot of trouble, such as: `terminate called after throwing an instance of 'boost::process::process_error' what(): CreateProcess failed: No such device or address This application has requested the Runtime to terminate it in an unusual way. Please contact the application's support team for more information.` – Soha May 16 '20 at 13:16
-
I'm not. Just the sample requires extracting the zip. Why does your code require it? It would be highly unlikely I made up your exact use case. Also your error messages suggest you are running on Windows, so you can't expect the `zcat` program to exist/run. Which handsomely explains that the program exits with an unhandled exception. – sehe May 17 '20 at 10:54
-
Those are hardly trouble. If you have a concrete question about programming, you can just post a question about that. – sehe May 17 '20 at 10:56
-
My understanding is that the mutex `s_mx` within `handle_batch` is not shared among threads. Why does it help to avoid output intermix? – q0987 Mar 10 '22 at 21:04
-
@q0987 Very good spot. I think I accidentally dropped a `static` there (probably refactored somewhere). Fixing. – sehe Mar 10 '22 at 21:55