First, let's redefine the mapping to group by id and return a MapType Column (the toolz functions are convenient here, but can be replaced with itertools.chain)*:
from toolz import concat, interleave
from pyspark.sql.functions import col, create_map, lit, struct
# Create literal column from id to sensor -> channel map
channel_map = create_map(*concat((lit(k), v) for k, v in sensor_channel_df
    .groupby("id")
    # Create map Column from literal label to channel
    .apply(lambda grp: create_map(*interleave([
        map(lit, grp["sensor"]),
        map(col, grp["channel"])])))
    .to_dict()
    .items()))
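If you'd rather not depend on toolz, the same construction can be sketched with the standard library alone: chain.from_iterable stands in for concat, and chain.from_iterable(zip(...)) for interleave (an equivalent sketch, not an additional step):

from itertools import chain

channel_map = create_map(*chain.from_iterable(
    (lit(k), v) for k, v in sensor_channel_df
        .groupby("id")
        .apply(lambda grp: create_map(*chain.from_iterable(
            zip(map(lit, grp["sensor"]), map(col, grp["channel"])))))
        .to_dict()
        .items()))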
Next, get a list of sensors:
sensors = sorted(sensor_channel_df["sensor"].unique().tolist())
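# For the sample data this evaluates to (matching the output columns below):
# ['acceleration', 'speed', 'temp', 'torque', 'weight']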
and combine data columns:
df = spark.createDataFrame(data_df)
data_cols = struct(*[c for c in df.columns if c != "id"])
Components defined above can be combined:
cols = [channel_map[col("id")][sensor].alias(sensor) for sensor in sensors]

df.select(["id"] + cols).show()
+---+------------------+------------------+------------------+------------------+------------------+
| id| acceleration| speed| temp| torque| weight|
+---+------------------+------------------+------------------+------------------+------------------+
| a| null| null| 8.712929970154072|5.4881350392732475| 0.0|
| a| null| null| 2.021839744032572| 7.151893663724195| 1.0|
| a| null| null| 83.2619845547938| 6.027633760716439| 2.0|
| a| null| null| 77.81567509498505| 5.448831829968968| 3.0|
| a| null| null| 87.00121482468191| 4.236547993389047| 4.0|
| b| null| 97.8618342232764| 6.458941130666561| null| 5.0|
| b| null| 79.91585642167236| 4.375872112626925| null| 6.0|
| b| null|46.147936225293186| 8.917730007820797| null| 7.0|
| b| null| 78.05291762864555| 9.636627605010293| null| 8.0|
| b| null|11.827442586893323|3.8344151882577773| null| 9.0|
| c| 63.99210213275238| null| 10.0| null| 7.917250380826646|
| c| 14.33532874090464| null| 11.0| null| 5.288949197529044|
| c| 94.46689170495839| null| 12.0| null| 5.680445610939323|
| c|52.184832175007166| null| 13.0| null| 9.25596638292661|
| c| 41.46619399905236| null| 14.0| null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+
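Sensors that are absent from a given id's map come back as null, because looking up a missing key in a MapType column yields null rather than an error; that is where the null cells above come from. For instance (illustrative lookup):

df.select("id", channel_map[col("id")]["acceleration"].alias("acceleration")).show()
# null for ids a and b, populated only for c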
It is also possible, although less efficient, to use a udf:
from toolz import unique
from pyspark.sql.types import *
from pyspark.sql.functions import udf
channel_dict = (sensor_channel_df
    .groupby("id")
    .apply(lambda grp: dict(zip(grp["sensor"], grp["channel"])))
    .to_dict())
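channel_dict is now a plain nested Python dict keyed by id; for the sample data it mirrors the channel_map structure shown in the footnote:

channel_dict
# {'a': {'weight': 'chan1', 'torque': 'chan2', 'temp': 'chan3'},
#  'b': {'weight': 'chan1', 'temp': 'chan2', 'speed': 'chan3'},
#  'c': {'temp': 'chan1', 'weight': 'chan2', 'acceleration': 'chan3'}}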
def remap(d):
    fields = sorted(unique(concat(_.keys() for _ in d.values())))
    schema = StructType([StructField(f, DoubleType()) for f in fields])

    def _(row, id):
        return tuple(
            float(row[d[id].get(f)]) if d[id].get(f) is not None else None
            for f in fields)

    return udf(_, schema)
(df
    .withColumn("vals", remap(channel_dict)(data_cols, "id"))
    .select("id", "vals.*")
    .show())
+---+------------------+------------------+------------------+------------------+------------------+
| id| acceleration| speed| temp| torque| weight|
+---+------------------+------------------+------------------+------------------+------------------+
| a| null| null| 8.712929970154072|5.4881350392732475| 0.0|
| a| null| null| 2.021839744032572| 7.151893663724195| 1.0|
| a| null| null| 83.2619845547938| 6.027633760716439| 2.0|
| a| null| null| 77.81567509498505| 5.448831829968968| 3.0|
| a| null| null| 87.00121482468191| 4.236547993389047| 4.0|
| b| null| 97.8618342232764| 6.458941130666561| null| 5.0|
| b| null| 79.91585642167236| 4.375872112626925| null| 6.0|
| b| null|46.147936225293186| 8.917730007820797| null| 7.0|
| b| null| 78.05291762864555| 9.636627605010293| null| 8.0|
| b| null|11.827442586893323|3.8344151882577773| null| 9.0|
| c| 63.99210213275238| null| 10.0| null| 7.917250380826646|
| c| 14.33532874090464| null| 11.0| null| 5.288949197529044|
| c| 94.46689170495839| null| 12.0| null| 5.680445610939323|
| c|52.184832175007166| null| 13.0| null| 9.25596638292661|
| c| 41.46619399905236| null| 14.0| null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+
In Spark 2.3 or later you can apply your current code directly with a vectorized (pandas) UDF.
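A minimal grouped-map sketch of that approach, reusing channel_dict and sensors from above; since your original code is not shown here, the per-group pandas logic is an assumption:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

out_schema = StructType(
    [StructField("id", StringType())]
    + [StructField(s, DoubleType()) for s in sensors])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def remap_group(pdf):
    # All rows in pdf share the same id; look up its sensor -> channel map
    mapping = channel_dict[pdf["id"].iloc[0]]
    out = pd.DataFrame({"id": pdf["id"]})
    for s in sensors:
        chan = mapping.get(s)
        out[s] = pdf[chan].astype("float64") if chan is not None else float("nan")
    return out

df.groupby("id").apply(remap_group).show()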
* To understand what is going on here, let's take a look at a single group, as processed by apply:
grp = sensor_channel_df.groupby("id").get_group("a")
First we convert the sensor column to a sequence of Spark literal Columns (think of a constant value):
keys = list(map(lit, grp["sensor"]))
keys
[Column<b'weight'>, Column<b'torque'>, Column<b'temp'>]
and the channel column to a sequence of Spark Columns (think of a pointer to the data):
values = list(map(col, grp["channel"]))
values
[Column<b'chan1'>, Column<b'chan2'>, Column<b'chan3'>]
When evaluated in a query, the former will produce constant output (a literal Column is also named after its value, which is why the headers below match the rows):
df_ = df.drop_duplicates(subset=["id"])
df_.select(keys).show()
+------+------+----+
|weight|torque|temp|
+------+------+----+
|weight|torque|temp|
|weight|torque|temp|
|weight|torque|temp|
+------+------+----+
while the latter will repeat the underlying data:
df_.select(values).show(3)
+-----+------------------+-----------------+
|chan1| chan2| chan3|
+-----+------------------+-----------------+
| 10| 7.917250380826646|63.99210213275238|
| 5| 6.458941130666561| 97.8618342232764|
| 0|5.4881350392732475|8.712929970154072|
+-----+------------------+-----------------+
Next we interleave the two sequences and combine them into a single MapType column:
mapping = create_map(*interleave([keys, values]))
mapping
Column<b'map(weight, chan1, torque, chan2, temp, chan3)'>
This gives us a mapping from a metric name to the data column (think of a Python dict), and when evaluated:
df_.select(mapping).show(3, False)
+---------------------------------------------------------------------------+
|map(weight, chan1, torque, chan2, temp, chan3) |
+---------------------------------------------------------------------------+
|Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238)|
|Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764) |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072)|
+---------------------------------------------------------------------------+
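For intuition, here is the same interleave-then-pair trick in plain Python (illustrative only; create_map consumes exactly this alternating key/value sequence):

keys_ = ["weight", "torque", "temp"]
values_ = ["chan1", "chan2", "chan3"]
flat = list(interleave([keys_, values_]))
# ['weight', 'chan1', 'torque', 'chan2', 'temp', 'chan3']
dict(zip(flat[::2], flat[1::2]))
# {'weight': 'chan1', 'torque': 'chan2', 'temp': 'chan3'}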
Finally, the outer comprehension repeats this for all groups, so channel_map is a single Column:
Column<b'map(a, map(weight, chan1, torque, chan2, temp, chan3), b, map(weight, chan1, temp, chan2, speed, chan3), c, map(temp, chan1, weight, chan2, acceleration, chan3))'>
which, when evaluated, gives the following structure:
df_.select(channel_map.alias("channel_map")).show(3, False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Map(a -> Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238), b -> Map(weight -> 10.0, temp -> 7.917250380826646, speed -> 63.99210213275238), c -> Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238))|
|Map(a -> Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764), b -> Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764), c -> Map(temp -> 5.0, weight -> 6.458941130666561, acceleration -> 97.8618342232764)) |
|Map(a -> Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072), b -> Map(weight -> 0.0, temp -> 5.4881350392732475, speed -> 8.712929970154072), c -> Map(temp -> 0.0, weight -> 5.4881350392732475, acceleration -> 8.712929970154072))|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
We then use the id column to select the map of interest:
df_.select(channel_map[col("id")].alias("data_mapping")).show(3, False)
+---------------------------------------------------------------------------------+
|data_mapping |
+---------------------------------------------------------------------------------+
|Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238)|
|Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764) |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072) |
+---------------------------------------------------------------------------------+
and the sensor name to extract individual values from the map:
df_.select(channel_map[col("id")]["weight"].alias("weight")).show(3, False)
+-----------------+
|weight |
+-----------------+
|7.917250380826646|
|5.0 |
|0.0 |
+-----------------+
At the end of the day, this is just a bunch of simple transformations on data structures containing symbolic expressions.