
I implemented an UnboundedSource for Apache Beam that needs to communicate with a server requiring 256-bit encryption. I am able to use it when running jobs locally by installing the unlimited strength policy files[1] into my local $JAVA_HOME/jre/lib/security directory.

My problem is using this UnboundedSource in a Dataflow job running on Google Cloud Platform[2].

Is it possible to configure a Dataflow job to use the unlimited security policy files or is that functionality not supported in GCP?

The error I get when trying to run in GCP (without the unlimited strength policy files) is a handshake error:

  exception: "java.util.concurrent.ExecutionException: java.net.ConnectException: Received fatal alert: handshake_failure
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at com.urbanairship.connect.client.StreamConnection.connect(StreamConnection.java:212)
    at com.urbanairship.connect.client.StreamConnection.begin(StreamConnection.java:145)
    at com.urbanairship.connect.client.StreamConnection.read(StreamConnection.java:122)
    at com.urbanairship.connect.client.StreamConsumeTask.transitionToReading(StreamConsumeTask.java:138)
    at com.urbanairship.connect.client.StreamConsumeTask.stream(StreamConsumeTask.java:100)
    at com.urbanairship.connect.client.StreamConsumeTask.run(StreamConsumeTask.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Received fatal alert: handshake_failure
    at com.ning.http.client.providers.netty.request.NettyConnectListener.onFutureFailure(NettyConnectListener.java:133)
    at com.ning.http.client.providers.netty.request.NettyConnectListener.access$200(NettyConnectListener.java:37)
    at com.ning.http.client.providers.netty.request.NettyConnectListener$1.operationComplete(NettyConnectListener.java:104)
    at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:409)
    at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:395)
    at org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:362)
    at org.jboss.netty.handler.ssl.SslHandler.setHandshakeFailure(SslHandler.java:1460)
    at org.jboss.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1314)
    at org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:852)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    ... 3 more
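
To confirm that the restricted policy, rather than some other TLS mismatch, is behind the handshake failure, a check along these lines could be logged from the worker (for example from `UnboundedReader.start()`). This is only a minimal sketch: the class name `CryptoPolicyCheck` and the helper `allowsAes256` are hypothetical names chosen for illustration, while `Cipher.getMaxAllowedKeyLength` is the standard JCE call that reports the limit imposed by the installed policy files.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

// Hypothetical helper, not part of Beam or Dataflow.
final class CryptoPolicyCheck {

    // Returns true when the installed JCE policy permits AES keys of 256 bits or more.
    static boolean allowsAes256() throws NoSuchAlgorithmException {
        return Cipher.getMaxAllowedKeyLength("AES") >= 256;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // With the restricted policy this is typically 128; with the
        // unlimited policy it is Integer.MAX_VALUE.
        int max = Cipher.getMaxAllowedKeyLength("AES");
        System.out.println("Max allowed AES key length: " + max);
        System.out.println("AES-256 permitted: " + allowsAes256());
    }
}
```

If this reports that AES-256 is not permitted on the Dataflow worker but is permitted locally, that would support the assumption that the key size restriction is the cause.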

As suggested in another question, I tried using reflection to override the security policy to allow unlimited length keys, but that did not work, and one of the comments mentioned that the reflection hack doesn't work for Java 8 (I believe due to a variable being changed to final).

Running this block in my job did not work:

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.security.Permission;
import java.security.PermissionCollection;
import java.util.Map;

// Look up the JCE-internal classes that hold the policy state.
final Class<?> jceSecurity = Class.forName("javax.crypto.JceSecurity");
final Class<?> cryptoPermissions = Class.forName("javax.crypto.CryptoPermissions");
final Class<?> cryptoAllPermission = Class.forName("javax.crypto.CryptoAllPermission");

// Strip the final modifier from JceSecurity.isRestricted and set it to false.
final Field isRestrictedField = jceSecurity.getDeclaredField("isRestricted");
isRestrictedField.setAccessible(true);
final Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(isRestrictedField, isRestrictedField.getModifiers() & ~Modifier.FINAL);
isRestrictedField.set(null, false);

// Fetch the default policy's permission collection.
final Field defaultPolicyField = jceSecurity.getDeclaredField("defaultPolicy");
defaultPolicyField.setAccessible(true);
final PermissionCollection defaultPolicy = (PermissionCollection) defaultPolicyField.get(null);

// Clear the restricted permissions from the default policy.
final Field perms = cryptoPermissions.getDeclaredField("perms");
perms.setAccessible(true);
((Map<?, ?>) perms.get(defaultPolicy)).clear();

// Grant CryptoAllPermission so that any key length is allowed.
final Field instance = cryptoAllPermission.getDeclaredField("INSTANCE");
instance.setAccessible(true);
defaultPolicy.add((Permission) instance.get(null));

[1] http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html
[2] https://cloud.google.com/dataflow

  • What do you mean with "did not work"? – Kayaman Apr 29 '17 at 20:07
  • When running a job with Dataflow in GCP (with or without the reflection hack) I would get a `handshake error`. I am assuming due to the key size requirement of the server I am communicating with as I ran into the same issue locally until I updated the security policy to unlimited. – Jeffrey Meyers Apr 29 '17 at 21:37
  • When you say "running this block in my job" - where exactly do you put this block of code? Is it in your main() function that submits the pipeline; or is it part of the implementation of your UnboundedSource; or somewhere else? – jkff May 01 '17 at 18:06
  • I ran that block in the `UnboundedReader.start()` method – Jeffrey Meyers May 02 '17 at 19:12
  • Did you ever figure this out? Although I’m seeing a different stack trace (`Illegal key length`) I have a feeling I’m running into the same issue. – Sander Dec 14 '17 at 19:32
