In my ExecutorService, whenever a task (containing data to stream) is available, the executor activates a thread to send a streaming request to Google BigQuery. After running for a while, it throws the following exception:
javax.net.ssl.SSLHandshakeException: Remote host closed connection during handshake
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:563)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1281)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1256)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:250)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:77)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:964)
at com.google.api.client.auth.oauth2.TokenRequest.executeUnparsed(TokenRequest.java:283)
at com.google.api.client.auth.oauth2.TokenRequest.execute(TokenRequest.java:307)
at com.google.api.client.googleapis.auth.oauth2.GoogleCredential.executeRefreshToken(GoogleCredential.java:269)
at com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:489)
at com.google.api.client.auth.oauth2.Credential.intercept(Credential.java:217)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:858)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:410)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:343)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:460)
at com.geotab.bigdata.bqstream.util.BigqueryHelper.submitStreamRequest(BigqueryHelper.java:79)
at com.geotab.bigdata.bqstream.pipeline.RequestSendTask.run(RequestSendTask.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.read(InputRecord.java:505)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)
... 26 more
I assumed this was because I didn't refresh my credentials, so I updated my code as shown below, but that didn't solve the problem. I checked the Google API documentation (https://developers.google.com/drive/web/credentials), but it only describes the approach for a web application account, whereas I am using a service account in a Java application.
Or is my problem not due to credentials at all? How can I solve it?
/**
 * Streams rows into BigQuery tables of one project/dataset through a single
 * {@link Bigquery} client that is shared by every instance of this class.
 *
 * <p>The shared client is created lazily and may be replaced after an SSL
 * handshake failure. Creation and replacement are serialized on a dedicated
 * lock object, and the reference is {@code volatile} so that threads sending
 * requests always observe the most recently published client.
 */
public class BigqueryHelper {
    /**
     * Guards creation and replacement of {@link #sBIGQUERY}. A dedicated final
     * object is used because the client reference itself is reassigned (and may
     * be null), which makes it unusable as a monitor.
     */
    private static final Object CLIENT_LOCK = new Object();

    /** Shared client; null until the first successful build. */
    private static volatile Bigquery sBIGQUERY = null;

    private String mProjectId;
    private String mDataset;

    static {
        getBigQuery();
    }

    /**
     * Creates a helper bound to the given project and dataset, building the
     * shared client first if it does not exist yet.
     *
     * @param projectId
     *            The project id passed to every insert request
     * @param dataSet
     *            The dataset id passed to every insert request
     */
    public BigqueryHelper(String projectId, String dataSet) {
        this.mProjectId = projectId;
        this.mDataset = dataSet;
        getBigQuery();
    }

    /**
     * Insert a list of Rows into BigQuery.
     *
     * <p>Does nothing when {@code rowList} is null or empty. Row-level insert
     * errors reported in the response are printed to standard output and do
     * not raise an exception.
     *
     * @param taskid
     *            Identifier of the calling task; only used in the error message
     * @param tableId
     *            The Bigquery tableId
     * @param rowList
     *            The row list to be streamed onto Bigquery
     * @throws IOException
     *             if the shared client is unavailable or the request fails;
     *             the original failure is attached as the cause
     */
    public void submitStreamRequest(int taskid, String tableId, List<Rows> rowList)
            throws IOException {
        if (rowList == null || rowList.isEmpty()) {
            return;
        }

        // Work on a local snapshot so a concurrent refresh cannot swap the
        // client out from under this request.
        Bigquery client = sBIGQUERY;
        if (client == null) {
            getBigQuery();
            client = sBIGQUERY;
            if (client == null) {
                throw new IOException("BigQuery client could not be initialized");
            }
        }

        try {
            TableDataInsertAllRequest request = new TableDataInsertAllRequest()
                    .setRows(rowList);
            TableDataInsertAllResponse response = client
                    .tabledata()
                    .insertAll(mProjectId, mDataset, tableId, request)
                    .execute();
            List<TableDataInsertAllResponse.InsertErrors> insertErrors = response
                    .getInsertErrors();
            if (insertErrors != null && insertErrors.size() > 0) {
                System.out.println("Failed to insert "
                        + insertErrors.size() + " rows. First error: "
                        + insertErrors.get(0).toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
            if (e instanceof javax.net.ssl.SSLHandshakeException) {
                // Replace the client used by this request so later requests
                // start from a fresh transport and credential.
                refreshBigQuery(client);
            }
            // Keep the original failure as the cause so callers can see it.
            throw new IOException("Failed to stream " + rowList.size()
                    + " rows to table " + tableId + " (task " + taskid + ")", e);
        }
    }

    /**
     * Builds the shared client if none exists yet. A build failure is printed
     * and leaves the client unset, so a later call can try again.
     */
    private static void getBigQuery() {
        if (sBIGQUERY != null) {
            return;
        }
        synchronized (CLIENT_LOCK) {
            // Re-check under the lock: another thread may have built it
            // while this one was waiting.
            if (sBIGQUERY != null) {
                return;
            }
            try {
                sBIGQUERY = buildClient();
            } catch (GeneralSecurityException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Replaces the shared client, but only if it is still the instance the
     * caller saw fail. This keeps several threads that failed on the same
     * client from each rebuilding it. If the rebuild fails, the previous
     * client is kept so that other threads never observe a null client.
     *
     * @param stale
     *            The client instance the failing request was using
     */
    private static void refreshBigQuery(Bigquery stale) {
        synchronized (CLIENT_LOCK) {
            if (sBIGQUERY != stale) {
                return; // already replaced by another thread
            }
            try {
                sBIGQUERY = buildClient();
            } catch (GeneralSecurityException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates a new client backed by a new transport and a service-account
     * credential configured from {@code ConfigConstants}.
     *
     * @return a newly built client
     * @throws GeneralSecurityException
     *             if the private key cannot be loaded from the P12 file
     * @throws IOException
     *             if the P12 key file cannot be read
     */
    private static Bigquery buildClient() throws GeneralSecurityException, IOException {
        HttpTransport transport = new NetHttpTransport();
        List<String> lScope = new ArrayList<String>();
        lScope.add(ConfigConstants.SCOPE);
        GoogleCredential credential = new GoogleCredential.Builder()
                .setTransport(transport)
                .setJsonFactory(ConfigConstants.JSON_FACTORY)
                .setServiceAccountId(ConfigConstants.SERVICE_CLIENT_ID)
                .setServiceAccountScopes(lScope)
                .setServiceAccountPrivateKeyFromP12File(
                        new File(ConfigConstants.KEY_FILE))
                .build();
        return new Bigquery.Builder(transport,
                ConfigConstants.JSON_FACTORY, credential)
                .setApplicationName("BigQuery-Service-Accounts/0.1")
                .setHttpRequestInitializer(credential)
                .build();
    }
}