
I've had this issue for some time now, and I think it has to do with my lack of understanding of how to use combineByKey and reduceByKey, so hopefully somebody can clear this up.

I am working with DNA sequences, so I have a procedure that produces several different versions of each sequence (forwards, backwards, and complemented). I also use several reading frames, meaning that for the string ABCABC I want the following series of keys: ABC ABC, A BCA BC, AB CAB C.

Right now I am using the following function to break things up (I run this in a flatMap procedure):

# Modified from http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunkCodons((seq, strand, reading_frame)):
    """Yield successive codons from seq for the given reading frame."""
    # Emit the leading partial chunk before the first full codon
    if reading_frame > 0:
        yield (0, seq[0:reading_frame], strand, reading_frame)
    # Emit one codon (3 bases) per step, starting at the reading frame offset
    for i in xrange(reading_frame, len(seq), 3):
        if i % 1000000 == 0:
            print "Base # = {:,}".format(i)
        yield (i, seq[i:i + 3], strand, reading_frame)

I run this like so: reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkCodons)
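
For reference, the input RDD is built roughly like this (the sequence and strand values below are just placeholders):

# Placeholder construction of the input RDD: one record per
# (sequence, strand, reading_frame) combination
sequences = [("ABCABC", "+", rf) for rf in (0, 1, 2)]
nascent_reading_frames_rdd = sc.parallelize(sequences)
reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkCodons)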

However, this takes a very long time on a long string of DNA, so I know this has to be wrong.

Therefore, I want to have Spark do this in a more direct fashion by breaking the sequence up by character (i.e. base) and then recombining it 3 at a time. The problem is that I have to combine records whose keys are not the same, but adjacent: if I have (1, 'A'), (2, 'B'), (3, 'C'), ..., I want to be able to generate (1, 'ABC').

I have no idea how to do this. I suspect I need combineByKey, and to have it produce output only conditionally. Should I simply emit records for combineByKey to consume only when they meet my conditions? Is that the right approach?

EDIT:

Here is my input: [(0, 'A'), (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C'), ....]

I want output like this: [(0, 0, 'AAB'), (0, 1, 'ABX'), ...] and [(1, 0, 'A'), (1, 1, 'ABA'), (1, 2, 'CXX')...].

The format is [(reading frame, first base #, sequence)]


1 Answer


You can try something like this:

seq = sc.parallelize(zip(xrange(16), "ATCGATGCATGCATGC"))

(seq
 # Emit each base once for every 3-base window that covers it,
 # keyed by the start position of that window
 .flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3)))
 # Collect the (up to) three bases belonging to each window
 .groupByKey()
 # Rebuild the codon string in position order
 .mapValues(lambda x: ''.join(v for (pos, v) in sorted(x)))
 # Drop incomplete windows at the ends of the sequence
 .filter(lambda (pos, codon): len(codon) == 3)
 # Recover the reading frame from the start position
 .map(lambda (pos, codon): (pos % 3, pos, codon))
 .collect())

and the result:

[(0, 0, 'ATC'),
 (1, 1, 'TCG'),
 (2, 2, 'CGA'),
 (0, 3, 'GAT'),
 (1, 4, 'ATG'),
 (2, 5, 'TGC'),
 (0, 6, 'GCA'),
 (1, 7, 'CAT'),
 (2, 8, 'ATG'),
 (0, 9, 'TGC'),
 (1, 10, 'GCA'),
 (2, 11, 'CAT'),
 (0, 12, 'ATG'),
 (1, 13, 'TGC')]
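
The same grouping can also be written with reduceByKey or combineByKey, which is what the question asks about. A minimal sketch with reduceByKey, keeping the values as single-element lists so partial windows can be concatenated as they are merged:

(seq
 .flatMap(lambda (pos, x): ((pos - i, [(pos, x)]) for i in range(3)))
 # Merge the partial windows that share a start position
 .reduceByKey(lambda a, b: a + b)
 .mapValues(lambda pairs: ''.join(v for (pos, v) in sorted(pairs)))
 .filter(lambda (pos, codon): len(codon) == 3)
 .map(lambda (pos, codon): (pos % 3, pos, codon))
 .collect())

With only three values per key there is no real performance difference, but it shows the general trick: re-key the records first, and the "combine adjacent keys" problem becomes an ordinary by-key reduction.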

In practice I would try something else:

from toolz.itertoolz import sliding_window, iterate, map, zip
from itertools import product
from numpy import uint8

def inc(x):
    return x + uint8(1)

# Create dictionary mapping from codon to integer; the keys are character
# tuples, matching what sliding_window yields below
mapping = dict(zip(product('ATCG', repeat=3), iterate(inc, uint8(0))))

seq = sc.parallelize(["ATCGATGCATGCATGC"])

(seq
  # Generate pairs (start-position, 3-gram)
  .flatMap(lambda s: zip(iterate(inc, 0), sliding_window(3, s)))
  # Map 3-grams to respective integers
  .map(lambda (pos, seq): (pos, mapping.get(seq)))
  .collect())

The reading frame is redundant, since it can be recovered from the start position (pos % 3) at any moment, so I omitted it here.

A simple mapping between codons and small integers can save a lot of memory and network traffic.
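
If you need the original codons back later, the mapping can be inverted; a quick sketch (encoded here is just a placeholder for the pipeline above without the final collect()):

# Invert codon -> integer to decode results back into codon strings
reverse_mapping = dict((v, ''.join(k)) for (k, v) in mapping.items())

# decoded = encoded.map(lambda (pos, code): (pos, reverse_mapping.get(code)))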

  • Oh that's right, I can access the list while flatmapping, so I can grab groups of three. That's probably going to be enough. However, my big issue is a conceptual one, namely, that I want to know if it is possible to do an "optional" combine/reduce, meaning that I only reduce certain pairs. – Chris Chambers Jun 19 '15 at 20:22
  • I do like the idea of storing the indices rather than the actual strings, I might do that instead. – Chris Chambers Jun 19 '15 at 20:23
  • As far as I know it is not possible. You can obtain the same result either by generating multiple keys per value, as above, or by creating "broader" keys and cleaning up afterwards. – zero323 Jun 21 '15 at 20:46
  • Oh, sliding window is nice, that's very helpful. – Chris Chambers Jun 23 '15 at 21:14
  • Thanks for the help, the Toolz library in particular is awesome. I think this is enough to get me on the right track. – Chris Chambers Jun 23 '15 at 21:22
  • It is amazing. Pipes alone would be reason enough to like it, but with all the functional goodness it is almost too good to be true :) If you don't mind me asking, what kind of problem are you trying to solve with Spark? – zero323 Jun 24 '15 at 04:11
  • I'm working on applying Spark to DNA statistics; right now I'm trying to find all the possible open reading frames. I want to run this on very long strings of DNA and see how I can improve performance over the currently used techniques. – Chris Chambers Jun 25 '15 at 21:33