3

I use an ExecutorService: whenever a task (containing data to stream) becomes available, the executor starts a thread that sends a streaming insert request to Google BigQuery. After running for a while, it throws the following exception:

javax.net.ssl.SSLHandshakeException: Remote host closed connection during handshake
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:563)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1281)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1256)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:250)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:77)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:964)
at com.google.api.client.auth.oauth2.TokenRequest.executeUnparsed(TokenRequest.java:283)
at com.google.api.client.auth.oauth2.TokenRequest.execute(TokenRequest.java:307)
at com.google.api.client.googleapis.auth.oauth2.GoogleCredential.executeRefreshToken(GoogleCredential.java:269)
at com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:489)
at com.google.api.client.auth.oauth2.Credential.intercept(Credential.java:217)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:858)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:410)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:343)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:460)
at com.geotab.bigdata.bqstream.util.BigqueryHelper.submitStreamRequest(BigqueryHelper.java:79)
at com.geotab.bigdata.bqstream.pipeline.RequestSendTask.run(RequestSendTask.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.read(InputRecord.java:505)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)
... 26 more

I assumed this happened because I wasn't refreshing my credentials, so I updated my code as shown below, but that didn't solve the problem. I checked the Google API docs (https://developers.google.com/drive/web/credentials), but they only describe the flow for a web application account, whereas I am using a service account from a Java application.

Or is my problem not caused by credentials at all? How can I solve it?

/**
 * Streams rows into BigQuery tables of one project/dataset through a single
 * {@link Bigquery} client that is shared by every instance of this class.
 *
 * <p>The shared client is created lazily and is replaced after an SSL handshake
 * failure. All creation/replacement happens under the class lock, so instances
 * may be used from several executor threads at once.
 */
public class BigqueryHelper {

    private final String mProjectId;
    private final String mDataset;

    // volatile: written under the class lock, read without it by worker threads.
    private static volatile Bigquery sBIGQUERY = null;

    static {
        // Best-effort eager creation; a failure is logged and retried on first use.
        getBigQuery();
    }

    /**
     * @param projectId
     *            The BigQuery project that owns the dataset
     * @param dataSet
     *            The dataset containing the target tables
     */
    public BigqueryHelper(String projectId, String dataSet) {
        this.mProjectId = projectId;
        this.mDataset = dataSet;
        getBigQuery();
    }

    /**
     * Insert a list of Rows into BigQuery.
     *
     * <p>Does nothing when {@code rowList} is null or empty. Row-level insert
     * errors reported in the response are printed to standard output and do not
     * raise an exception.
     *
     * @param taskid
     *            Identifier of the calling task (currently not used by the request)
     * @param tableId
     *            The Bigquery tableId
     * @param rowList
     *            The row list to be streamed onto Bigquery
     * @throws IOException
     *             if the shared client could not be created, or if the request
     *             fails; the original exception is rethrown unchanged so the
     *             caller can see its type, message and stack trace
     */
    public void submitStreamRequest(int taskid, String tableId, List<Rows> rowList)
            throws IOException {
        if (rowList == null || rowList.isEmpty()) {
            return;
        }

        // Take a local snapshot so a concurrent refresh cannot null the reference
        // between the check and the call.
        Bigquery client = sBIGQUERY;
        if (client == null) {
            getBigQuery();
            client = sBIGQUERY;
        }
        if (client == null) {
            throw new IOException(
                    "BigQuery client could not be created; see the previously logged error");
        }

        try {
            TableDataInsertAllRequest request = new TableDataInsertAllRequest()
                    .setRows(rowList);
            TableDataInsertAllResponse response = client
                    .tabledata()
                    .insertAll(mProjectId, mDataset, tableId, request)
                    .execute();

            List<TableDataInsertAllResponse.InsertErrors> insertErrors = response
                    .getInsertErrors();
            if (insertErrors != null && insertErrors.size() > 0) {
                System.out.println("Failed to insert "
                        + insertErrors.size() + " rows. First error: "
                        + insertErrors.get(0).toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
            if (e instanceof javax.net.ssl.SSLHandshakeException) {
                // Replace the client this call actually used; if another thread
                // already replaced it, the rebuild is skipped.
                refreshBigQuery(client);
            }
            // Rethrow the original exception instead of a bare IOException so the
            // cause is not lost.
            throw e;
        }
    }

    /**
     * Creates the shared client if it does not exist yet. A construction failure
     * is printed and leaves the client unset, so a later call tries again.
     */
    private static synchronized void getBigQuery() {
        if (sBIGQUERY != null) {
            return;
        }
        HttpTransport transport = new NetHttpTransport();
        List<String> lScope = new ArrayList<String>();
        lScope.add(ConfigConstants.SCOPE);
        try {
            GoogleCredential credential = new GoogleCredential.Builder()
                    .setTransport(transport)
                    .setJsonFactory(ConfigConstants.JSON_FACTORY)
                    .setServiceAccountId(ConfigConstants.SERVICE_CLIENT_ID)
                    .setServiceAccountScopes(lScope)
                    .setServiceAccountPrivateKeyFromP12File(
                            new File(ConfigConstants.KEY_FILE)).build();
            sBIGQUERY = new Bigquery.Builder(transport,
                    ConfigConstants.JSON_FACTORY, credential)
                    .setApplicationName("BigQuery-Service-Accounts/0.1")
                    .setHttpRequestInitializer(credential).build();
        } catch (GeneralSecurityException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Discards the shared client and builds a new one, but only when the shared
     * client is still {@code failedClient}. Locks on the class rather than on the
     * client object, because the client reference itself is what gets replaced.
     *
     * @param failedClient
     *            the client instance that was in use when the failure occurred
     */
    private static synchronized void refreshBigQuery(Bigquery failedClient) {
        if (sBIGQUERY == failedClient) {
            sBIGQUERY = null;
            getBigQuery();
        }
    }

}

foxwendy
  • 2,819
  • 2
  • 28
  • 50

0 Answers0