It's weird that nobody sought to solve this problem even 4.9 years later. Here's how I did it using ZMQ. I used a PUSH/PULL architecture to send an image (.ppm) file from a Python 3 client (PUSH) to a C++ server (PULL).
The process requires manually obtaining the length of the base64-encoded image string, which my Python program prints. This size is then used as the receive-buffer size in the C++ server so that it can hold the whole message.
client.py
import time
import zmq
import base64
def producer():
    """Push one base64-encoded image file to the C++ PULL server.

    Binds a PUSH socket on tcp://127.0.0.1:5557, reads the .ppm file,
    base64-encodes it, prints the encoded length (the C++ receiver's buffer
    must be at least this large) and sends the payload as a single message.
    """
    # setup socket
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    try:
        zmq_socket.bind("tcp://127.0.0.1:5557")
        # Read file content; `with` closes the file even if reading fails
        with open("/home/virus/Desktop/optimisation/test.ppm", 'rb') as f:
            raw = f.read()
        # Encode to send
        strng = base64.b64encode(raw)
        print("Sending file over")
        print("\n\nEncoded message size: ",len(strng)) # 4194328 in my case
        zmq_socket.send(strng)
    finally:
        # send() only queues the message. Closing the socket and terminating
        # the context waits for queued data to be handed to the peer (subject
        # to the socket's LINGER option) instead of exiting with it pending.
        zmq_socket.close()
        context.term()


if __name__ == "__main__":
    producer()
server.cpp
#include <zmq.h>
#include <vector>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fstream> // for writing files
#include <iostream>
typedef unsigned char uchar;
using namespace std;
/**
 * Decode a base64 string (standard alphabet, '+' and '/') into raw bytes.
 *
 * Decoding stops at the first byte that is not part of the alphabet, which
 * includes the '=' padding character, so anything after it is ignored.
 *
 * @param in  base64 text
 * @return    the decoded bytes (may contain embedded NULs)
 */
