I am trying to run a Kraken websocket client for two symbols, AUD/USD and AUD/JPY, in separate IOContexts in separate threads, using C++ and the Boost.Beast library. I have 6 cores available and want to run each symbol in a separate thread. However, when I run the code, the program terminates immediately without any errors and does not print any data to the console, which should be defined inside the message handler.
http://coliru.stacked-crooked.com/a/51f248c085656de7
kraken_config.json
{
"AUD/USD": 0,
"AUD/JPY": 1
}
The overall idea of these two functions is to create a multi-threaded architecture that allows for running multiple WebSocket subscriptions for a large number of symbols in separate threads, while limiting the number of threads to the number of available cores :
void run_event_loop(const std::vector<std::string>& symbols, net::io_context& ioc)
{
ssl::context ctx{ssl::context::tlsv12_client};
ctx.set_verify_mode(ssl::verify_peer);
ctx.set_default_verify_paths();
for (const auto& symbol : symbols) {
std::cout << symbol << std::endl;
auto krakenws = std::make_shared<krakenWS>(ioc.get_executor(), ctx);
krakenws->subscribe_orderbook(symbol, 10);
}
ioc.run(); // this will block until all asynchronous operations have completed
}
void run_threads_in_cores(){
const std::size_t num_cores = std::thread::hardware_concurrency();
std::vector<std::string> symbols;
std::map<std::string, int> partition_map = load_symbols_partition_map();
for (const auto& pair : partition_map) {
symbols.push_back(pair.first);
}
std::vector<std::thread> threads;
// partition symbols into groups based on the number of available cores
std::vector<std::vector<std::string>> symbol_groups(num_cores);
std::size_t i = 0;
for (const auto& symbol : symbols) {
symbol_groups[i++ % num_cores].push_back(symbol);
}
for (const auto& symbol_group : symbol_groups) {
if(symbol_group.empty()){ // if symbols is less than number of cores you dont need to start the thread
continue;
}
net::io_context ioc;
threads.emplace_back([&symbol_group, &ioc]() { run_event_loop(symbol_group, ioc); });
}
std::for_each(threads.begin(), threads.end(), [](std::thread& t) { t.join(); });
}
But when i call run_threads_in_core() it launches two threads for run_event_loop() for each symbol (in this case we have two symbols and num_cores = 6), since we have more cores than number of symbols we can create thread for each symbol. ioc.run() seems not to run inside the run_event_loop() function because the whole program returns immediately with symbols being printed to the console.
But when I manually implement the each symbols websocket io_context in seperate thread it runs fine :
http://coliru.stacked-crooked.com/a/2cc33f6037a3b01f Output :
using host_: ws.kraken.com
using host_: ws.kraken.com
Sending : {"event":"subscribe","pair":["AUD/USD"],"subscription":{"depth":10,"name":"book"}}
Sending : {"event":"subscribe","pair":["AUD/JPY"],"subscription":{"depth":10,"name":"book"}}
Kraken Orderbook snapshot : {"channelID":176,"channelName":"book-10","event":"subscriptionStatus","pair":"AUD/USD","status":"subscribed","subscription":{"depth":10,"name":"book"}}
Kraken Orderbook snapshot : {"channelID":144,"channelName":"book-10","event":"subscriptionStatus","pair":"AUD/JPY","status":"subscribed","subscription":{"depth":10,"name":"book"}}
I tried to sleep for 1 second before ioc.run() http://coliru.stacked-crooked.com/a/c6e0d562ab156154
Output :
AUD/JPY
AUD/USD
using host_: ws.kraken.com
using host_: ws.kraken.com
Sending : {"event":"subscribe","pair":["AUD/JPY"],"subscription":{"depth":10,"name":"book"}}
read: Operation canceled
Sending : {"event":"subscribe","pair":["AUD/USD"],"subscription":{"depth":10,"name":"book"}}
Segmentation fault (core dumped)
I suspect that the issue may be related to how I defined those two functions (run_threads_in_core,run_event_loop), but I am not sure. Can someone please help me identify the issue and suggest a solution? Thank you.