
I have a custom type defined in my database as

CREATE TYPE address AS (ip inet, port int);

And a table that uses this type in an array:

CREATE TABLE my_table (
  addresses  address[] NULL
)

I have a sample CSV file with the following contents

{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}

And I use the following code snippet to perform my COPY:

    Class.forName("org.postgresql.Driver");

    String input = loadCsvFromFile();

    Reader reader = new StringReader(input);

    Connection connection = DriverManager.getConnection(
            "jdbc:postgresql://db_host:5432/db_name", "user",
            "password");

    CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

    String copyCommand = "COPY my_table (addresses) " + 
                         "FROM STDIN WITH (" + 
                           "DELIMITER '\t', " + 
                           "FORMAT csv, " + 
                           "NULL '\\N', " + 
                           "ESCAPE '\"', " +
                           "QUOTE '\"')";

    copyManager.copyIn(copyCommand, reader);

Executing this program produces the following exception:

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
  Detail: Unexpected end of input.
  Where: COPY only_address, line 1, column addresses: "{(10.10.10.1,80),(10.10.10.2,443)}"
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
    at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
    at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)

I have tried with different combinations of the parentheses in the input but cannot seem to get the COPY working. Any ideas where I might be going wrong?

Swaranga Sarma
  • Is using copy a hard requirement or would using insert be ok even if it is slower? – Gregory Arenius May 31 '18 at 16:24
  • @GregoryArenius Based on our throughput and cost analysis, we do need to support COPY. – Swaranga Sarma Jun 01 '18 at 19:04
  • Have you tried to dump (`pg_dump`) the existing data and check both the CSV syntax and the COPY command PostgreSQL produces itself? – Pavel Horal Jun 03 '18 at 07:35
  • What is the exact shape of the CSV file? `{(10.10.10.1,80),(10.10.10.2,443)} {(10.10.10.3,8080),(10.10.10.4,4040)}` is not CSV : ) – jbet Jun 04 '18 at 12:02
  • @jbet There is a new line ```'\n'``` between the two structs: ```{(10.10.10.1,80),(10.10.10.2,443)}\n{(10.10.10.3,8080),(10.10.10.4,4040)}``` – Swaranga Sarma Jun 05 '18 at 21:45
  • anyway you appear to be defining the delimiter as "tab" when the data looks like comma delimited - delimiter ',' – Slumdog Jun 06 '18 at 23:46
  • @Slumdog My CSV schema has only one column in this case, so the file does not have a ```TAB```. The ```,``` separates the fields of the structure and the elements of the array, all of which go into one CSV column. Basically the entire line is just one value – Swaranga Sarma Jun 07 '18 at 03:27
  • Sorry my bad for not reading the problem fully. OK a better suggestion now is to check the format of your data and your table structure. Do not use CSV file for the moment, just do a simple insert of data to prove what works and what doesn't before working with csv, eg. INSERT INTO my_table3 (ip) VALUES ('10.10.10.1,80'); -- fails INSERT INTO my_table3 (ip) VALUES ('10.10.10.1/24'); inserts OK – Slumdog Jun 07 '18 at 22:17
  • If you are expecting your csv to be two rows containing two columns each, then this can be an issue.. See, your struct will be read as col1 = {(10.10.10.1, col2=80), col3 = (10.10.10.2, and col4 = 443)}. Your struct contains a comma to separate, inet and port. – Amal Gupta Jun 08 '18 at 08:16
  • You have only one column, so remove DELIMITER . Your values are not quoted in csv, so remove QUOTE. No QUOTE , so No ESCAPE. Now try COPY my_table (addresses) FROM STDIN WITH ( FORMAT csv, NULL '\\N')" . Please share the results. – Shubham Kadlag Jun 08 '18 at 12:29
  • If still not working after above suggestion, temporarily change the csv to {"(10.10.10.1,80)","(10.10.10.2,443)"} and then try.Please share the results. – Shubham Kadlag Jun 08 '18 at 12:32
  • Consider wrapping the `StringReader` stream with a `TransformingStringReader` that transforms each line after reading from the file into whatever format postgres needs. Since it is a stream transforming line by line just before postgres reads it, it will consume less memory than transforming the whole file. If you created the custom postgres TYPE address, just so that you can use the COPY CSV streaming input feature, then you can replace that in favor of normal 2 columns table. – Zasz Jun 10 '18 at 02:23
  • @Zasz the Postgres type is already there. Was not invented to avoid the line transformation. Else the problem would have been simple. – Swaranga Sarma Jun 10 '18 at 02:57
  • What's wrong with the actual answers to your question, below? – Mikael Gueck Jun 10 '18 at 12:26

3 Answers


See https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ for a project with a JUnit test that does what you want.

Basically, you want to be able to use commas for two things: to separate array items and to separate type fields, but you DON'T want the CSV parsing to interpret commas as field delimiters.

So

  1. you want to tell the CSV parser to consider the whole row to be one string, one field, which you can do by enclosing it in single quotes and telling the CSV parser about this, and
  2. you want the PG field parser to consider each array item type instance to be enclosed in a double quote.

Code:

copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

DML example 1:

COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''

CSV example 1:

'{"(10.0.0.1,1)","(10.0.0.2,2)"}'
'{"(10.10.10.1,80)","(10.10.10.2,443)"}'
'{"(10.10.10.3,8080)","(10.10.10.4,4040)"}'

DML example 2, escaping the double quotes:

COPY my_table (addresses) FROM STDIN WITH CSV

CSV example 2, escaping the double quotes:

"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"

Full JUnit test class:

package io.mikael.poc;

import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;

import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CopyTest {

    private Reader reader;

    private Connection connection;

    private CopyManager copyManager;

    private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";

    private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses  address[] NULL)";

    private String loadCsvFromFile(final String fileName) throws IOException {
        try (InputStream is = getClass().getResourceAsStream(fileName)) {
            return CharStreams.toString(new InputStreamReader(is, UTF_8));
        }
    }

    @ClassRule
    public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");

    @BeforeClass
    public static void beforeClass() throws Exception {
        Class.forName("org.postgresql.Driver");
    }

    @Before
    public void before() throws Exception {
        String input = loadCsvFromFile("/data_01.csv");
        reader = new StringReader(input);

        connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
        copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

        connection.setAutoCommit(false);
        connection.beginRequest();

        connection.prepareCall(CREATE_TYPE).execute();
        connection.prepareCall(CREATE_TABLE).execute();
    }

    @After
    public void after() throws Exception {
        connection.rollback();
    }

    @Test
    public void copyTest01() throws Exception {
        copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

        final StringWriter writer = new StringWriter();
        copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
        System.out.printf("roundtrip:%n%s%n", writer.toString());

        final ResultSet rs = connection.prepareStatement(
                "SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
                .executeQuery();
        rs.next();
        System.out.printf("json:%n%s%n", rs.getString(1));
    }

}

Test output:

roundtrip:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"

json:
[{"addresses":[{"ip":"10.0.0.1","port":1},{"ip":"10.0.0.2","port":2}]},{"addresses":[{"ip":"10.10.10.1","port":80},{"ip":"10.10.10.2","port":443}]},{"addresses":[{"ip":"10.10.10.3","port":8080},{"ip":"10.10.10.4","port":4040}]}]
Mikael Gueck

In CSV format, when you specify a separator, you cannot use it as a character inside your data unless you escape it!

Example of a CSV file using a comma as the separator:

    a correct record:  data1,data2    parse results: [0] => data1  [1] => data2
    an incorrect one:  data,1,data2   parse results: [0] => data   [1] => 1   [2] => data2
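The mis-parse can be sketched with a naive comma split (a minimal demo class, invented for illustration; real CSV parsers additionally handle quoting):

```java
public class CsvSplitDemo {
    // Naive CSV parsing: split on every comma, with no quote handling.
    static String[] parse(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        // correct record: two fields
        System.out.println(parse("data1,data2").length);  // 2
        // unescaped comma inside a value: three fields instead of two
        System.out.println(parse("data,1,data2").length); // 3
    }
}
```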

Finally, you do not need to load your file as a CSV, but as a simple text file, so replace your method loadCsvFromFile() with:

public String loadRecordsFromFile(File file) throws IOException {
  LineIterator it = FileUtils.lineIterator(file, "UTF-8");
  StringBuilder sb = new StringBuilder();
  try {
    while (it.hasNext()) {
      sb.append(it.nextLine()).append(System.lineSeparator());
    }
  } finally {
    LineIterator.closeQuietly(it);
  }
  return sb.toString();
}

Do not forget to add this dependency in your pom file

<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>

Or download the JAR from commons.apache.org.

Halayem Anis

1NF

First of all, I think your table design is wrong because it isn't 1NF compliant. Every field should only contain atomic attributes, but that's not the case. Why not a table like:

CREATE TABLE my_table (
    id   int,
    ip   inet,
    port int
)

Where id is the number of your line in the source file and ip/port one of the addresses in this line? Sample data:

id | ip         | port
-----------------------
1  | 10.10.10.1 | 80
1  | 10.10.10.2 | 443
2  | 10.10.10.3 | 8080
2  | 10.10.10.4 | 4040
...

Hence, you will be able to query your database on a single address (find all the associated addresses, return true if two addresses are on the same line, whatever else you might want...).

Load the data

But let's assume you know what you are doing. The main issue here is that your input data file is in a special format. It might be one single column CSV file, but it would be a very degenerated CSV file. Anyway, you have to transform the lines before you insert them into the database. You have two options:

  1. you read each line of the input file and you make an INSERT (this may take a while);
  2. you convert the input file into a text file with the expected format and use COPY.

Insert one by one

The first option seems easy: for the first row of the CSV file, {(10.10.10.1,80),(10.10.10.2,443)}, you have to run the query:

INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[])

To do so, you just have to create a new string:

String value = row.replaceAll("\\{", "ARRAY[")
                    .replaceAll("\\}", "]::address[]")
                    .replaceAll("\\(([0-9.]+),", "('$1',");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);

And execute the query for every line of the input file (or, for better security, use a prepared statement).
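A sketch of the prepared-statement route (a variant, not the exact code above: instead of building an ARRAY[...] expression string, it binds the quoted array literal and casts it with ?::address[]; the class and helper names are made up, and the connection details are placeholders):

```java
import java.sql.*;

public class PreparedInsert {
    // Turn {(ip,port),...} into {"(ip,port)",...}, which PostgreSQL
    // can cast to address[] via the ?::address[] parameter cast.
    static String toArrayLiteral(String row) {
        return row.replace("(", "\"(").replace(")", ")\"");
    }

    // One INSERT per input line, with the value passed as a bound parameter.
    static void insertRow(Connection conn, String row) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO my_table (addresses) VALUES (?::address[])")) {
            ps.setString(1, toArrayLiteral(row));
            ps.executeUpdate();
        }
    }

    public static void main(String[] args) {
        System.out.println(toArrayLiteral("{(10.10.10.1,80),(10.10.10.2,443)}"));
    }
}
```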

Insert with COPY

I will elaborate on the second option. You have to use in Java code:

copyManager.copyIn(sql, from);

Where sql is a COPY FROM STDIN statement and from is a Reader. The statement will be:

COPY my_table (addresses) FROM STDIN WITH (FORMAT text);

To feed the copy manager, you need data like (note the quotes):

{"(10.10.10.1,80)","(10.10.10.2,443)"}
{"(10.10.10.3,8080)","(10.10.10.4,4040)"}

With a temporary file

The simplest way to get the data into the right format is to create a temporary file. You read each line of the input file and replace every ( with "( and every ) with )". Write each processed line into the temporary file. Then pass a reader on this file to the copy manager.
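A minimal sketch of this temporary-file step (the class name, the helper quoteComposites and the sample input are made up; the commented copyIn call assumes the copyManager from the question):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TempFileTransform {
    // Quote each composite value: ( becomes "( and ) becomes )"
    static String quoteComposites(String line) {
        return line.replace("(", "\"(").replace(")", ")\"");
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the real input file.
        Path input = Files.createTempFile("addresses", ".txt");
        Files.write(input, List.of(
                "{(10.10.10.1,80),(10.10.10.2,443)}",
                "{(10.10.10.3,8080),(10.10.10.4,4040)}"));

        // Transform line by line into the temporary COPY file.
        Path copyFile = Files.createTempFile("copy", ".txt");
        try (Stream<String> lines = Files.lines(input)) {
            Files.write(copyFile,
                    lines.map(TempFileTransform::quoteComposites)
                         .collect(Collectors.toList()));
        }

        // Then hand a reader on the file to the copy manager:
        // copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)",
        //         Files.newBufferedReader(copyFile));
        System.out.println(String.join("\n", Files.readAllLines(copyFile)));
    }
}
```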

On the fly

With two threads: you can use two threads:

  • thread 1 reads the input file, processes the lines one by one and writes them into a PipedWriter.

  • thread 2 passes a PipedReader connected to the previous PipedWriter to the copy manager.

The main difficulty is to synchronize the threads in such a way that thread 2 starts to read the PipedReader before thread 1 starts to write data into the PipedWriter. See this project of mine for an example.
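A sketch of the two-thread pipe (all names invented; a StringReader stands in for the input file, and the main thread plays the role of thread 2, printing what it would otherwise feed to copyManager.copyIn(...)):

```java
import java.io.*;

public class PipedCopyFeed {
    // Quote each composite value: ( becomes "( and ) becomes )"
    static String quoteComposites(String line) {
        return line.replace("(", "\"(").replace(")", ")\"");
    }

    public static void main(String[] args) throws Exception {
        PipedWriter pipeOut = new PipedWriter();
        // Connect the reader to the writer before either side runs.
        PipedReader pipeIn = new PipedReader(pipeOut);

        // Thread 1: read the raw lines, transform them, write into the pipe.
        Thread producer = new Thread(() -> {
            BufferedReader raw = new BufferedReader(new StringReader(
                    "{(10.10.10.1,80),(10.10.10.2,443)}\n"
                    + "{(10.10.10.3,8080),(10.10.10.4,4040)}"));
            try (PrintWriter out = new PrintWriter(pipeOut)) {
                raw.lines().map(PipedCopyFeed::quoteComposites)
                        .forEach(out::println);
            } // closing the PrintWriter closes the pipe, signalling EOF
        });
        producer.start();

        // Thread 2 (here: main): consume the pipe; in the real program
        // pipeIn would be passed to the copy manager instead.
        try (BufferedReader in = new BufferedReader(pipeIn)) {
            in.lines().forEach(System.out::println);
        }
        producer.join();
    }
}
```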

With a custom reader: the from reader could be an instance of something like (naive version):

class DataReader extends Reader {
    PushbackReader csvFileReader;
    private boolean wasParenthese;

    public DataReader(Reader csvFileReader) {
        this.csvFileReader = new PushbackReader(csvFileReader, 1);
        wasParenthese = false;
    }

    @Override
    public void close() throws IOException {
        this.csvFileReader.close();
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        // rely on read()
        for (int i = off; i < off + len; i++) {
            int c = this.read();
            if (c == -1) {
                return i-off > 0 ? i-off : -1;
            }
            cbuf[i] = (char) c;
        }
        return len;
    }

    @Override
    public int read() throws IOException {
        final int c = this.csvFileReader.read();
        if (c == '(' && !this.wasParenthese) {
            this.wasParenthese = true;
            this.csvFileReader.unread('(');
            return '"'; // add " before (
        } else {
            this.wasParenthese = false;
            if (c == ')') {
                this.csvFileReader.unread('"');
                return ')';  // add " after )
            } else {
                return c;
            }
        }
    }
}

(This is a naive version because the right way to do it would be to override only public int read(char[] cbuf, int off, int len). But you should then process the cbuf to add the quotes and store the extra chars pushed to the right: this is a bit tedious). Now, if r is the reader for the file:

{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}

Just use:

Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
        .getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");

CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));

On bulk loading

If you are loading a huge amount of data, don't forget the basic tips: disable autocommit, remove indexes and constraints, and use TRUNCATE and ANALYZE as follows:

TRUNCATE my_table;
COPY ...;
ANALYZE my_table;

This will speed up the loading.

jferard