0

I use UDP socket to stream image from client to server. For encoding and decoding I use OpenCV. Sometimes I get a wrong decoded image, because one or some packet is lossed (only the header is sent, please see my terminal screen for some info). I have to reduce the quality of jpeg to only 30 to decrease wrong decode image ratio. How can I use condition code to ignore that frame (not do decode work) in which some packet is lossed , or not show the wrong decoded image in imshow function.

Here's the wrong decoded image:

sample

Terminal trace screen:

image

My Client code:

#include "PracticalSocket.h"
#include <iostream>
#include <cstdlib>

#include "cv.hpp"
#include "config.h"

#include "logger.h" // For trace
using namespace ModernCppCI;

using namespace cv;
using namespace std;

int main(int argc, char * argv[]) {

Logger log{__func__};

if ((argc < 4) || (argc > 4)) { // Test for correct number of arguments
    log.error("Usage: {} <Server> <Server Port>\n <RTSP link>", argv[0]);
    exit(1);
}

string servAddress = argv[1]; // First arg: server address
unsigned short servPort = Socket::resolveService(argv[2], "udp");

try {
    UDPSocket sock;
    int jpegqual =  ENCODE_QUALITY; // It's 30

    Mat frame, send;
    vector < uchar > encoded;
    //VideoCapture cap("rtsp://admin:centic.vn@10.49.34.234/Streaming/Channels/1?tcp"); // Grab the camera
    VideoCapture cap(argv[3]);
    if (!cap.isOpened()) {
        log.error("OpenCV failed to open camera");
        exit(1);
    }

    clock_t last_cycle = clock();
    unsigned char pressed_key;
    while (1) {
        vector < int > compression_params;
        cap >> send;
        if(send.empty())continue;
        // JPEG encoding
        compression_params.push_back(CV_IMWRITE_JPEG_QUALITY);
        compression_params.push_back(jpegqual);
        imencode(".jpg", send, encoded, compression_params);


        imshow("send", send);

        int total_pack = 1 + (encoded.size() - 1) / PACK_SIZE; // PACK_SIZE is 4096

        int ibuf[1];
        ibuf[0] = total_pack;
        sock.sendTo(ibuf, sizeof(int), servAddress, servPort);

        for (int i = 0; i < total_pack; i++)
            sock.sendTo( & encoded[i * PACK_SIZE], PACK_SIZE, servAddress, servPort);

        pressed_key = waitKey(1);

        if(pressed_key == ' ')
            pressed_key = waitKey(0);

        if(pressed_key == 'q')
            break;
        clock_t next_cycle = clock();
        double duration = (next_cycle - last_cycle) / (double) CLOCKS_PER_SEC;
        log.info(" FPS: {}, kbps: {}, Processing time: {}ms" , (1 / duration), (PACK_SIZE * total_pack / duration / 1024 * 8), 1000*duration);

        last_cycle = next_cycle;
    }
    // Destructor closes the socket

} catch (SocketException & e) {
    log.error(e.what());
    exit(1);
}

return 0;
}

Server code

#include "PracticalSocket.h"
#include <iostream>
#include <cstdlib>

#include "cv.hpp"
#include "config.h"
#include "logger.h" // For trace

using namespace ModernCppCI;
using namespace cv;


int main(int argc, char * argv[]) {

Logger log{__func__};

if (argc != 2) { // Test for correct number of parameters
    log.error("Usage: {} <Server Port>", argv[0]);
    exit(1);
}

unsigned short servPort = atoi(argv[1]); // First arg:  Server port

try {
    UDPSocket sock(servPort);

    char buffer[BUF_LEN]; // Buffer for echo string
    int recvMsgSize; // Size of received message
    string sourceAddress; // Address of datagram source
    unsigned short sourcePort; // Port of datagram source

    clock_t last_cycle = clock();
    unsigned char pressed_key;
    while (1) {
        // Block until receive message from a client
        do {
            recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort); // BUF_LEN is 65540
        } while (recvMsgSize > sizeof(int));
        int total_pack = ((int * ) buffer)[0];

        log.info("expecting length of packs: {}", total_pack);
        char * longbuf = new char[PACK_SIZE * total_pack];
        for (int i = 0; i < total_pack; i++) {
            recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort);
            if (recvMsgSize != PACK_SIZE) {
                log.error("Received unexpected size pack: {}", recvMsgSize);
                continue;
            }
            memcpy( & longbuf[i * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf
        }

        log.info("Received packet from {}:{}", sourceAddress, sourcePort);
        Logger::level(LogLevel::trace);
        log.trace("longbuf size: {}", ((int * ) &longbuf)[0]);

        Mat rawData = Mat(1, PACK_SIZE * total_pack, CV_8UC1, longbuf);
        Mat frame = imdecode(rawData, CV_LOAD_IMAGE_COLOR);
        if (frame.empty()) {
            log.error("Decode failure!");
            continue;
        }
        imshow("recv", frame);
        pressed_key = waitKey(1);

        if(pressed_key == ' ')
            pressed_key = waitKey(0);

        if(pressed_key == 'q')
            break;

        free(longbuf);

        clock_t next_cycle = clock();
        double duration = (next_cycle - last_cycle) / (double) CLOCKS_PER_SEC;
        log.info(" FPS: {} , kbps: {} , Processing time: {}", (1 / duration), (PACK_SIZE * total_pack / duration / 1024 * 8), (next_cycle - last_cycle));

        last_cycle = next_cycle;
    }
} catch (SocketException & e) {
    log.error(e.what());
    exit(1);
  }

return 0;
}
Remy Lebeau
  • 555,201
  • 31
  • 458
  • 770
  • I am trying to understand your question. When you detect an unexpected size packet you want to skip the whole frame; basically at the point you check for a decode failure and continue, you would want to check whether the frame had an unexpected size packet and also continue? – Basya Aug 03 '17 at 07:41

