2

I am trying to write a tool to load CSV files into multiple database. While trying to search online how to use COPY command with Snowflake, I couldn't find information how to do it in Java. This is how I do with PostgreSQL

public void loadData(Message message) throws Exception {
    try (Connection connection = DriverManager.getConnection(message.getJdbcUrl(),
            message.getUser(), message.password)) {
        loadDataWithMode(loadRequest, (BaseConnection) connection);
    } catch (Throwable error){
        throw error;   
    }
}


public void loadDataWithMode(Message message, BaseConnection connection) throws Exception {
    CopyManager copyManager = new CopyManager(connection);
    String fields = message.getFields();
    final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
        try (InputStream input = fileService.load(loadRequest.getFilePath())) {
            try (InputStreamReader reader = new InputStreamReader(input, "UTF-8")) {
                copyManager.copyIn("COPY " + targetTableWithFields + " from STDIN 
            }
        }
 }

I'm not familiar with Snowflake how to do it, any help will be appreciated.

Mark Rotteveel
  • 100,966
  • 191
  • 140
  • 197
Hard Worker
  • 995
  • 11
  • 33
  • 1
    Here are some code snippets which can be used to build your code : https://stackoverflow.com/questions/68016277/unable-to-execute-snowflake-copy-command-through-java For PUT : https://stackoverflow.com/questions/62117729/unable-to-execute-snowflake-put-command-through-java https://stackoverflow.com/questions/63616368/writing-data-into-snowflake-using-python – Srinath Menon Jul 11 '21 at 14:38
  • 1
    As @SrinathMenon says, this seems to be the answer you want: https://stackoverflow.com/a/62119754/132438 – Felipe Hoffa Jul 12 '21 at 18:26

1 Answers1

4
public void loadDataWithMode(Message message, Connection connection) throws Exception {
        String fields = message.getFields();
        final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
            LOG.info("about to copy data into table: " + targetTableWithFields);
            try (Statement statement = connection.createStatement()) {

            final SnowflakeConnectionV1 snowflakeConnectionV1 = (SnowflakeConnectionV1) connection;
            final File tempFile = fileSystemService.asLocalFile(message.getFilePath());
            try (Statement stmt = snowflakeConnectionV1.createStatement(); InputStream inputStream = new FileInputStream(tempFile)) {
                final String createStage = buildCreateStageStatement();
                LOG.info("Executing sql:{}", createStage);
                stmt.execute(createStage);
                LOG.info("Create stage was successfully executed");
                snowflakeConnectionV1.uploadStream("COPYIN_STAGE", "", inputStream, tempFile.getName(), false);
                LOG.info("Upload stream was successfully executed");
                stmt.execute("USE WAREHOUSE "+ message.getExportConnectionDetails().getWarehouse());
                LOG.info("Warehouse was successfully set to: "+message.getExportConnectionDetails().getWarehouse());
                final boolean purgeData = !(message.getLoadMode() == LoadMode.INCREMENTAL);
                String sql = String.format("copy into %s(%s) from @COPYIN_STAGE/%s file_format = ( type='CSV', skip_header=1) purge=" + purgeData + "   ", message.getTableName(), fields, tempFile.getName());
                LOG.info("Executing sql:{}", sql);
                stmt.execute(sql);
            }
        connection.commit();
        LOG.info("data was successfully copied " + targetTableWithFields);
    }
}

private String buildCreateStageStatement() {
    return "CREATE OR REPLACE TEMPORARY STAGE COPYIN_STAGE " + "file_format = ( type ='CSV')";
}
Hard Worker
  • 995
  • 11
  • 33