I've had this issue for some time now, and I think it has to do with my lack of understanding of how to use combineByKey and reduceByKey, so hopefully somebody can clear this up.
I am working with DNA sequences, so I have a procedure that produces several different versions of each sequence (forwards, backwards, and complemented). I also have several reading frames, meaning that for the string ABCABC I want the following series of chunks: ABC ABC, A BCA BC, and AB CAB C.
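To make that concrete, here is roughly how I picture the three reading frames for a small string (just plain Python to illustrate, separate from my Spark code):

seq = "ABCABC"
for frame in range(3):
    # leading partial chunk for frames 1 and 2, then full 3-character codons
    chunks = ([seq[0:frame]] if frame > 0 else []) + \
             [seq[i:i + 3] for i in range(frame, len(seq), 3)]
    print("frame %d -> %s" % (frame, chunks))
# frame 0 -> ['ABC', 'ABC']
# frame 1 -> ['A', 'BCA', 'BC']
# frame 2 -> ['AB', 'CAB', 'C']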
Right now I am using the following function to break things up (I run this in a flatMap procedure):
# Modified from http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunkCodons((seq, strand, reading_frame)):
    """
    Yield successive codons from seq
    """
    # Get the first characters
    if reading_frame > 0:
        yield (0, seq[0:reading_frame], strand, reading_frame)
    for i in xrange(reading_frame, len(seq), 3):
        if i % 1000000 == 0:
            print "Base # = {:,}".format(i)
        yield (i, seq[i:i + 3], strand, reading_frame)
I run this like so: reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkCodons)
However, this takes a very long time on a long string of DNA, so I know this has to be wrong.
Therefore, I want to have Spark do this in a more direct fashion by breaking the sequence up by character (i.e. base) and then recombining the characters 3 at a time. The problem is that I have to combine keys that are not the same, but adjacent: if I have (1, 'A'), (2, 'B'), (3, 'C'), ..., I want to be able to generate (1, 'ABC').
I have no idea how to do this. I suspect I need to use combineByKey and have it produce output only conditionally. Do I simply have it emit output that combineByKey can consume only when my conditions are met? Is that how I should do it?
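For what it's worth, the idea I keep coming back to looks something like the sketch below. The names (bases_rdd, key_by_codon_start) are placeholders I made up, sc is just my SparkContext, and this sketch only handles a frame offset of 1 to match the example above:

bases_rdd = sc.parallelize([(1, 'A'), (2, 'B'), (3, 'C')])
FRAME = 1  # offset of the reading frame in this sketch

def key_by_codon_start(pair):
    pos, base = pair
    # position where the codon containing this base starts, for this frame
    start = 0 if pos < FRAME else FRAME + 3 * ((pos - FRAME) // 3)
    return (start, (pos, base))

codon_rdd = (bases_rdd
             .map(key_by_codon_start)
             .groupByKey()
             .mapValues(lambda pairs: "".join(b for _, b in sorted(pairs))))
# (1, 'A'), (2, 'B'), (3, 'C')  ->  (1, 'ABC')

But I don't know whether groupByKey is acceptable here, or whether this is exactly the situation where combineByKey/reduceByKey is supposed to help.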
EDIT:
Here is my input: [(0, 'A'), (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C'), ...]
I want output like this: [(0, 0, 'AAB'), (0, 1, 'ABX'), ...] and [(1, 0, 'A'), (1, 1, 'ABA'), (1, 2, 'CXX'), ...].
The format is [(reading frame, first base #, sequence)].
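In other words, what I picture is keying every base by (reading frame, start of its chunk) and then stitching the bases back together, something like the sketch below (again, bases_rdd and frame_keys are names I made up, and sc is my SparkContext):

bases_rdd = sc.parallelize([(0, 'A'), (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C')])

def frame_keys(pair):
    pos, base = pair
    for frame in range(3):
        # start of the chunk this base falls into, within this reading frame
        start = 0 if pos < frame else frame + 3 * ((pos - frame) // 3)
        yield ((frame, start), (pos, base))

framed_rdd = (bases_rdd
              .flatMap(frame_keys)
              .groupByKey()
              .map(lambda kv: (kv[0][0], kv[0][1],
                               "".join(b for _, b in sorted(kv[1])))))
# from the input above this gives records such as (0, 0, 'AAB') and (1, 1, 'ABA')

Is something along these lines reasonable, or should that last grouping step really be done with reduceByKey/combineByKey?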