
What I have so far is:

lookup = sc.textFile("/user/myuser/lookup.asv")
pairs = lookup.map(lambda r: r.split(chr(1)))

And now I have an RDD that looks like:

[
    [filename1, category1],
    [filename2, category2],
    ...
    [filenamen, categoryn]
]

How can I turn that RDD into a broadcast dictionary like:

{filename1: category1, filename2: category2, ...}

This is what I have tried, but it is not working:

>>> broadcastVar = sc.broadcast({})
>>> data = sc.parallelize([[1,1], [2,2], [3,3], [4,4]])
>>> def myfunc(x):
...     broadcastVar[str(x[0])] = x[1]
... 
>>> result = data.map(myfunc)
>>> broadcastVar
<pyspark.broadcast.Broadcast object at 0x7f776555e710>
>>> broadcastVar.value
{}
>>> result.collect()
...
ERROR: TypeError: 'Broadcast' object does not support item assignment
...
>>> broadcastVar.value
{}

For more information about why I am building this huge lookup variable, read this:

This is a follow-up question to this one.

I have two tables where

table1: a very wide table (25K columns and 150K rows) where each column contains pixel info and the first column is the filename of the input image file.

table2: a TSV (tab-delimited) file with 3 million rows, where each row contains the image file name and the product category of the image.

Speaking in SQL terms, I need to do an inner join of those two tables on the filename so that I can label the image data for machine learning later on.

It is not realistic to do this in any sort of SQL, because you would have to create a table for table1 with 25K columns, and the CREATE TABLE syntax would be ridiculously long.

So I am thinking about creating a lookup variable from table2, and maybe making it a broadcast variable, where the key is the filename and the value is the product category.


1 Answer


Broadcast variables are read-only on the workers. Spark also provides accumulators, which are write-only from the workers' side, but those are intended for things like counters. Here you can simply collect and build a Python dictionary:

lookup_bd = sc.broadcast({
  k: v for (k, v) in lookup.map(lambda r: r.split(chr(1))).collect()
})
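
If it helps, here is a minimal sketch of how the broadcast dictionary could then be used to label the wide rows; the name table1_rdd and the [filename, pixel1, pixel2, ...] row layout are assumptions on my part, not something from the question:

# Assumption: table1_rdd rows look like [filename, pixel1, pixel2, ...]
labeled = table1_rdd.map(
    lambda row: (lookup_bd.value.get(row[0]), row[1:])  # (category, pixels)
).filter(
    lambda pair: pair[0] is not None  # drop unmatched files, i.e. the inner join
)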

It is not realistic to do this in any sort of SQL, because you would have to create a table for table1 with 25K columns, and the CREATE TABLE syntax would be ridiculously long.

Creation shouldn't be a problem. As long as you know the column names, you can easily create a table like this programmatically:

from random import randint
from pyspark.sql import Row

colnames = ["x{0}".format(i) for i in range(25000)]  # Replace with actual names
row = Row(*colnames)  # Row "class" with 25K named fields

df = sc.parallelize([
    row(*[randint(0, 100) for _ in range(25000)]) for x in range(10)
]).toDF()

## len(df.columns)
## 25000

There is another, much more serious problem here, even if you use plain RDDs: very wide rows are, generally speaking, hard to handle in any row-wise format.

One thing you can do is use a sparse representation such as SparseVector or SparseMatrix. Another is to encode the pixel info, for example using RLE (run-length encoding).
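
For example, a minimal sketch of a sparse representation of a single pixel row using MLlib's SparseVector (toy values and made-up variable names; in practice you would build the index/value map from your actual data):

from pyspark.mllib.linalg import SparseVector

dense_pixels = [0, 0, 7, 0, 3] + [0] * 24995  # toy row: 25K pixels, mostly zeros
sparse_pixels = SparseVector(
    len(dense_pixels),
    {i: v for i, v in enumerate(dense_pixels) if v != 0}  # keep only non-zeros
)

## sparse_pixels
## SparseVector(25000, {2: 7.0, 4: 3.0})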

zero323