I have a use case where I need to move selected data from Postgres to Amazon S3. This should happen in a single step. I am writing a Java program to get this done.

I have figured out a way to copy the data in two steps. I use the CopyManager class and its copyOut method to write the data to a file on my local machine. After this, I move that file into S3 using Java.

Postgres code to get the data onto my local machine:

CopyManager copyManager = new CopyManager((BaseConnection) con);
// try-with-resources flushes and closes the writer once the copy finishes
try (FileWriter fileWriter = new FileWriter("file.csv")) {
    copyManager.copyOut("COPY (SELECT stmt) TO STDOUT WITH DELIMITER '\t' CSV HEADER", fileWriter);
}

AWS code to move the file from local disk to S3:

AmazonS3 conn = new AmazonS3Client(credentials);
conn.setEndpoint("xxx.com");
conn.putObject(
        bucket1.getName(),
        "request.json",
        new File("file.csv")
);

I expect it to happen in one go instead of writing to a file and then moving the file to S3.

Nav_cfc
  • If you're open to doing this in Python instead of Java, I have an example which I think would work (export to CSV in memory, direct upload to S3 without leaving a permanent file on the OS). Let me know if you're interested and I'll write an answer with the example and how it works. – Adam Bethke Dec 27 '18 at 19:23
  • @AdamBethke Can it be done by limiting the amount of memory and running a loop to append this buffered data into a file on OS? Otherwise, with the data that we have, we would go into a Memory Exception. – Nav_cfc Jan 07 '19 at 11:31
  • Realized my original comment was a bit off - it's not in memory, but it's a (process managed) temporary file on disk. I posted it as an answer; I use a similar process to transfer 6GB dumps frequently and it doesn't use a significant amount of memory. If this isn't what you're looking for, let me know and I'll delete the answer. – Adam Bethke Jan 08 '19 at 13:05
  • So basically it is doing what I am trying to do. Instead of storing it in an explicit file location, a temporary file is created, data is written into it, and then this same file is moved to the S3 and then the temporary file is deleted. It does not have anything to do with memory then? – Nav_cfc Jan 08 '19 at 16:06
  • Yeah, that's correct. It'll have some memory overhead because it's generating a process, but it should be minimal. – Adam Bethke Jan 08 '19 at 18:26
  • @AdamBethke I tried to run your code. It's very similar to the one I have. The temporary file will just have a random location, and for me, I have specified the location explicitly. So your code is taking almost the same amount of time as mine. Do you have any idea about having it in memory for a buffer size? – Nav_cfc Jan 10 '19 at 08:39
  • Unfortunately, if I'm understanding correctly, I'm not sure how you'd do that. Based on your issues with hitting an out-of-memory exception, I think you've probably hit the point where you need to either upgrade to an instance with more memory, be okay with the speed, or develop a streaming solution. At a certain point, it's going to take a decent amount of time to transfer a lot of data, and the choice shifts from "can I get it done faster" to "how do I optimize the way / what else we're blocking". – Adam Bethke Jan 10 '19 at 11:32

4 Answers

I've not tried this, but I think you should be able to do it.

Rather than passing a FileWriter to copyOut(), you can pass any OutputStream. You can also provide an InputStream to the putObject method rather than a file.

So you just need to convert your OutputStream to an InputStream, for which there are a number of methods (for example, see this post) or you could use something like the EasyStream library.

James Baker

You should use the putObject overload that takes an InputStream.

You can use PipedOutputStream and PipedInputStream to redirect the copy output into putObject's input:

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);

Use in as an argument to PutObject and start writing to out in another thread.
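
As a minimal sketch of the two-thread arrangement described above, using only the JDK: `produceRows` and `consume` are hypothetical stand-ins for `copyManager.copyOut(sql, out)` and the S3 upload (for example `putObject` with an `InputStream`), and the row format and the 64 KiB pipe size are assumptions for illustration. The writer runs on its own thread and closes its end of the pipe when done, which is what lets the reader see end-of-stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

class PipedCopySketch {

    /** Hypothetical stand-in for copyManager.copyOut(sql, out): writes tab-separated rows. */
    static void produceRows(OutputStream out, int rows) throws IOException {
        out.write("id\tname\n".getBytes(StandardCharsets.UTF_8));
        for (int i = 1; i <= rows; i++) {
            out.write((i + "\trow" + i + "\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Hypothetical stand-in for the S3 upload: drains the stream and returns what it received. */
    static byte[] consume(InputStream in) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            sink.write(buffer, 0, n);
        }
        return sink.toByteArray();
    }

    /** Runs the producer on a second thread and consumes on the calling thread. */
    static byte[] transfer(int rows) throws IOException, InterruptedException {
        // The pipe's fixed-size buffer bounds how much data sits between the two threads.
        PipedInputStream in = new PipedInputStream(64 * 1024);
        PipedOutputStream out = new PipedOutputStream(in);
        IOException[] producerError = new IOException[1];

        Thread producer = new Thread(() -> {
            // Closing the writer end signals end-of-stream to the reader.
            try (OutputStream o = out) {
                produceRows(o, rows);
            } catch (IOException e) {
                producerError[0] = e;
            }
        });
        producer.start();

        byte[] received;
        try (InputStream i = in) {
            received = consume(i);
        }
        producer.join();
        if (producerError[0] != null) {
            throw producerError[0];
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        byte[] data = transfer(3);
        System.out.print(new String(data, StandardCharsets.UTF_8));
    }
}
```

The pipe itself only ever holds a small window of data. Whether the S3 client buffers the stream on its side depends on how the upload is called, which this sketch does not cover.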

11thdimension
  • Can you help me with the code for putObject and PipedInputStream with the two threads? I tried to use the piped streams but had difficulties with the threads when there was a huge amount of data: reads and writes failed because the output stream would close while the input stream was still running. A little snippet of the code would be helpful. – Nav_cfc Dec 21 '18 at 19:10
  • Closing `PipedOutputStream` should not have an effect on the associated `PipedInputStream`; you will have to explicitly call close on both of them. On another note, I don't think even using the `InputStream` methods will help. They exist to avoid reading the file into memory completely before sending it to S3, but even the stream methods in the S3 API need `content-length` in `ObjectMetadata`, or else the client reads the whole stream into memory. The funny part is that to get the content length we need to have read the entire stream. It just bugs me so much. – 11thdimension Dec 21 '18 at 20:12
  • So using the stream method, we cannot restrict the amount of data in memory? Can't we just write a loop wherein we read 'x' amount of data until it's read completely, and at the same time feed that 'x' data into the input stream for S3? – Nav_cfc Dec 22 '18 at 05:50
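
The loop asked about in the last comment could look roughly like the following sketch, which reads a stream in fixed-size parts and hands each part to a callback. `PartHandler` and `forEachPart` are hypothetical names, not part of any SDK; with S3 the callback would presumably send one part of a multipart upload, where each part's length is known before it is sent.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

class ChunkedReadSketch {

    /** Hypothetical callback; a real one might upload the part to S3. */
    interface PartHandler {
        // The buffer is reused between calls; only the first `length` bytes are valid.
        void handle(int partNumber, byte[] buffer, int length) throws IOException;
    }

    /** Reads the stream in parts of at most partSize bytes and returns the number of parts. */
    static int forEachPart(InputStream in, int partSize, PartHandler handler) throws IOException {
        byte[] buffer = new byte[partSize];
        int partNumber = 0;
        while (true) {
            int filled = 0;
            // read() may return fewer bytes than requested, so keep filling the buffer.
            while (filled < partSize) {
                int n = in.read(buffer, filled, partSize - filled);
                if (n == -1) {
                    break;
                }
                filled += n;
            }
            if (filled == 0) {
                break; // end of stream, nothing left to hand over
            }
            handler.handle(++partNumber, buffer, filled);
            if (filled < partSize) {
                break; // a short part means the stream is exhausted
            }
        }
        return partNumber;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> sizes = new ArrayList<>();
        int parts = forEachPart(new ByteArrayInputStream(new byte[25]), 10,
                (num, buf, len) -> sizes.add(len));
        System.out.println(parts + " parts, sizes " + sizes);
    }
}
```

Only one part-sized buffer is held at a time, so memory use is bounded by `partSize`. Running `main` prints `3 parts, sizes [10, 10, 5]`.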

Here is my Python code solution for moving all the tables in a DB named dvdrental in PostgreSQL into an S3 bucket.

import boto3
import json
import psycopg2

s3_client = boto3.client('s3')

# database connection setup
connection = psycopg2.connect(
                database="dvdrental",
                user="postgres",
                password="****")
connection.autocommit = True
cursor = connection.cursor()

# Get all the table names from the database
query = ("SELECT table_name FROM information_schema.tables "
         "WHERE table_schema = 'public' AND table_type = 'BASE TABLE';")
cursor.execute(query)
table_names = cursor.fetchall()
table_names_list = [row[0] for row in table_names]

# Get the contents of each table in json format and move it to the s3 bucket
for name in table_names_list:
    # Table names come from the information_schema query above.
    query2 = ("SELECT array_to_json(array_agg(row_to_json(n_alias))) "
              "FROM (SELECT * FROM {}) n_alias;").format(name)
    cursor.execute(query2)
    file_content = cursor.fetchall()
    data = json.dumps(file_content).encode('UTF-8')
    s3_client.put_object(Body=data, Bucket='dvdrentalbucket',
                         Key='{}.json'.format(name))



# closing database connection.
cursor.close()
connection.close()
print("PostgreSQL connection is closed")
bad_coder

If you're open to doing it in python, here's an example that should work:

import boto
import gzip
import psycopg2
import tempfile

# database connection setup
connection = psycopg2.connect('postgresql://scott:tiger@localhost/mydatabase')
connection.autocommit = True
cursor = connection.cursor()

# aws connection setup
s3_connection = boto.connect_s3('<aws access key>', '<aws secret key>')
bucket = s3_connection.get_bucket('<bucket>')

with tempfile.NamedTemporaryFile() as t:
    with gzip.GzipFile(t.name, mode='wb') as g:
        cursor.copy_expert("COPY ({0}) TO STDOUT WITH CSV HEADER".format('<select_query>'), g)
    key = boto.s3.key.Key(bucket, '<s3_key>')
    key.set_contents_from_filename(t.name)  # same path as g.name; upload the gzipped temp file

This process uses Python's tempfile module, which lets you create a file that is used and then removed within the process. The context manager (with tempfile...) handles cleanup, so you don't have to delete the file manually. Depending on how you set up the tempfile, you can make the file accessible to, or never visible to, system users. Basically, you're streaming the result of the SELECT statement to STDOUT, and then writing that output to a tempfile. You're still beholden to your database for the SELECT statement in terms of memory management, speed and access.

The benefit is that you don't need to keep the entire file in memory while trying to transfer it to S3; the drawbacks are that you need enough disk space to temporarily store the file, and that it's obviously slower because you're writing to disk as opposed to doing the entire thing in memory.

The other thing to note is that I kept in the step where python is compressing the file using gzip before uploading. I did that to save space on upload; this is especially useful if you're uploading a table with a lot of repetitive data.

As an aside: you should not use this as-is in an environment where you're open to SQL injection; there are better ways of generating the COPY command if that's a part of your use-case.

Adam Bethke
  • I'm following the above procedure in my code, but I'm experiencing issues with very large files, as I believe my Composer's local storage is not enough. How would you handle it when your local storage is not enough? – Minato Jan 30 '20 at 06:28
  • My quick thought is you'd need to implement a streaming solution like in this question: https://stackoverflow.com/q/31031463/6591849; that said, streaming solutions are always going to be more complex to handle / I'm not sure what the right approach would be to compress the file while streaming (probably some form of chunking solution) – Adam Bethke Jan 30 '20 at 12:49
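
On compressing while streaming: in Java, one possible approach is to wrap the destination stream in `java.util.zip.GZIPOutputStream`, which compresses incrementally as bytes are written, so the compression itself needs no separate chunking step. The sketch below illustrates only that idea; `copyCompressed` and `decompress` are hypothetical helper names, and the in-memory streams stand in for the database output and the upload target.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipStreamSketch {

    /** Copies source to target, gzip-compressing on the fly with a small fixed buffer. */
    static void copyCompressed(InputStream source, OutputStream target) throws IOException {
        // Closing the GZIPOutputStream writes the gzip trailer and closes the target.
        try (GZIPOutputStream gzip = new GZIPOutputStream(target)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = source.read(buffer)) != -1) {
                gzip.write(buffer, 0, n);
            }
        }
    }

    /** Decompresses gzip data; used here only to check the round trip. */
    static byte[] decompress(byte[] compressed) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "id,name\n1,a\n2,b\n".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        copyCompressed(new ByteArrayInputStream(original), compressed);
        byte[] roundTrip = decompress(compressed.toByteArray());
        System.out.println(Arrays.equals(original, roundTrip));
    }
}
```

Because the wrapper is just another `OutputStream`, it could in principle sit in front of whatever stream feeds the upload, though how that interacts with a particular S3 client is outside this sketch.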