1

Don't know if the headline is good enough. Feel free to adjust it!

Here's the situation: I got a dataframe that is basically a product catalogue. In this there are two important columns. One is the product ID and one is a 12-digit category. This is some sample data. Of course, the original data contains many more products, more columns and many different categories.

products = [
    {'category': 110401010601, 'product': 1000023},
    {'category': 110401020601, 'product': 1000024},
    {'category': 110401030601, 'product': 1000025},
    {'category': 110401040601, 'product': 1000026},
    {'category': 110401050601, 'product': 1000027}]

pd.DataFrame.from_records(products)

The task is to use the 12-digit category number to form parent-categories and use those parents to count the number of products that match that parent category. The parent-categories are formed in 2-digit steps. The counts per parent are later on used to find a parent for each product that has a minimum number of records (let's say 12 children). Of course, the shorter the number gets, the more products will match that number. Here's an example parent structure:

110401050601 # product category
1104010506 # 1st parent
11040105 # 2nd parent
110401 # 3rd parent
1104 # 4th parent
11 # 5th super-parent

You see that there may be many more products matching for instance the 1104 instead of just the 110401050601.

Idea 1 for Small Data: As long as you have small or medium size data fully loaded into a Pandas dataframe, this is an easy task. I solved it with this code. The disadvantage is that this code assumes that all data is in memory and each loop is another select into the full dataframe, which is not good in terms of performance. Example: for 100.000 rows and 6-parent groups (formed from the 12-digits) you may end up with 600.000 select via DataFrame.loc[...] thus growing gradually (worst case). To prevent this I'm breaking the loop if the parent has been seen before. Remark: the df.shape[0] method is similar to len(df).

df = df.drop_duplicates()
categories = df['category'].unique()

counts = dict()
for cat in categories:
    counts[cat] = df.loc[df['category'] == cat].shape[0]

    for i in range(10,1,-2):
        parent = cat[:i]

        if parent not in counts:
            counts[parent] = df.loc[df['category'].str.startswith(parent)].shape[0]
        else:
            break

counts = {key: value for key, value in counts.items() if value >= MIN_COUNT}

Which results in something like this (using parts of my original data):

{'11': 100,
 '1103': 7,
 '110302': 7,
 '11030202': 7,
 '1103020203': 7,
 '110302020301': 7,
 '1104': 44,
 '110401': 15,
 '11040101': 15,
 '1104010106': 15,
 '110401010601': 15}

Idea 2 for Big Data using flatmap-reduce: Now imagine you have much much more data which is loaded row-wise and you want to achieve the same thing as above. I was thinking of using flatmap to split the category number into its parents (one to many) using a 1-counter for each parent and then apply groupby-key to get the count for all possible parents. The advantage of this version is, that it doesn't need all data at once and that it is not doing any selects into the dataframe. But in the flatmap-step the number of rows increases by a factor of 6 (due to 12-digit category number split into 6 groups). Since Pandas has no flatten/flatmap method, I had to apply a work-around using unstack (for explanation see this post).

df = df.drop_duplicates()
counts_stacked = df['category'].apply(lambda cat: [(cat[:i], 1) for i in range(10,1,-2)])
counts = counts_stacked.apply(pd.Series).unstack().reset_index(drop=True)

df_counts = pd.DataFrame.from_records(list(counts), columns=['category', 'count'])
counts = df_counts.groupby('category').count().to_dict()['count']
counts = {key: value for key, value in counts.items() if value >= MIN_COUNT}

Question: Both solutions are fine, but I wonder if there is a more elegant way to achieve the same result. I feel that I've missed something.

Matthias
  • 5,574
  • 8
  • 61
  • 121
  • What are you trying to optimize? Speed, memory use, readability? Elegance is a bit vague. – Zev Apr 14 '18 at 00:34
  • Basically I’m looking for a version that can be easily transferred to PySpark or Apache Beam that are all somewhat line based on the map-reduce concept. My second code is similar to that but is exploding the number of lines a lot. Here I’m interested if there is another way to do that. – Matthias Apr 14 '18 at 07:32

2 Answers2

1

You can using cumsum here

df.category.astype(str).str.split('(..)').apply(pd.Series).replace('',np.nan).dropna(1).cumsum(1).stack().value_counts()
Out[287]: 
11              5
1104            5
110401          5
11040102        1
110401050601    1
1104010206      1
110401040601    1
11040101        1
1104010106      1
110401010601    1
110401020601    1
11040104        1
110401030601    1
11040103        1
1104010406      1
1104010306      1
11040105        1
1104010506      1
dtype: int64
BENY
  • 317,841
  • 20
  • 164
  • 234
  • Nice. Didn’t know that you can use split(‘(..)’). This solution strongly depends on pandas and won’t work in PySpark or Apache Beam I guess – Matthias Apr 14 '18 at 07:30
0

Here's another solution using Apache Beam SDK for Python. This is compatible to Big Data using the map-reduce paradigm. The sample file should contain the product id as first column and 12-digit category as second column using the ; as separator. The elegance of this code is that you can nicely see each transformation per line.

# Python 2.7

import apache_beam as beam
FILE_IN = 'my_sample.csv'
SEPARATOR = ';'

# the collector target must be created outside the Do-Function to be globally available
results = dict()

# a custom Do-Function that collects the results
class Collector(beam.DoFn):    
    def process(self, element):
        category, count = element
        results[category] = count
        return { category: count }


# This runs the pipeline locally.
with beam.Pipeline() as p:
    counts = (p
     | 'read file row-wise' >> beam.io.ReadFromText(FILE_IN, skip_header_lines=True)
     | 'split row' >> beam.Map(lambda line: line.split(SEPARATOR))
     | 'remove useless columns' >> beam.Map(lambda words: words[0:2])
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\"') for word in words])
     | 'convert from unicode' >> beam.Map(lambda words: [str(word) for word in words])
     | 'convert to tuple' >> beam.Map(lambda words: tuple(words))
     | 'remove duplicates' >> beam.RemoveDuplicates()
     | 'extract category' >> beam.Map(lambda (product, category): category)
     | 'create parent categories' >> beam.FlatMap(lambda cat: [cat[:i] for i in range(12,1,-2)])
     | 'group and count by category' >> beam.combiners.Count.PerElement()
     | 'filter by minimum count' >> beam.Filter(lambda count: count[1] >= MIN_COUNT)
     | 'collect results' >> beam.ParDo(collector)
    )

result = p.run()
result.wait_until_finish()

# investigate the result; 
# expected is a list of tuples each consisting of the category and its count
print(results)

The code is written in Python 2.7 since the Apache Beam SDK for Python does not support Python 3, yet.

Matthias
  • 5,574
  • 8
  • 61
  • 121