I am trying to process a very large corpus using PySpark; however, my input file is not structured as "one document per line", so I can't simply load it with sc.textFile.
Instead, I am loading the file with a generator function that yields a document whenever a stop sequence is encountered. I can wrap this generator with sc.parallelize, but that makes PySpark load all of my data into RAM at once, which I can't afford.
Is there any way to work around this? Or will I definitely need to convert my text files?
Here is basically what I want to run:
def repaired_corpus(path):
    _buffer = ""
    for line in open(path):
        doc_end = line.find(doc_end_pattern)
        if doc_end != -1:
            _buffer += line[:doc_end + len(doc_end_pattern)]
            yield _buffer
            _buffer = ""
        else:
            _buffer += line
some_state = sc.broadcast(my_state)
in_rdd = spark.sparkContext.parallelize(repaired_corpus(path))
json_docs = in_rdd.map(
    lambda item: process_element(item, some_state.value)
)
json_docs.saveAsTextFile("processed_corpus.out")
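Would something along these lines be the way to go, using Hadoop's TextInputFormat with a custom record delimiter so Spark reads one document per record lazily? This is only a rough sketch of what I mean: it assumes doc_end_pattern is a literal string, and it would strip the delimiter from each record, unlike my generator.

delim_conf = {"textinputformat.record.delimiter": doc_end_pattern}

# Let Hadoop split the file into one record per document instead of per line.
in_rdd = spark.sparkContext.newAPIHadoopFile(
    path,
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=delim_conf,
).map(lambda kv: kv[1])  # keep only the text value, drop the byte-offset key

json_docs = in_rdd.map(
    lambda item: process_element(item, some_state.value)
)
json_docs.saveAsTextFile("processed_corpus.out")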