
The following are a few rows from an example file, which is ~30 GB:

### s3://mybucket/tmp/file_in.txt
"one"|"mike"|"456"|"2010-01-04"
"two"|"lisa"|"789"|"2011-03-08"
"three"|"ann"|"845"|"2012-06-11"

I'd like to use PySpark to...

  • read the text file using Spark's parallelism
  • replace the "n" character with "X"
  • output the updated text to a new text file with the same format

so the resulting file would look like this:

### s3://mybucket/tmp/file_out.txt
"oXe"|"mike"|"456"|"2010-01-04"
"two"|"lisa"|"789"|"2011-03-08"
"three"|"aXX"|"845"|"2012-06-11"

I have tried a variety of ways to achieve this seemingly simple task...

data = sc.textFile('s3://mybucket/tmp/file_in.txt')

def make_replacement(row):
    result = row.replace("n", "X")
    return result

out_data = data.map(make_replacement).collect()

#out_data = data.map(lambda line: make_replacement(line)).collect()

out_data.coalesce(1).write.format("text").option("header", "false").save("s3://mybucket/tmp/file_out.txt")

but I continue to see the following errors:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 21, <<my_server>>, executor 9): java.lang.RuntimeException: Failed to run command: /usr/bin/virtualenv -p python3 --system-site-packages virtualenv_application....
    at org.apache.spark.api.python.VirtualEnvFactory.execCommand(VirtualEnvFactory.scala:120)

Note: solutions using read.csv or DataFrames will not be applicable to this problem.
Any recommendations on how to solve this?

Michael K

2 Answers


You can build a regexp_replace expression for every column and pass the list to select:

from pyspark.sql import functions as F

# the second positional argument of read.csv is the schema, so the
# delimiter has to be passed as sep= (the sample file is pipe-delimited)
df = spark.read.csv('s3://mybucket/tmp/file_in.txt', sep='|')
expr = [F.regexp_replace(F.col(column), pattern="n", replacement="X").alias(column) for column in df.columns]

df = df.select(expr)
df.write.format("csv").option("sep", "|").option("header", "false").save("s3://mybucket/tmp/file_out.txt")
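
If DataFrames are off the table, a minimal RDD-only sketch of the same replacement might look like this; note that saveAsTextFile writes out a directory of part files, not a single named file:

# read the raw lines as an RDD, exactly as in the question
data = sc.textFile('s3://mybucket/tmp/file_in.txt')

# do the replacement on each raw line
out_data = data.map(lambda line: line.replace("n", "X"))

# coalesce(1) forces a single part file; drop it to keep the write parallel
out_data.coalesce(1).saveAsTextFile('s3://mybucket/tmp/file_out.txt')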
nobody
  • the real-world example requires that I can't use a DataFrame because there are delimiter issues; I need to pre-process the file as plain text before reading it into a DataFrame. – Michael K Jan 13 '21 at 17:54

If you don't need to do anything else with the data set, why use Spark at all?

Use plain Python file read/write code to replace the character.
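
Sample code, a minimal sketch assuming the file has been copied locally first (reading straight from S3 would need something like boto3; the paths are placeholders):

def replace_chars(in_path, out_path):
    # stream line by line so the ~30 GB file never has to fit in memory
    with open(in_path, "r") as fin, open(out_path, "w") as fout:
        for line in fin:
            fout.write(line.replace("n", "X"))

replace_chars("file_in.txt", "file_out.txt")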

sandeep rawat
  • because reading 30 GB of data line by line in Python will be much slower than using Spark's distributed model – Michael K Jan 14 '21 at 18:02
  • As you intend to just update the data with no further processing, the cost (the time to read the file from storage into the system) will be the same. – sandeep rawat Jan 15 '21 at 05:56