I've been scratching my head over this weird PySpark behavior and couldn't find anything about it online.
We have a MySQL cluster in a private network that we can access through an SSH tunnel. I'm trying to read data from this database using PySpark and SSHTunnelForwarder, like this:
- Creating the tunnel:
from sshtunnel import SSHTunnelForwarder

server = SSHTunnelForwarder(
    (target_tunnel_ip_address, 22),
    ssh_username=tunnel_username,
    ssh_private_key=private_key_filepath,
    remote_bind_address=(mysql_address, 3306)
)
server.start()
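As a sanity check after start(), sshtunnel exposes both the tunnel state and the local port it picked; a quick sketch:

print(server.is_active)        # True once the tunnel is up
print(server.local_bind_port)  # the local port the tunnel bound to; the JDBC URL must use this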
- Creating a JDBC URL using the database information like so:
hostname = "localhost"  # Because I forwarded the remote port to my localhost
port = server.local_bind_port  # The local port SSHTunnelForwarder bound to
username = my_username
password = my_password
database = my_database
jdbcUrl = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(hostname, port, database, username, password)
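Side note: if the password ever contains URL-special characters such as & or @, it would need to be percent-encoded before being embedded; a sketch of that variant using urllib:

from urllib.parse import quote_plus

# Percent-encode the credentials in case they contain characters
# like '&', '@' or '/' that would break the URL.
jdbcUrl = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(
    hostname, port, database, quote_plus(username), quote_plus(password)
)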
- Reading the data from the database:
data = spark.read \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", query) \
    .load()
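For reference, the same read can equivalently pass the credentials as Spark JDBC options instead of embedding them in the URL; a sketch:

data = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://{}:{}/{}".format(hostname, port, database)) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("query", query) \
    .load()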
So far so good: this seems to work, and I can see the table columns ([output of the data variable](https://i.stack.imgur.com/YJhCC.png)):
DataFrame[id: int, company_id: int, life_id: int, type_id: int, cep: string, address: string, number: int, complement: string, neighborhood: string, city: string, state: string, origin: string, created_at: timestamp, updated_at: timestamp, active: boolean]
But as soon as I call any action that actually reads the data, such as .head() or .collect(), I get this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7629.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7629.0 (TID 11996, XX.XXX.XXX.XXX, executor 0): com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
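For what it's worth, a plain connection through the same forwarded port can be tested independently of Spark like this (a sketch, assuming pymysql is installed; the credential variables are the ones from above):

import pymysql

# Connect through the tunnel's local port, bypassing Spark entirely,
# to check whether the tunnel and credentials work on their own.
conn = pymysql.connect(
    host="localhost",
    port=server.local_bind_port,
    user=my_username,
    password=my_password,
    database=my_database,
)
with conn.cursor() as cur:
    cur.execute("SELECT 1")
    print(cur.fetchone())  # expect (1,) if the tunnel itself is fine
conn.close()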
Does anyone have any idea why this is happening and how to fix it?