
I have a local text file kv_pair.log formatted such that key-value pairs are comma-delimited and each record begins and terminates with a new line:

"A"="foo","B"="bar","C"="baz"
"A"="oof","B"="rab","C"="zab"
"A"="aaa","B"="bbb","C"="zzz"

I am trying to read this to a Pair RDD using pySpark as follows:

from pyspark import SparkContext
sc = SparkContext()

# Read raw text to RDD
lines=sc.textFile('kv_pair.log')

# How to turn this into a Pair RDD?
pairs=lines.map(lambda x: (x.replace('"', '').split(",")))

print type(pairs)
print pairs.take(2)

I feel I am close! The output of above is:

[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab']]

So it looks like pairs is a list of records, which contains a list of the kv pairs as strings.

How can I use pySpark to transform this into a Pair RDD such that the keys and values are properly separated?

The ultimate goal is to transform this Pair RDD into a DataFrame to perform SQL operations - but one step at a time; please help with transforming this into a Pair RDD first.

  • `pairs=lines.flatMap(lambda x: (x.replace('"', '').split(",")))` ? – mrsrinivas Aug 01 '17 at 16:35
  • @mrsrinivas, this suggestion yields: `[u'A=foo', u'B=bar', u'C=baz', u'A=oof', u'B=rab', u'C=zab', u'A=aaa', u'B=bbb', u'C=zzz']` Where what I would want is: `['A'='foo', 'B'='bar', 'C'='baz'], ['A'='foo', 'B'='rab', 'C'='zab'], ['A'='foo', 'B'='bbb', 'C'='zzz']` (I think xD) – gpanda Aug 01 '17 at 16:44
  • If you are concerned about the `u` (Unicode) prefix, you don't have to think about it. It only appears in the console output – mrsrinivas Aug 01 '17 at 16:53

2 Answers


You can use flatMap with a custom function, since a lambda can't contain multiple statements:

def transform(x):
    lst = x.replace('"', '').split(",")
    return [(kv.split("=")[0], kv.split("=")[1]) for kv in lst]

pairs = lines.flatMap(transform)
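To sanity-check the parsing logic outside Spark, you can call the function directly on one sample line from kv_pair.log (a minimal sketch; no SparkContext needed for this check):

```python
def transform(x):
    # Strip quotes, split the record into "key=value" chunks,
    # then split each chunk into a (key, value) tuple.
    lst = x.replace('"', '').split(",")
    return [(kv.split("=")[0], kv.split("=")[1]) for kv in lst]

print(transform('"A"="foo","B"="bar","C"="baz"'))
# [('A', 'foo'), ('B', 'bar'), ('C', 'baz')]
```

With flatMap, each record contributes three such tuples, giving a flat RDD of (key, value) pairs - i.e. a Pair RDD.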
    Check this for proper way to parse csv in Spark SQL and Scala (should be same for python as well) : https://stackoverflow.com/a/39533431/1592191 – mrsrinivas Aug 01 '17 at 16:50
  • This suggestion fails on the kv_pair.log example provided with error: `AttributeError: 'list' object has no attribute 'map'` on line `return lst.map(lambda x: (x.split("=")[0], x.split("=")[1]))` – gpanda Aug 01 '17 at 17:05

This is really bad practice for a parser, but I believe your example could be done with something like this:

from pyspark import SparkContext
from pyspark.sql import Row

sc = SparkContext()

# Read raw text to RDD
lines=sc.textFile('kv_pair.log')

# How to turn this into a Pair RDD?
pairs=lines.map(lambda x: (x.replace('"', '').split(",")))\
           .map(lambda r: Row(A=r[0].split('=')[1], B=r[1].split('=')[1], C=r[2].split('=')[1]))

print type(pairs)
print pairs.take(2)
  • Thank you for the suggestion, and perhaps I should have included in my question, but I am trying to avoid hard-coding the expected kv pairs A, B, C, etc. – gpanda Aug 01 '17 at 17:08
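Since the keys should not be hardcoded, one possible sketch is to parse each line into a dict and expand it into a Row, so column names come from the data itself (assumes `lines` and a SparkSession named `spark` exist; untested against a live cluster):

```python
def parse_line(line):
    # Strip quotes, split into "key=value" chunks, and build a dict
    # so column names are taken from the data, not hardcoded.
    return dict(kv.split("=", 1) for kv in line.replace('"', '').split(","))

print(parse_line('"A"="foo","B"="bar","C"="baz"'))
# {'A': 'foo', 'B': 'bar', 'C': 'baz'}

# With pyspark available, the dicts can be expanded into Rows:
# from pyspark.sql import Row
# rows = lines.map(parse_line).map(lambda d: Row(**d))
# df = spark.createDataFrame(rows)
```

This keeps the parser generic: a record with different keys would simply produce a Row with different fields.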