
I have a Spark DataFrame with a location column, which contains names of countries. I need to convert these names to ISO3 codes. I know there is a Python library, country-converter, but I don't know how to apply it so that it converts the values of only one column in my DataFrame.

Example dataframe:

UserName   Location
adam       United States
anastasia  United Kingdom

I am able to convert country names from the column to codes by collecting the column to an RDD and then building a new DataFrame from the converted values:

import country_converter as coco
from pyspark.sql.types import StringType

out_format = "ISO3"

# collect() returns a list of Row objects, not plain strings
countries = df.select("Location").rdd.collect()
countries = coco.convert(names=countries, to=out_format, not_found=None)
countriesDF = spark.createDataFrame(countries, StringType())

Output:

value
USA
GBR

However I have two problems with this code:

  1. As a result I create a completely different DataFrame and lose the information about UserName. I need the output to be like this:

Expected output:

UserName   Location
adam       USA
anastasia  GBR
  2. Some of the results come back as Row(Countries='London, UK'). How can I get rid of this? I use the code below, but I wonder if there is a faster way than fixing it manually for every Row:

countriesDF.replace({"Row(Countries='London, UK')" : "GBR"})
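As a hedged aside on where the Row(...) wrapper likely comes from: collect() returns Row objects, so coco.convert receives Rows rather than plain strings. A minimal sketch of pulling the strings out first (assuming the column is named Location, as above):

# Extract the plain strings from the collected Row objects before converting
rows = df.select("Location").rdd.collect()
country_names = [row["Location"] for row in rows]
codes = coco.convert(names=country_names, to="ISO3", not_found=None)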

3 Answers


Update:

For huge data you can use a dictionary to map all the values.

import country_converter as coco
from pyspark.sql.functions import lit, create_map
from itertools import chain

data = [['United States'], ['United Kingdom'], ['Not a country']] * 200000
df = spark.createDataFrame(data, ['countries'])

# Create a {country: formatted country} dictionary using only the unique values
unique_countries = df.select("countries").distinct().rdd.flatMap(lambda x: x).collect()
unique_countries_formatted = coco.convert(unique_countries, to='ISO3', not_found=None)
uc_dict = dict(zip(unique_countries, unique_countries_formatted))

# Build a map expression to apply to the df
mapping_expr = create_map([lit(x) for x in chain(*uc_dict.items())])

# Apply the map to the df
df = df.withColumn('countries_formatted', mapping_expr[df.countries])

df.show()

Command took 8.85 seconds: 600k records in under 9 seconds.

Original answer: You can use a UDF to do it.

import country_converter as coco
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

df = spark.createDataFrame([['United States'],
                            ['United Kingdom'],
                            ['Not a country'],
                            [None]], ['countries'])

def country_converter(country):  # plain Python function to wrap in a UDF
    if country:
        return coco.convert(country, to='ISO3', not_found=None)
    return None

cc_udf = udf(country_converter, StringType())  # register the UDF
df = df.withColumn("countries_formatted", cc_udf(df.countries))
df.show()

Output:

+--------------+-------------------+
|     countries|countries_formatted|
+--------------+-------------------+
| United States|                USA|
|United Kingdom|                GBR|
| Not a country|      Not a country|
|          null|               null|
+--------------+-------------------+
  • Thank you for your help; using a UDF worked for keeping the df information. However, I have a problem with this solution: on my dataset it took 12 minutes to create the DataFrame with the UDF. I also need to make some aggregations (groupBy and count) on this df, but after adding the aggregations, when I try show(5) it just keeps spinning (I have been waiting an hour now and still no result). Is there any way to fix it? – MoOn Jan 21 '22 at 06:37
  • UDFs will be slow, since they are not designed with large data in mind. The only way I can think of to make it faster is a dictionary mapping: take all the countries, create a dictionary with the country as key and the formatted name as value, and `map` the dictionary values onto your new column. https://stackoverflow.com/questions/50321549/map-values-in-a-dataframe-from-a-dictionary-using-pyspark – Equinox Jan 21 '22 at 07:42
  • You can either just take the distinct values and create a dictionary, or use the country data which I assume is used by `country_converter`: https://github.com/konstantinstadler/country_converter/blob/master/country_converter/country_data.tsv – Equinox Jan 21 '22 at 07:44
  • @MonikaOnn I have updated the answer with an approach that should be significantly faster. That should also resolve the timeout of the show statements. – Equinox Jan 21 '22 at 08:24
  • I tried your method with mapping but I keep getting this error: Unsupported literal type class java.util.ArrayList when I try to create the map. Do you know what could cause it? I removed all special characters and None values from the countries list, but I still get this error. – MoOn Jan 21 '22 at 12:00
  • What is the Spark version you are using? – Equinox Jan 21 '22 at 14:31
  • I'm using Spark 3.2.0 (running on Databricks Community Edition). – MoOn Jan 22 '22 at 10:05
  • I think the problem is that some values in the dictionary look like this: [Brooklyn NY, Brooklyn NY]. I don't know why there is an array. I was trying to find a solution, and the only one I found is that the PySpark API can't handle complex arrays and it should be done this way: for the array [5,8], df.withColumn("nums", array(lit(5), lit(8))).show(). But I didn't manage to apply it to my case. Also, I don't think there should be an array in the dictionary at all. Is there a way to handle it when creating the dict? I'm sorry for bothering you; I'm new to PySpark and I really want to understand this problem. – MoOn Jan 24 '22 at 08:35
  • You can analyze the values and maybe pick one value, or concatenate the values into a string (see the sketch below). It depends on the data and the previous transformations that created a structure like that. – Equinox Jan 24 '22 at 10:44
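A hedged sketch of the cleanup suggested in the last comment: coco.convert can return a list when a name matches more than one entry, and create_map accepts only plain literals, so list values can be coerced to a single string before building the map (uc_dict is the dictionary from the update above):

# Coerce any list values (ambiguous matches) to a single string so that
# create_map only receives plain literals; picking the first match is one option
uc_dict = {k: (v[0] if isinstance(v, list) else v) for k, v in uc_dict.items()}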

For large data, I found this seems to work: cache the converter in a global so it is only constructed once:

import country_converter as coco

def get_UNregion(iso2):
    global cc_all
    if 'cc_all' not in globals():
        cc_all = coco.CountryConverter(include_obsolete=True)
    return cc_all.convert(names=iso2, to='UNregion')
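A minimal usage sketch (assumed names: a DataFrame df with a countries column; swap 'UNregion' for 'ISO3' in the function above to match the question):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the cached-converter function in a UDF and apply it to a single column
get_region_udf = udf(get_UNregion, StringType())
df = df.withColumn("UNregion", get_region_udf(df.countries))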

Please note that country_converter supports only English country names. If you need a solution that supports multiple languages, consider countrywrangler, which supports 34 languages and was designed for speed.

Here is a straightforward example of its usage:

import countrywrangler as cw

alpha2 = cw.Normalize.name_to_alpha2("Germany")
print(alpha2)

>>> DE

CountryWrangler includes a fuzzy search that can detect almost all countries, regardless of variations in formatting style or spelling errors. It is about 100x slower than the normal function, but still fast compared to other libraries.

import countrywrangler as cw

alpha2 = cw.Normalize.name_to_alpha2("Germany Federal Republic of", use_fuzzy=True)
print(alpha2)

>>> DE

The full documentation can be found here: https://countrywrangler.readthedocs.io/en/latest/normalize/country_name/
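To tie this back to the question's Spark DataFrame, here is a minimal sketch wrapping the name_to_alpha2 call shown above in a UDF; the column names follow the question, and an alpha-3 helper (if the linked docs provide one) would yield ISO3 codes directly:

import countrywrangler as cw
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Normalize the Location column to ISO alpha-2 codes; None stays None
to_alpha2 = udf(lambda name: cw.Normalize.name_to_alpha2(name) if name else None,
                StringType())
df = df.withColumn("Location", to_alpha2(df.Location))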

Disclosure: As the author of CountryWrangler, I wish to clarify that this answer is not intended to discourage the use of country_converter, but rather to provide an alternative solution for certain use cases.