
I have a pyspark dataframe of ~1.7billion rows with the schema:

INPUT SCHEMA
id  
ip  
datetime

and I am trying to find the modal ip for each id

I currently have a function where I make a separate table of

INT TABLE
id
ip
number_of_records

and then filter that for the modal ip
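
Roughly, that looks something like this (simplified sketch; the real column names differ):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# count records per (id, ip) pair
counts = df.groupBy('id', 'ip').count().withColumnRenamed('count', 'number_of_records')

# keep the ip with the highest count for each id
w = Window.partitionBy('id').orderBy(F.desc('number_of_records'))
modal_ips = (
    counts
    .withColumn('rn', F.row_number().over(w))
    .filter(F.col('rn') == 1)
    .select('id', F.col('ip').alias('modal_ip'))
)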

This seems incredibly slow and bulky; what is a more efficient way to get the modal ip for each id?

Proposed Output Schema
id
modal_ip

Thanks all!

Joe S
  • What do you mean by "modal ip"? – absolutelydevastated Sep 09 '19 at 14:44
  • 1
    I think this is pretty much what you want: https://stackoverflow.com/a/36695251/5368910 – timchap Sep 09 '19 at 14:48
  • @absolutelydevastated the ip address that has the most records associated with it for each id. i.e. if there is ip1 with 3 records for that id and ip2 with 5 records for that id, then choose ip2 – Joe S Sep 09 '19 at 14:55
  • @timchap That is basically exactly what I have, I was looking for a more elegant/faster solution that does not require so much heavy lifting here – Joe S Sep 09 '19 at 14:56
  • What if there are multiple modes? – absolutelydevastated Sep 09 '19 at 14:58
  • then choose both! – Joe S Sep 09 '19 at 14:59
  • You could reduce the requirement to sort and then filter if you wanted: instead, you could take each id-grouped partition and just iterate over it once to find the most-prevalent IP. e.g. a `reduceByKey` with `key=(ID, IP)` and `f=lambda a, b: max(a, b)`. This won't find multiple modes though. – timchap Sep 09 '19 at 15:04
  • @timchap that looks interesting, and there aren't very many ties. Could you show that in code? – Joe S Sep 09 '19 at 15:08
  • I think the `groupBy`, `count` then `window` method is already the best way to go about doing this. Although I'd suggest that you `rank` the IPs instead of taking `row_number`. What is it that you don't like about that solution? – absolutelydevastated Sep 09 '19 at 15:08

1 Answer


Expanding on my comments, here's a solution which shows how you can technically achieve this in two passes of the data - one to count, and one to reduce and find the (possibly multiple) modes. I've implemented the second part with the RDD API - translating it into the DataFrame API is left to the reader ;) (tbh I don't know whether custom aggregations with multiple output rows like this are even possible there):

from pyspark.sql import types

# Example data
data = [
    (0, '12.2.25.68'),
    (0, '12.2.25.68'),
    (0, '12.2.25.43'),
    (1, '62.251.0.149'),  # This ID has two modes
    (1, '62.251.0.140'),
]

schema = types.StructType([
    types.StructField('id', types.IntegerType()),
    types.StructField('ip', types.StringType()),
])

df = spark.createDataFrame(data, schema)

# Count id/ip pairs
df = df.groupBy('id', 'ip').count()

def find_modes(a, b):
    """
    Reducing function to find modes (can return multiple). 

    a and b are lists of Row
    """
    if a[0]['count'] > b[0]['count']:
        return a
    if a[0]['count'] < b[0]['count']:
        return b
    return a + b

result = (
    df.rdd
    .map(lambda row: (row['id'], [row]))
    .reduceByKey(find_modes)
    .collectAsMap()
)

Result:

{0: [Row(id=0, ip='12.2.25.68', count=2)],
 1: [Row(id=1, ip='62.251.0.149', count=1),
     Row(id=1, ip='62.251.0.140', count=1)]}
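
If you want the result as a DataFrame matching your proposed output schema, rather than a dict collected to the driver, you could flatten the reduced RDD instead of calling collectAsMap (an untested sketch, reusing find_modes and the counted df from above):

result_df = (
    df.rdd
    .map(lambda row: (row['id'], [row]))
    .reduceByKey(find_modes)
    # expand each (id, [rows]) pair into one (id, modal_ip) tuple per mode
    .flatMap(lambda kv: [(kv[0], r['ip']) for r in kv[1]])
    .toDF(['id', 'modal_ip'])
)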

One small caveat to this approach: because tied modes are accumulated in memory, if you have many different IPs with the same count for a single ID, you do risk OOM issues. For this particular application, that seems very unlikely (e.g. a single user probably won't have a million distinct IPs, each with a single event).

That said, I tend to agree with @absolutelydevastated: the simplest solution is probably the one you already have, even if it takes an extra pass over the data. You should, however, avoid doing a sort/rank and instead just take the max count within each window, if possible.
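
For example, something along these lines (a rough sketch, reusing the id/ip/count DataFrame from the groupBy above; it also keeps ties):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('id')

modes = (
    df  # the counted DataFrame: id, ip, count
    .withColumn('max_count', F.max('count').over(w))
    .filter(F.col('count') == F.col('max_count'))
    .select('id', F.col('ip').alias('modal_ip'))
)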

timchap