I have a program that reads information from a database. Sometimes the message is bigger than expected, so before sending it to my broker I compress it with this code:

public static byte[] zipBytes(byte[] input) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
    OutputStream ej = new DeflaterOutputStream(bos);
    ej.write(input);
    ej.close();
    bos.close();
    return bos.toByteArray();
}

Recently I retrieved an 80 MB message from the DB, and when my code above executed, an OutOfMemoryError was thrown on the ByteArrayOutputStream line. My Java program only has 512 MB of memory for the whole process and I can't give it more.

How can I solve this?

This is not a duplicate question; I cannot increase the heap size.

EDIT: This is the flow of my Java code:

rs = stmt.executeQuery("SELECT data FROM tbl_alotofinfo"); // rs is a valid result set
while (rs.next()) {
   byte[] bt = rs.getBytes("data");
   if (bt.length > 60) { // only rows with content > 60. I need to know the size of the message; if I use rs.getBinaryStream I cannot know the size, can I?
      if (bt.length >= 10000000) {
         // here I need to zip the byte array before sending it, so
         bt = zipBytes(bt); // this method is above
         // then publish bt to my broker
      } else {
         // here publish the byte array to my broker
      }
   }
}

EDIT: I've tried with PipedInputStream, and the memory the process consumes is the same as with the zipBytes(byte[] input) method.

private InputStream zipInputStream(InputStream in) throws IOException {
    PipedInputStream zipped = new PipedInputStream();
    PipedOutputStream pipe = new PipedOutputStream(zipped);
    new Thread(
            () -> {
                try (OutputStream zipper = new DeflaterOutputStream(pipe)) {
                    IOUtils.copy(in, zipper);
                    zipper.flush();
                } catch (IOException e) {
                    IOUtils.closeQuietly(zipped); // close it on the error case only
                    e.printStackTrace();
                } finally {
                    IOUtils.closeQuietly(in);
                    IOUtils.closeQuietly(pipe);
                }
            }
    ).start();
    return zipped;
}

How can I compress my InputStream with Deflate?

EDIT

That information is sent as JMS to a Universal Messaging server by Software AG, using the Nirvana client (documentation: https://documentation.softwareag.com/onlinehelp/Rohan/num10-2/10-2_UM_webhelp/um-webhelp/Doc/java/classcom_1_1pcbsys_1_1nirvana_1_1client_1_1n_consume_event.html). The data is held in nConsumeEvent objects, and the documentation shows only two constructors for sending that information: nConsumeEvent(String tag, byte[] data) and nConsumeEvent(String tag, Document adom).

https://documentation.softwareag.com/onlinehelp/Rohan/num10-5/10-5_UM_webhelp/index.html#page/um-webhelp%2Fco-publish_3.html%23

The connection code is:

nSessionAttributes nsa = new nSessionAttributes("nsp://127.0.0.1:9000");
MyReconnectHandler rhandler = new MyReconnectHandler();
nSession mySession = nSessionFactory.create(nsa, rhandler);
if (!mySession.isConnected()) {
   mySession.init();
}
nChannelAttributes chaAttr = new nChannelAttributes();
chaAttr.setName("mychannel"); // This is a topic
nChannel myChannel = mySession.findChannel(chaAttr);
List<nConsumeEvent> messages = new ArrayList<nConsumeEvent>();
rs = stmt.executeQuery("SELECT data FROM tbl_alotofinfo");
while (rs.next()) {
   byte[] bt = rs.getBytes("data");
   if (bt.length > 60) {
      nEventProperties prop = new nEventProperties();
      if (bt.length > 10000000) {
         bt = compressData(bt); // here I need to compress the data without ByteArrayInputStream
         prop.put("isZip", "true");
         nConsumeEvent ncon = new nConsumeEvent("1", bt);
         ncon.setProperties(prop);
         messages.add(ncon);
      } else {
         prop.put("isZip", "false");
         nConsumeEvent ncon = new nConsumeEvent("1", bt);
         ncon.setProperties(prop);
         messages.add(ncon);
      }
   }
}
nTransactionAttributes tattrib = new nTransactionAttributes(myChannel);
nTransaction myTransaction = nTransactionFactory.create(tattrib);
Vector<nConsumeEvent> m = new Vector<nConsumeEvent>(messages);
myTransaction.publish(m);
myTransaction.commit();

Because of the API's restrictions, at the end of the day I need to send the information as a byte array; if that is the only byte array in my code, that's OK. How can I compress the byte array, or the InputStream from rs.getBinaryStream(), in this implementation?

EDIT

The database server used is PostgreSQL v11.6

EDIT

I've applied @VGR's first solution and it works fine.

Just one thing: the SELECT query runs inside a while(true) loop, like this:

while (true) {
   rs = stmt.executeQuery("SELECT data FROM tbl_alotofinfo"); // rs is a valid result set
   // all the implementation shown throughout this post

   Thread.sleep(10000);
}

So a SELECT executes every 10 seconds. But I've tested with my program running, and the memory just increases with each iteration. Why? If the information the database returns is the same on each request, shouldn't memory stay at the level of the first request? Or maybe I forgot to close a stream?

while (true) {
    rs = stmt.executeQuery("SELECT data FROM tbl_alotofinfo"); // rs is a valid result set
    while (rs.next()) {

        //byte[] bt = rs.getBytes("data");
        byte[] bt;
        try (BufferedInputStream source = new BufferedInputStream(
                rs.getBinaryStream("data"), 10_000_001)) {

            source.mark(this.zip + 1);

            boolean sendToBroker = true;
            boolean needCompression = true;
            for (int i = 0; i <= 10_000_000; i++) {
                if (source.read() < 0) {
                    sendToBroker = (i > 60);
                    needCompression = (i >= this.zip);
                    break;
                }
            }

            if (sendToBroker) {
                nEventProperties prop = new nEventProperties();
                // Rewind stream
                source.reset();

                if (needCompression) {
                    System.out.println("Message size larger than expected. Compressing message");
                    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
                    try (OutputStream brokerStream = new DeflaterOutputStream(byteStream)) {
                        IOUtils.copy(source, brokerStream);
                    }
                    bt = byteStream.toByteArray();

                    prop.put("zip", "true");
                } else {
                    bt = IOUtils.toByteArray(source);
                }
                System.out.println("size: " + bt.length);
                prop.put("host", this.host);
                nConsumeEvent ncon = new nConsumeEvent("" + rs.getInt("xid"), bt);
                ncon.setProperties(prop);
                messages.add(ncon);
            }
        }
    }
}

For example, this is the heap memory at two points in time: the first run uses above 500 MB, and the second (with the same information from the database) uses above 1000 MB. [Screenshot of heap memory usage]

  • Don’t use byte arrays. InputStreams and OutputStreams exist precisely to address this problem, by handling data in small amounts. The best way to read large data items will depend on how you are reading from your database; for instance, if you’re using JDBC, you’ll want to use [ResultSet.getBinaryStream](https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSet.html#getBinaryStream(int)). – VGR Dec 29 '21 at 02:31
  • @VGR The way JDBC retrieves the info is OK. The problem is when I try to zip it with the code above. I'll try with OutputStream instead. Thank you. – Jesús Alberto Carrillo García Dec 29 '21 at 15:12
  • If `input` is an 80 megabyte array, `new ByteArrayOutputStream(input.length)` will take up *another 80 megabytes* of memory. And then `bos.toByteArray()` will create another brand new byte array, so if deflation reduced the size by, say, 50%, `bos.toByteArray()` takes up another 40 megabytes of memory. This is why you want to avoid byte arrays and you want to avoid ByteArrayOutputStream. – VGR Dec 29 '21 at 15:29
  • @JesúsAlbertoCarrilloGarcía Please [edit] your question to include the remaining source code you have as a [mcve]. It might be possible that you don't need a method like `zipBytes` to work on `byte` arrays but instead on working on `InputStream`/`OutputStream` instances directly. But that depends on what you are trying to do outside of this method. – Progman Dec 29 '21 at 15:36
  • @VGR I understand. All the examples I've found about this use ByteArrayInputStream and ByteArrayOutputStream, and I don't know how to do it without using them. Can you help me? I've edited the post with the flow of my Java code. – Jesús Alberto Carrillo García Dec 29 '21 at 17:02
  • @Progman I've edited the post with the flow. – Jesús Alberto Carrillo García Dec 29 '21 at 17:03
  • How do you publish to your broker, does it offer a way to publish using an output stream (or have it consume an input stream)? – Mark Rotteveel Dec 29 '21 at 17:43
  • Yes, it does. Anyway, due to network issues I need to send a compressed message. Is it possible to compress an InputStream? Or is an InputStream already a minimal-size object? – Jesús Alberto Carrillo García Dec 29 '21 at 18:06
  • @JesúsAlbertoCarrilloGarcía Please [edit] your question to include the source code how you send the message. Also explain how you detect on sender and receiver side if the message was compressed or not. The method `getBinaryStream()` is the right direction and you can combine it with the `Deflate*Stream()` classes. That way (at least in theory) you don't read all the data in huge byte arrays, but instead read and transform the data via a stream of bytes. Please add a MCVE to your question. – Progman Dec 29 '21 at 20:24
  • I've edited the post. I've tried with a method that receives an InputStream (from ResultSet.getBinaryStream), but the memory consumption is the same as with the zipBytes() method. – Jesús Alberto Carrillo García Dec 29 '21 at 22:03
  • @JesúsAlbertoCarrilloGarcía Please [edit] your question to include the actual database system you are using. As an example, the JDBC driver for MySQL uses `ByteArrayInputStream` internally for the `getBinaryStream()` and `getBlob()` methods, which makes them difficult to use in your case. Also add/emphasize the fact that the target API does **not** support `InputStream` instances for sending the data (unlike you said in https://stackoverflow.com/questions/70514183/java-heap-outofmemory-on-bytearrayoutputstream-for-deflater-zip-bytes?noredirect=1#comment124663558_70514183 it does) – Progman Dec 30 '21 at 13:14
  • Ok, the message goes to a JMS topic, and on the consumer side I can use an InputStream object, so I thought it was supported by the publisher, but it is not. My fault, sorry. – Jesús Alberto Carrillo García Dec 30 '21 at 16:07

1 Answer

rs.getBytes("data") reads the entire 80 megabytes into memory at once. In general, if you are reading a potentially large amount of data, you don’t want to try to keep it all in memory.

The solution is to use getBinaryStream instead.

Since you need to know whether the total size is larger than 10,000,000 bytes, you have two choices:

  • Use a BufferedInputStream with a buffer of at least that size, which will allow you to use mark and reset in order to “rewind” the InputStream.
  • Read the data size as part of your query. You may be able to do this by using a Blob or using a function like LENGTH. (A sketch of the LENGTH variant appears after the second example below.)

The first approach will use up 10 megabytes of program memory for the buffer, but that’s better than hundreds of megabytes:

while (rs.next()) {
    try (BufferedInputStream source = new BufferedInputStream(
        rs.getBinaryStream("data"), 10_000_001)) {

        source.mark(10_000_001);

        boolean sendToBroker = true;
        boolean needCompression = true;
        for (int i = 0; i <= 10_000_000; i++) {
            if (source.read() < 0) {
                sendToBroker = (i > 60);
                needCompression = (i >= 10_000_000);
                break;
            }
        }

        if (sendToBroker) {
            // Rewind stream
            source.reset();

            if (needCompression) {
                try (OutputStream brokerStream =
                    new DeflaterOutputStream(broker.getOutputStream())) {

                    source.transferTo(brokerStream);
                }
            } else {
                try (OutputStream brokerStream =
                    broker.getOutputStream()) {

                    source.transferTo(brokerStream);
                }
            }
        }
    }
}

Notice that no byte arrays and no ByteArrayOutputStreams are used. The actual data is not kept in memory, except for the 10 megabyte buffer.

The second approach is shorter, but I’m not sure how portable it is across databases:

while (rs.next()) {
    Blob data = rs.getBlob("data");
    long length = data.length();
    if (length > 60) {
        try (InputStream source = data.getBinaryStream()) {
            if (length >= 10000000) {
                try (OutputStream brokerStream =
                    new DeflaterOutputStream(broker.getOutputStream())) {

                    source.transferTo(brokerStream);
                }
            } else {
                try (OutputStream brokerStream =
                    broker.getOutputStream()) {

                    source.transferTo(brokerStream);
                }
            }
        }
    }
}
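
The comments below report a PSQLException from the Blob approach on PostgreSQL, so here is a sketch of the LENGTH variant from the second bullet above. It is a sketch only: it assumes data is a bytea column (PostgreSQL's octet_length reports its size in bytes) and reuses the same hypothetical broker.getOutputStream() API as the examples above:

// Sketch only: the query itself reports each row's size, so the stream
// never has to be buffered just to measure it. octet_length() is
// PostgreSQL-specific; other databases spell this differently.
rs = stmt.executeQuery(
    "SELECT octet_length(data) AS len, data FROM tbl_alotofinfo");
while (rs.next()) {
    long length = rs.getLong("len");
    if (length > 60) {
        try (InputStream source = rs.getBinaryStream("data")) {
            if (length >= 10000000) {
                try (OutputStream brokerStream =
                    new DeflaterOutputStream(broker.getOutputStream())) {

                    source.transferTo(brokerStream);
                }
            } else {
                try (OutputStream brokerStream =
                    broker.getOutputStream()) {

                    source.transferTo(brokerStream);
                }
            }
        }
    }
}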

Both approaches assume there is some API available for your “broker” which allows the data to be written to an OutputStream. I’ve assumed, for the sake of example, that it’s broker.getOutputStream().
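
To make that assumption explicit, this is the minimal shape of the hypothetical broker API the examples above rely on; it is not part of any real messaging library, and the Update below covers the byte-array-only API you actually have:

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for a broker client that can accept streamed data.
interface Broker {
    OutputStream getOutputStream() throws IOException;
}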

Update

It appears you are required to create nConsumeEvent objects, and that class only allows its data to be specified as a byte array in its constructors.

That byte array is unavoidable, obviously. And since there is no way to know the exact number of bytes a compressed version will require, a ByteArrayOutputStream is also unavoidable. (It’s possible to avoid using that class, but the replacement would be essentially a reimplementation of ByteArrayOutputStream.)

But you can still read the data as an InputStream in order to reduce your memory usage. And when you aren’t compressing, you can still avoid ByteArrayOutputStream, thereby creating only one additional byte array.

So, instead of this, which is not possible for nConsumeEvent:

if (needCompression) {
    try (OutputStream brokerStream =
        new DeflaterOutputStream(broker.getOutputStream())) {

        source.transferTo(brokerStream);
    }
} else {
    try (OutputStream brokerStream =
        broker.getOutputStream()) {

        source.transferTo(brokerStream);
    }
}

It should be:

byte[] bt;
if (needCompression) {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    try (OutputStream brokerStream = new DeflaterOutputStream(byteStream)) {
        source.transferTo(brokerStream);
    }
    bt = byteStream.toByteArray();
} else {
    bt = source.readAllBytes();
}

prop.put("isZip", String.valueOf(needCompression));
nConsumeEvent ncon = new nConsumeEvent("1", bt);
ncon.setProperties(prop);
messages.add(ncon);

Similarly, the second example should replace this:

if (length >= 10000000) {
    try (OutputStream brokerStream =
        new DeflaterOutputStream(broker.getOutputStream())) {

        source.transferTo(brokerStream);
    }
} else {
    try (OutputStream brokerStream =
        broker.getOutputStream()) {

        source.transferTo(brokerStream);
    }
}

with this:

byte[] bt;
if (length >= 10000000) {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    try (OutputStream brokerStream = new DeflaterOutputStream(byteStream)) {
        source.transferTo(brokerStream);
    }
    bt = byteStream.toByteArray();
    prop.put("isZip", "true");
} else {
    bt = source.readAllBytes();
    prop.put("isZip", "false");
}

nConsumeEvent ncon = new nConsumeEvent("1", bt);
ncon.setProperties(prop);
messages.add(ncon);
VGR
  • Hi @VGR, thank you for your reply. I've seen your code, but I'm afraid broker.getOutputStream() is not available in my API. I've edited the post with the connection code and the way information is sent to my messaging server. For that implementation, what is the best way to compress the info and then send the byte array? – Jesús Alberto Carrillo García Dec 30 '21 at 00:55
  • Hi @VGR, thank you so much; the first solution works fine, but the second one throws org.postgresql.util.PSQLException: Bad value for type long. Just one more thing: the heap memory increases on each call to the database. If the information from the database is the same both times, why does memory increase? Shouldn't it consume the same memory? I've edited the post. – Jesús Alberto Carrillo García Dec 30 '21 at 18:37