1

I am trying to filter from the rdd which have values as "01-10-2019"

print("\n ### Remove duplicates in merged RDD:")

insuredata = insuredatamerged_cache.distinct()
print("insuredata: ",type(insuredata))

print("\n  ### Increase partition to 8 in merged RDD:")
insuredata.getNumPartitions()
insuredatarepart = insuredata.repartition(8)
insuredatarepart.getNumPartitions()

print("insuredatarepart:",type(insuredatarepart))

print("\n ### Split RDD with business date field:")

rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)

print(" ### count of rdd_201901001:",rdd_201901001.count())

Input values:

where insuredatarepart is class 'pyspark.rdd.RDD' with below dataset as list values

Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
    Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')

Exception is as shown below:

### Remove duplicates in merged RDD:
insuredata:  class 'pyspark.rdd.PipelinedRDD'
 Result Count after duplicates removed:  1407
 Result Count of duplicates removed:  1

### Increase partition to 8 in merged RDD:
insuredatarepart: class 'pyspark.rdd.RDD'

### Split RDD with business date field:
20/02/05 19:11:43 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 150)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <genexpr>
  File "/home/hduser/sparkdata2/script/insurance_info2_new.py", line 294, in <lambda>
    rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1502, in __getattr__
    raise AttributeError(item)
AttributeError: split

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Vega
  • 27,856
  • 27
  • 95
  • 103
RSH
  • 13
  • 3
  • Welcome to SO! Please take a moment to read about how to post spark questions: https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples – YOLO Feb 09 '20 at 07:02
  • Looks like your insuredatarepart is list. And you are applying split function on it. Since list don't have split function. Its throwing error. – Karthik Feb 09 '20 at 11:57

1 Answers1

0

From the printed output that you provided, it appears that you have RDD of type Row.

Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')

Here, you must not be calling split function to split the elements because they already seem to be split in multiple fields through whatever process you used to acquire these. You can just access through item index.

rdd_201901001 = insuredatarepart.filter(lambda x: u'01-10-2019' in x[0])

Notice that map is removed, and index is added in filter clause as in x[0]

If you had a single string type field in your Row (which you don't, based upon shared output); you would still need to call split on zeroeth element, not on the Row itself and the statement might have been

rdd_201901001 = insuredatarepart.map(lambda y: y[0].split(",",-1)).filter(lambda x: u'01-10-2019' in x[0])

Notice that index values have been applied in both map and filter operations. This would have resulted in a RDD of list of strings that you would need to stitch together.

xenodevil
  • 604
  • 7
  • 18