
How can I classify the values of the Clients table using the rows of the Combinations table?

I decided to create a Combinations table that contains all the combinations derived from the main rows (Clients table).

I plan to check whether a client's row coincides with a row of the Combinations table; if it does, the client is classified as sector B.

I have this flow, but Databricks returns an error:

 for i,j in select_df.iterrows():
      for u,v in dfCombinacionesDias.iterrows():
          if (
              (select_df["MONDAY"][i] == registro["LUNES"][u]) 
              and (select_df["TUESDAY"][i] == registro["MARTES"][u]) 
              and (select_df["WEDNESDAY"][i] == registro["MIERCOLES"][u]) 
              and (select_df["THURSDAY"][i] == registro["JUEVES"][u]) 
              and (select_df["FRIDAY"][i] == registro["VIERNES"][u]) 
              and (select_df["SATURDAY"][i] == registro["SABADO"][u]) 
              and (select_df["SUNDAY"][i] == registro["DOMINGO"][u])
          ):
              Sector = "B"
          else:
              Sector = "A"
        
vSubSeq = "('{}','{}')".format(select_df["IDClient"][i],Sector)
sqlInsertSequence = "Insert into {0}.{1} values {2}".format(dSCHEMA, Table, vSubSeq,vdataDeltaPath)
print(sqlInsertSequence)
dfTables = spark.sql(sqlInsertSequence)

I have added an image with the different tables (Clients, Combinations and Sector):

Tables

I think I need a for loop that goes through the Combinations table row by row and compares each row with a row of the Clients table; if there is a match, I save the value in a new table (Sector table). Obviously there would be another for loop over the Clients table. Is there a better algorithm for comparing the two tables?

3 Answers


I have this flow, but Databricks returns an error:

"Return error"... is vague. For instance, you are using registro which does not seem to be defined anywhere in the code extract you have provided.

You are also not using Databricks' capabilities efficiently: iterating over the rows of Spark DataFrames is slow, especially with nested loops.
Instead, you can use Spark's DataFrame API (see the Apache Spark API reference) to get the desired result far more efficiently.

As an alternative approach, without using nested loops, you can:

  • rename the columns of the Combinations DataFrame to match the columns of the Clients DataFrame.
  • join the Clients DataFrame and the Combinations DataFrame on all the days' columns.

After the join, any row from the Clients DataFrame that has a match in the Combinations DataFrame will be classified as "B". Rows without a match will be "A".

from pyspark.sql.functions import col, lit, when

# Assuming you have loaded your data into two DataFrames: df_clients and df_combinations

# Step 1: Rename columns in df_combinations to match df_clients
df_combinations = df_combinations.withColumnRenamed("LUNES", "MONDAY")\
                                 .withColumnRenamed("MARTES", "TUESDAY")\
                                 .withColumnRenamed("MIERCOLES", "WEDNESDAY")\
                                 .withColumnRenamed("JUEVES", "THURSDAY")\
                                 .withColumnRenamed("VIERNES", "FRIDAY")\
                                 .withColumnRenamed("SABADO", "SATURDAY")\
                                 .withColumnRenamed("DOMINGO", "SUNDAY")

# Step 2: Add a marker column to df_combinations, then left-join df_clients on all the day columns.
# The day columns are join keys coming from df_clients, so they are never null after the join;
# the marker column is what tells us whether a matching combination exists.
df_combinations = df_combinations.withColumn("is_combination", lit(True))
df_joined = df_clients.join(df_combinations, on=["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"], how="left_outer")

# Step 3: Create a new column "Sector": "B" when the join found a matching combination, "A" otherwise
df_result = df_joined.withColumn("Sector", when(col("is_combination").isNotNull(), "B").otherwise("A"))

# Step 4: If you want to store the result in another table
df_result.select("IDClient", "Sector").write.format("delta").save("/path/to/save/location")

This approach uses DataFrame transformations and actions instead of explicit loops, which is the idiomatic and efficient way to operate on Spark DataFrames.
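
If the target is the schema-qualified Delta table from the question rather than a file path, a minimal sketch could look like this (assuming dSCHEMA and Table are the same variables used in the question's insert statement, and that the table accepts appends):

# Sketch only: append the classification to the question's Delta table
# instead of writing to a path. dSCHEMA and Table are assumed to be the
# variables from the question's original "Insert into" statement.
(df_result
    .select("IDClient", "Sector")
    .write
    .format("delta")
    .mode("append")
    .saveAsTable(f"{dSCHEMA}.{Table}"))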

VonC

Idea

I assume that "x" in the posted data example works like a boolean trigger. So why not to replace it with True and empty space with False? After that, we can apply logical operators directly to data. For example, what does it mean that the client's days do not fit in the "Sector B" pattern? Schematically it means any(client_days and not sector_b) is True, as in the following model:

import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()
client_days = pd.Series([0,1,0,0,1,0,0], index=week_days)  # client works Tue and Fri
sector_b = pd.Series([1,0,1,0,1,0,0], index=week_days)     # Sector B covers Mon, Wed, Fri

# Tuesday lies outside the Sector B pattern, so the condition holds
assert any(client_days & ~sector_b)
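
And the complementary case, on made-up values, for a client whose days all fall inside the Sector B pattern:

# A client working only Mon and Fri stays within the Sector B pattern,
# so the condition does not fire and they would be classified as "B".
fitting_client = pd.Series([1,0,0,0,1,0,0], index=week_days)
assert not any(fitting_client & ~sector_b)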

How to implement this in Pandas

pandas 1.5.1

Let's model this idea in Pandas, as if we could apply toPandas to the original data:

import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()
data = [
    [0,1,0,0,1,0,0],
    [1,0,1,0,1,0,0],
    [1,0,1,0,0,0,0],
    [1,0,0,0,0,0,0],
    [1,0,0,0,1,0,0],
    [0,0,1,0,1,0,0],
    [0,0,0,0,1,0,0],
    [0,0,1,0,0,0,0],
    [1,1,1,1,1,1,1],
    [1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
    data,
    index=1 + pd.Index(range(len(data)), name='Client'),
    columns=week_days,
    dtype=bool
)
sectors = pd.DataFrame(
    data=[[1,0,1,0,1,0,0]], 
    index=pd.Index(['Sector B'], name='sector'),
    columns=week_days,
    dtype=bool,
)

In this case we can use the dot operator, i.e. a scalar product per row, keeping in mind that addition and multiplication correspond to the or/and operations on boolean data:

answer = (clients @ ~sectors.loc['Sector B']).map({True: 'A', False: 'B'})
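
To make the boolean algebra concrete, here is the same check written out for a single row of the sample data above (just an expansion of what the matrix product does for each client):

# Client 1 works Tue and Fri; Sector B covers Mon, Wed and Fri.
# Tuesday falls outside the pattern, so client 1 lands in sector A.
client_1 = clients.loc[1]
outside_b = client_1 & ~sectors.loc['Sector B']
print('A' if outside_b.any() else 'B')  # -> A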

Implementation on PySpark

pyspark 3.4.1

Suppose that for some reason we can't use toPandas. Let's reorganize the data as if they were PySpark DataFrames:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())

How would we implement the idea restricted to this data type? First, the sector data are small, so we can extract them into an ordinary sequence (e.g. a list). Next, we apply map for logical AND, then reduce for logical OR, which gives True for "Sector A" cases and False otherwise. After that we apply when from pyspark.sql.functions to map the boolean values to labels:

from pyspark.sql.functions import lit, when
from functools import reduce

client_data = clients_sdf[week_days]
sector_b = [*sectors_sdf.where('sector == "Sector B"')[week_days].first()]
not_in_B = map(lambda x, y: x & lit(not y), client_data, sector_b)
is_in_sector_A = reduce(lambda x, y: x | y, not_in_B)
client_sector = when(is_in_sector_A, 'A').otherwise('B')
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')

Output:

>>> answer.show()
+------+------+
|Client|Sector|
+------+------+
|     1|     A|
|     2|     B|
|     3|     B|
|     4|     B|
|     5|     B|
|     6|     B|
|     7|     B|
|     8|     B|
|     9|     A|
|    10|     B|
+------+------+

General case

This is just a sketch of what it might look like in the general case. Suppose we have these data:

import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()
data = [
    [0,1,0,1,0,0,0],    # changed to fit a new Sector A
    [1,0,1,0,1,0,0],
    [1,0,1,0,0,0,0],
    [1,0,0,0,0,0,0],
    [1,0,0,0,1,0,0],
    [0,0,1,0,1,0,0],
    [0,0,0,0,1,0,0],
    [0,0,1,0,0,0,0],
    [1,1,1,1,1,1,1],    # fit Sector C
    [1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
    data,
    index=1 + pd.Index(range(len(data)), name='Client'),
    columns=week_days,
    dtype=bool
)
sectors = pd.DataFrame(     # add Sector A, Sector C
    data=[[0,1,0,1,0,1,0], [1,0,1,0,1,0,0], [1,1,1,1,1,1,1]], 
    index=pd.Index(['Sector A', 'Sector B', 'Sector C'], name='sector'),
    columns=week_days,
    dtype=bool,
)

Here we have 3 sectors, presumably arranged in descending order of priority, which we might want to represent in the final frame by their last letter.

Let's do it in Pandas:

isin_sector = ~(clients @ ~sectors.T)

answer = (
    isin_sector
    .apply(lambda column: column.map({True: column.name[-1]}))
    .agg(lambda row: row.dropna()[0], axis='columns') 
)

display(answer)
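
For this sample the priorities should resolve as follows (a hand-derived sanity check on the data above, not library output):

# Client 1 now matches the Sector A pattern, client 9 is only covered by the
# all-days Sector C, and the remaining clients still fall into Sector B.
assert answer.tolist() == ['A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'C', 'B']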

Now in PySpark, trying to avoid the Pandas API. Here, when applying coalesce, I rely on the fact that Python dictionaries preserve the order in which items are added:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, coalesce, when
from functools import reduce

spark = SparkSession.builder.getOrCreate()

clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())

client_data = clients_sdf[week_days]

def is_in_sector(sector):
    '''sector : a boolean sequence'''
    return ~reduce(lambda x, y: x | y, 
                   map(lambda x, y: x & lit(not y), 
                       client_data, sector))

sectors = {
    (rest:=rec.asDict()).pop('sector')[-1]: is_in_sector(rest.values())
    for rec in sectors_sdf.collect()
}
client_sector = coalesce(
    *(when(is_in_sec, sec_name) for sec_name, is_in_sec in sectors.items())
)
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')
answer.show()
Vitalizzare

Here's an answer:

import pandas as pd

classification_results = []

for i, row_client in select_df.iterrows():
    Sector = "A"  # Initialize Sector as A
    # Loop through each row in the Combinations table
    for u, row_combination in dfCombinacionesDias.iterrows():
        if (
            (row_client["MONDAY"] == row_combination["LUNES"]) and
            (row_client["TUESDAY"] == row_combination["MARTES"]) and
            (row_client["WEDNESDAY"] == row_combination["MIERCOLES"]) and
            (row_client["THURSDAY"] == row_combination["JUEVES"]) and
            (row_client["FRIDAY"] == row_combination["VIERNES"]) and
            (row_client["SATURDAY"] == row_combination["SABADO"]) and
            (row_client["SUNDAY"] == row_combination["DOMINGO"])
        ):
            Sector = "B"
            break  

    # Append the classification result to the list
    classification_results.append({"IDClient": row_client["IDClient"], "Sector": Sector})

# Create a DataFrame from the classification_results list
sector_df = pd.DataFrame(classification_results)

print(sector_df)

Explanation: This code snippet classifies values from the Clients table based on matching rows in the Combinations table. It loops through each row in the Clients table and, for each row, checks whether there is a matching row in the Combinations table based on the days of the week. If a match is found, the client is classified as Sector "B"; otherwise, it is classified as Sector "A". The classification results are stored in the sector_df DataFrame.
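
One caveat for the Databricks context of the question: iterrows only exists on pandas DataFrames, so if select_df and dfCombinacionesDias are Spark DataFrames they would first need a toPandas() conversion before this loop. Afterwards, a rough sketch for writing the result back to the question's Delta table (dSCHEMA and Table being the question's own variables, not defined here) could be:

# Sketch only: convert the pandas result back to Spark and append it to the
# Delta table referenced in the question (dSCHEMA and Table come from there).
(spark.createDataFrame(sector_df)
    .write.format("delta")
    .mode("append")
    .saveAsTable(f"{dSCHEMA}.{Table}"))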