2 Answers2

0

If I get your problem correctly, your protocol is:

  1. Send header (ibuf), which contains number N of expected data packets.
  2. Send N data packets

and what happens on server side is:

  1. Receive header
  2. Receive N-1 data packets (one is lost)
  3. Receive next header as data packet and throw away current frame.
  4. Wait whole frame for new header, thus 2 frames are lost.

The thing you miss here is distinguishment between header and data packet. The easiest way that you already use is checking size of packet. Knowing that you can determine what to do with current packet - is it beggining of new frame (so previous is gone) or new data. With that you can start reading new frame and lose only when packet got lost.

This snippet shows an idea of it:

    int total_pack = 0;
    int counter = 0;
    char * longbuf = nullptr;
    while (1) {
        recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort); // BUF_LEN is 65540

        if (recvMsgSize == sizeof(int)) { // header
            total_pack = ((int *)buffer)[0];
            counter = 0; // reset frame counter
            log.info("expecting length of packs: {}", total_pack);
            if (longbuf) delete[] longbuf;
            longbuf = new char[PACK_SIZE * total_pack];
        }
        else if (recvMsgSize == PACK_SIZE){ // if we know size of incoming frame
            if (total_pack > 0) { // skip it if we dont know header yet
                memcpy(&longbuf[counter * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf
                counter++;
                if (counter == total_pack) {
                    total_pack = 0; // clear header
                    break; // whole frame received
                }
            }
        }
        else
            log.error("Received unexpected size pack: {}", recvMsgSize);
    }

Also, data packet should contain small header with it's position in whole buffer (frame number would be useful too), since UDP packets are not said to be received in order of sending. (They may shift).

You shouldn't mix C++ and C allocations (new and free) Can you mix free and constructor in C++?

R2RT
  • 2,061
  • 15
  • 25
0

I am trying to understand your question. When you detect an unexpected size packet you want to skip the whole frame; basically at the point you check for a decode failure and continue, you would want to check whether the frame had an unexpected size packet and also continue? Or a bit before, skipping the decoding attempt as well....

If that is what you are trying to do, you can do something like this:

1.Add a flag at the level of the while loop:

while (1) {
bool goodFrame = true; // start out optimistic!

// Block until receive message from a client

2.Change the flag when you detect the bad packet:

        if (recvMsgSize != PACK_SIZE) {
            log.error("Received unexpected size pack: {}", recvMsgSize);
            goodFrame = false;
            continue;
        }

3.Check the flag and skip the decoding and use of the frame:

    log.trace("longbuf size: {}", ((int * ) &longbuf)[0]);

    if (!goodFrame) {
        // you probably do not need to log an error, as you did it above when you detected the bad packet.  
        continue;
    }

    Mat rawData = Mat(1, PACK_SIZE * total_pack, CV_8UC1, longbuf);

4.You may also want to skip copying the rest of the packets in this frame, since the frame will not be used anyway:

        if (goodFrame)
            memcpy( & longbuf[i * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf

So the resultant complete while loop will look like this:

while (1) {
    bool goodFrame = true; // start out optimistic!

    // Block until receive message from a client
    do {
        recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort); // BUF_LEN is 65540
    } while (recvMsgSize > sizeof(int));
    int total_pack = ((int * ) buffer)[0];

    log.info("expecting length of packs: {}", total_pack);
    char * longbuf = new char[PACK_SIZE * total_pack];
    for (int i = 0; i < total_pack; i++) {
        recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort);
        if (recvMsgSize != PACK_SIZE) {
            log.error("Received unexpected size pack: {}", recvMsgSize);
            goodFrame = false;
            continue;
        }
    if (goodFrame)
            memcpy( & longbuf[i * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf
    }

    log.info("Received packet from {}:{}", sourceAddress, sourcePort);
    Logger::level(LogLevel::trace);
    log.trace("longbuf size: {}", ((int * ) &longbuf)[0]);

    if (!goodFrame) {
        // you probably do not need to log an error, as you did it above when you detected the bad packet.  
        continue;
    }

    Mat rawData = Mat(1, PACK_SIZE * total_pack, CV_8UC1, longbuf);
    Mat frame = imdecode(rawData, CV_LOAD_IMAGE_COLOR);
    if (frame.empty()) {
        log.error("Decode failure!");
        continue;
    }
    imshow("recv", frame);
    pressed_key = waitKey(1);

    if(pressed_key == ' ')
        pressed_key = waitKey(0);

    if(pressed_key == 'q')
        break;

    free(longbuf);

    clock_t next_cycle = clock();
    double duration = (next_cycle - last_cycle) / (double) CLOCKS_PER_SEC;
    log.info(" FPS: {} , kbps: {} , Processing time: {}", (1 / duration), (PACK_SIZE * total_pack / duration / 1024 * 8), (next_cycle - last_cycle));

    last_cycle = next_cycle;
}

If I misunderstood your question, please clarify what you are asking and I will hopefully be able to help more.

Basya
  • 1,477
  • 1
  • 12
  • 22
  • Thanks all for your help – ngân phạm Aug 04 '17 at 01:22
  • You are very welcome. Did the help solve your problem, or do you need more help? If one of our answers solved your problem, or helped to solve your problem, please upvote and/or mark it as a solution. If we have not yet answered your question, please explain what help you still need, what isn't working, etc. and we will try to help you more. – Basya Aug 04 '17 at 10:40