Idea
I assume that "x" in the posted data example works like a boolean trigger. So why not replace it with True and the empty space with False? After that, we can apply logical operators directly to the data. For example, what does it mean that a client's days do not fit the "Sector B" pattern? Schematically it means that any(client_days and not sector_b) is True, as in the following model:
import pandas as pd
week_days = 'mon tue wed thu fri sat sun'.split()
client_days = pd.Series([0,1,0,0,1,0,0], index=week_days)
sector_b = pd.Series([1,0,1,0,1,0,0], index=week_days)
assert any(client_days & ~sector_b)
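Conversely, for a client whose days all lie within the "Sector B" pattern, the same expression gives False. For example, with an illustrative client (not taken from the posted data):
# a client that visits only on days allowed by the Sector B pattern
in_b_days = pd.Series([1,0,0,0,1,0,0], index=week_days)
assert not any(in_b_days & ~sector_b)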
How to implement this in Pandas
pandas 1.5.1
Let's model this idea in Pandas, as if we could apply toPandas to the original data:
import pandas as pd
week_days = 'mon tue wed thu fri sat sun'.split()
data = [
    [0,1,0,0,1,0,0],
    [1,0,1,0,1,0,0],
    [1,0,1,0,0,0,0],
    [1,0,0,0,0,0,0],
    [1,0,0,0,1,0,0],
    [0,0,1,0,1,0,0],
    [0,0,0,0,1,0,0],
    [0,0,1,0,0,0,0],
    [1,1,1,1,1,1,1],
    [1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
    data,
    index=1 + pd.Index(range(len(data)), name='Client'),
    columns=week_days,
    dtype=bool
)
sectors = pd.DataFrame(
    data=[[1,0,1,0,1,0,0]],
    index=pd.Index(['Sector B'], name='sector'),
    columns=week_days,
    dtype=bool,
)
In this case we could use the dot operator, i.e. the scalar product, keeping in mind that addition and multiplication correspond to the OR/AND operations in the case of boolean data:
answer = (clients @ ~sectors.loc['Sector B']).map({True: 'A', False: 'B'})
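As a quick sanity check that the scalar product really encodes the same logic as the model above, we can redeclare the model series with dtype=bool (so that @ operates on booleans) and compare:
# sketch: the boolean scalar product is an OR of pairwise ANDs,
# i.e. "is there at least one client day outside the sector pattern?"
client_days = pd.Series([0,1,0,0,1,0,0], index=week_days, dtype=bool)
sector_b = pd.Series([1,0,1,0,1,0,0], index=week_days, dtype=bool)
assert (client_days @ ~sector_b) == any(client_days & ~sector_b)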
Implementation in PySpark
pyspark 3.4.1
Suppose that for some reason we can't use toPandas. Let's reorganize the data as if they were a PySpark DataFrame:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())
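(If pandas were not available even for preparing the demo data, the same clients_sdf could be built directly from the raw rows; the following is just an illustrative sketch using the data list and column names defined above:)
# purely illustrative: construct the clients frame without going through pandas
rows = [(i + 1, *map(bool, row)) for i, row in enumerate(data)]
clients_sdf = spark.createDataFrame(rows, schema=['Client', *week_days])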
How would we implement the idea restricted to this data type? First, the sector's data are small, so we can extract them into a plain Python sequence (e.g. a list). Next, we can apply map for logical AND, then reduce for logical OR, which gives us True for the "Sector A" cases and False otherwise. After that we apply when from pyspark.sql.functions to map values:
from pyspark.sql.functions import lit, when
from functools import reduce

client_data = clients_sdf[week_days]
# the Sector B pattern as a plain Python list of booleans
sector_b = [*sectors_sdf.where('sector == "Sector B"')[week_days].first()]
# per weekday: the client visits on a day that is not in the Sector B pattern
not_in_B = map(lambda x, y: x & lit(not y), client_data, sector_b)
# at least one such day => the client does not fit Sector B
is_in_sector_A = reduce(lambda x, y: x | y, not_in_B)
client_sector = when(is_in_sector_A, 'A').otherwise('B')
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')
Output:
>>> answer.show()
+------+------+
|Client|Sector|
+------+------+
| 1| A|
| 2| B|
| 3| B|
| 4| B|
| 5| B|
| 6| B|
| 7| B|
| 8| B|
| 9| A|
| 10| B|
+------+------+
General case
This is just a sketch of how it might look in the general case. Suppose we have these data:
import pandas as pd
week_days = 'mon tue wed thu fri sat sun'.split()
data = [
    [0,1,0,1,0,0,0],  # changed to fit a new Sector A
    [1,0,1,0,1,0,0],
    [1,0,1,0,0,0,0],
    [1,0,0,0,0,0,0],
    [1,0,0,0,1,0,0],
    [0,0,1,0,1,0,0],
    [0,0,0,0,1,0,0],
    [0,0,1,0,0,0,0],
    [1,1,1,1,1,1,1],  # fit Sector C
    [1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
    data,
    index=1 + pd.Index(range(len(data)), name='Client'),
    columns=week_days,
    dtype=bool
)
sectors = pd.DataFrame(  # add Sector A, Sector C
    data=[[0,1,0,1,0,1,0], [1,0,1,0,1,0,0], [1,1,1,1,1,1,1]],
    index=pd.Index(['Sector A', 'Sector B', 'Sector C'], name='sector'),
    columns=week_days,
    dtype=bool,
)
Here we can see 3 sectors, presumably arranged in descending order of priority, which we might want to represent in the final frame by their last letter.
Let's do it in Pandas:
# True where all of the client's days fall inside the sector's pattern
isin_sector = ~(clients @ ~sectors.T)

answer = (
    isin_sector
    # replace True with the sector's last letter; False becomes NaN
    .apply(lambda column: column.map({True: column.name[-1]}))
    # for each client take the first (i.e. highest-priority) matching sector
    .agg(lambda row: row.dropna().iloc[0], axis='columns')
)
display(answer)
Now in PySpark, trying to avoid the Pandas API. Here, when applying coalesce, I rely on the fact that Python dictionaries preserve the order in which items are added, and on the fact that when(...) without otherwise(...) gives NULL where the condition does not hold:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, coalesce, when
from functools import reduce

spark = SparkSession.builder.getOrCreate()
clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())

client_data = clients_sdf[week_days]

def is_in_sector(sector):
    '''sector : a boolean sequence'''
    # no client day outside the sector's pattern => the client fits the sector
    return ~reduce(lambda x, y: x | y,
                   map(lambda x, y: x & lit(not y),
                       client_data, sector))

# {last letter of the sector name: boolean Column}, in priority order
sectors = {
    (rest:=rec.asDict()).pop('sector')[-1]: is_in_sector(rest.values())
    for rec in sectors_sdf.collect()
}

# the first sector whose condition holds wins
client_sector = coalesce(
    *(when(is_in_sec, sec_name) for sec_name, is_in_sec in sectors.items())
)
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')
answer.show()
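A note on the coalesce-of-when trick used above: when(cond, value) without otherwise() returns NULL wherever cond is false, so coalesce picks the value of the first condition that holds, much like a chain of if/elif branches. A minimal standalone sketch on a made-up toy frame (hypothetical column names, not part of the solution):
from pyspark.sql.functions import coalesce, col, lit, when
toy = spark.createDataFrame(
    [(1, True, True), (2, False, True), (3, False, False)],
    ['Client', 'fits_A', 'fits_B'],
)
toy.select(
    'Client',
    coalesce(
        when(col('fits_A'), 'A'),   # NULL where fits_A is False
        when(col('fits_B'), 'B'),
        lit('no sector'),           # fallback when nothing matched
    ).alias('Sector'),
).show()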