static std::string base64_decode(const std::string &in) {
    // Ref: https://stackoverflow.com/a/34571089/9625777
    static const char kAlphabet[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Reverse lookup: byte value -> 6-bit symbol value, -1 for non-alphabet bytes.
    int table[256];
    for (int i = 0; i < 256; i++)
        table[i] = -1;
    for (int i = 0; i < 64; i++)
        table[static_cast<unsigned char>(kAlphabet[i])] = i;

    std::string out;
    out.reserve(in.size() / 4 * 3);

    // Unsigned accumulator: it is shifted left for every input symbol, and only
    // its low bits are ever read. With a signed int this shift overflows
    // (undefined behaviour) after a handful of characters.
    unsigned int val = 0;
    int valb = -8;  // number of buffered bits minus 8; >= 0 means a full byte is ready
    for (unsigned char c : in) {
        if (table[c] == -1) break;
        val = (val << 6) | static_cast<unsigned int>(table[c]);
        valb += 6;
        if (valb >= 0) {
            out.push_back(static_cast<char>((val >> valb) & 0xFF));
            valb -= 8;
        }
    }
    return out;
}
int main ()
{
while(1)
{
// Create socket
void *context = zmq_ctx_new ()
void *requester = zmq_socket (context, ZMQ_PULL)
if(zmq_connect (requester, "tcp://localhost:5557")!=0)
{
// The zmq_connect() function returns zero if successful.
// Otherwise it returns -1
cout << "No connection found\n";
continue;
}
cout << "Got a connectopn\n";
// Receive data
char buffer [4194328];
zmq_recv (requester, buffer, 4194328, 0)
//cout << buffer;
// decode data
auto decodedData = base64_decode(buffer);
// write results to a file
ofstream myFile("res.ppm");
myFile << decodedData;
myFile.close();
// Close socket
zmq_close (requester);
zmq_ctx_destroy (context);
}
return 0;
}
As an alternative, PUSH/PULL can be replaced with REQ/REP:
client.py
import time
import zmq
import base64
def producer():
    """Send one base64-encoded image to the C++ REP server and wait for its reply.

    Connects a REQ socket to tcp://127.0.0.1:5556, reads the .ppm file,
    base64-encodes it, prints the raw and encoded lengths, sends the payload
    as a single request and prints the length of the reply.
    """
    context = zmq.Context()
    zmq_socket = context.socket(zmq.REQ)
    try:
        zmq_socket.connect("tcp://127.0.0.1:5556")
        # Start your result manager and workers before you start your producers
        # `with` closes the file even if reading fails
        with open("/home/virus/Desktop/optimisation/test.ppm", 'rb') as f:
            raw = f.read()
        print("bytes len",len(raw))
        strng = base64.b64encode(raw)
        print("\n\n size: ",len(strng))
        zmq_socket.send(strng)
        print("sent\n")
        # REQ sockets must receive the reply before they can send again
        result = zmq_socket.recv()
        print("\n\nreceived: ",len(result))
    finally:
        # Once the reply has arrived nothing is left to flush. linger=0 also
        # keeps an interrupted run from hanging on an undelivered request.
        zmq_socket.close(linger=0)
        context.term()


if __name__ == "__main__":
    producer()
server.cpp
#include <zmq.h>
#include <assert.h>
#include <vector>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fstream> // for writing files
#include <iostream>
typedef unsigned char uchar;
using namespace std;
// NOTE(review): appears unused; no live code visible in this file reads or
// writes it. Confirm before removing.
vector<pair<int,int>> arr;
/**
 * Encode raw bytes as base64 (standard alphabet, '=' padded to a multiple
 * of four characters).
 *
 * @param in  bytes to encode (may contain embedded NULs)
 * @return    the base64 text; empty for empty input
 */
static std::string base64_encode(const std::string &in) {
    static const char kAlphabet[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string out;
    out.reserve((in.size() + 2) / 3 * 4);

    // Unsigned accumulator: it is shifted left for every input byte, and only
    // its low bits are ever read. With a signed int this shift overflows
    // (undefined behaviour) after a few bytes.
    unsigned int val = 0;
    int valb = -6;  // number of buffered bits minus 6; >= 0 means a symbol is ready
    for (unsigned char c : in) {
        val = (val << 8) | c;
        valb += 8;
        while (valb >= 0) {
            out.push_back(kAlphabet[(val >> valb) & 0x3F]);
            valb -= 6;
        }
    }
    // Flush the remaining 2 or 4 bits, left-aligned within a final symbol.
    if (valb > -6) out.push_back(kAlphabet[((val << 8) >> (valb + 8)) & 0x3F]);
    while (out.size() % 4) out.push_back('=');
    return out;
}
/**
 * Decode a base64 string (standard alphabet, '+' and '/') into raw bytes.
 *
 * Decoding stops at the first byte that is not part of the alphabet, which
 * includes the '=' padding character, so anything after it is ignored.
 *
 * @param in  base64 text
 * @return    the decoded bytes (may contain embedded NULs)
 */
static std::string base64_decode(const std::string &in) {
    // Ref: https://stackoverflow.com/a/34571089/9625777
    static const char kAlphabet[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Reverse lookup: byte value -> 6-bit symbol value, -1 for non-alphabet bytes.
    int table[256];
    for (int i = 0; i < 256; i++)
        table[i] = -1;
    for (int i = 0; i < 64; i++)
        table[static_cast<unsigned char>(kAlphabet[i])] = i;

    std::string out;
    out.reserve(in.size() / 4 * 3);

    // Unsigned accumulator: it is shifted left for every input symbol, and only
    // its low bits are ever read. With a signed int this shift overflows
    // (undefined behaviour) after a handful of characters.
    unsigned int val = 0;
    int valb = -8;  // number of buffered bits minus 8; >= 0 means a full byte is ready
    for (unsigned char c : in) {
        if (table[c] == -1) break;
        val = (val << 6) | static_cast<unsigned int>(table[c]);
        valb += 6;
        if (valb >= 0) {
            out.push_back(static_cast<char>((val >> valb) & 0xFF));
            valb -= 8;
        }
    }
    return out;
}
int main ()
{
while(1)
{
// Create socket
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind(responder, "tcp://*:5556");
assert (rc == 0);
//{
// The zmq_connect() function returns zero if successful.
// Otherwise it returns -1
//}
cout << "Got a connectopn\n";
// Send Request
//int rc = zmq_send(responder, "ABC", 3,ZMQ_DONTWAIT);
//assert(rc != -1);
// Receive data/Request
char buffer [4194328];
auto nbytes = zmq_recv (responder, buffer, 4194328, 0);assert (nbytes != -1);
cout << "Got a request";
// Generate data/results/ dummy data rn
/*for(int i=0;i<10;i++){
for(int j=0;j<10;j++){
arr.push_back(i);
}
}*/
int a[2];a[0]=3;a[1]=5;
// Send Result/Response
rc = zmq_send(responder, a, sizeof(a),0);
assert(rc != -1);
// decode data
auto decodedData = base64_decode(buffer);
// write results to a file
ofstream myFile("res.ppm");
myFile << decodedData;
myFile.close();
// Close socket
zmq_close (responder);
zmq_ctx_destroy (context);
}
return 0;
}
Compile the server with:
gcc -Wall -g server.cpp -lzmq -lstdc++ -o s
Hope it helps.