
I have 2 datasets, each with several columns, but I want to use only 2 columns from each dataset, without doing any join, merge, or other combination of the two datasets.

Example dataset 1:

column_dataset_1 <String>    |      column_dataset_1_normalized <String>
-----------------------------------------------------------------------
11882621-V021BRP161305-1     |      11882621V021BRP1613051
-----------------------------------------------------------------------
W-B.7120RP1605794            |      WB7120RP1605794
-----------------------------------------------------------------------
D/57RP.1534421               |      D57RP1534421
-----------------------------------------------------------------------
125858G_022BR/P070751        |      125858G022BRP070751
-----------------------------------------------------------------------
300B.5190C57/51507           |      300B5190C5751507
-----------------------------------------------------------------------

Example dataset 2:

column_dataset_2 <String>                                                            |  column_dataset_2_normalized <String>
-------------------------------------------------------------------------------------------------------------------------------
Por ejemplo, si W-B.7120RP1605794se trata de un archivo de texto,                    |  PorejemplosiWB7120RP1605794setratadeunarchivodetexto
-------------------------------------------------------------------------------------------------------------------------------
se abrirá en un programa de procesamiento de texto.                                  |  seabrirenunprogramadeprocesamientodetexto
-------------------------------------------------------------------------------------------------------------------------------
utilizados 125858G_022BR/P070751 frecuentemente (por ejemplo, un texto que describe  |  utilizados125858G022BRP070751frecuentementeporejemplountextoquedescribe
-------------------------------------------------------------------------------------------------------------------------------

column_dataset_1_normalized is the result of normalizing column_dataset_1, and column_dataset_2_normalized is the result of normalizing column_dataset_2.

I want to check, for each value of column_dataset_1_normalized, whether it exists in column_dataset_2_normalized. If it does, I should extract the original value from column_dataset_2.

Example:

WB7120RP1605794, the second value of column_dataset_1_normalized, exists in the first line of column_dataset_2_normalized, so I should extract its original value [W-B.7120RP1605794] from column_dataset_2 and store it in a new column in dataset 2.

The same goes for 125858G022BRP070751, which is in the fourth line of column_dataset_2_normalized: I should extract it from column_dataset_2 as [125858G_022BR/P070751]. The comparison should take the values of column_dataset_1_normalized one by one and search each of them in every cell of column_dataset_2_normalized.
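To make the intent concrete, here is a rough sketch of the matching I have in mind (df1 and df2 are just the names I use for the two datasets; this is an illustration, not working code):

from pyspark.sql import functions as F

# keep the df2 rows whose normalized text contains one of the normalized
# references from df1; the original value can then be recovered from
# column_dataset_2
matches = df2.join(
    df1.select('column_dataset_1', 'column_dataset_1_normalized'),
    F.col('column_dataset_2_normalized').contains(F.col('column_dataset_1_normalized')))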

For normalization, I used this code to keep only numbers and letters:

from pyspark.sql import functions as F

df = df.withColumn(
        "column_normalized",
        F.regexp_replace(F.col("column_to_normalize"), "[^a-zA-Z0-9]+", ""))
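As a quick check outside Spark, the same pattern can be exercised with Python's re module:

import re

# keeps only ASCII letters and digits, like the regexp_replace above
assert re.sub('[^a-zA-Z0-9]+', '', 'W-B.7120RP1605794') == 'WB7120RP1605794'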

Can someone suggest how I can do this? Thank you.

verojoucla
  • And why don't you want to do this with a join? It's the canonical answer to the problem you're describing. Can you elaborate on what you have tried that didn't work? – Oliver W. Nov 29 '19 at 12:37
  • @OliverW. Thanks for your answer. I don't have a common column between the datasets; that's why I asked the question. I'm trying to find a solution on my side and hope to get a suggestion / help from you. – verojoucla Nov 29 '19 at 12:44
  • 1
    `df1.join(df2, expr('column_dataset_2_normalized rlike column_dataset_1_normalized')).select('column_dataset_1')` – jxc Nov 29 '19 at 12:46
  • 1
    @verojoucla, you don't need a common column when join two dataframes. you can also use function `instr(...) > 0` in the join condition.. – jxc Nov 29 '19 at 12:48

1 Answer


There are various ways to join the two dataframes:

(1) find the location/position of the string column_dataset_1_normalized in column_dataset_2_normalized using a SQL function such as locate, instr, or position; each returns a 1-based position when the string exists:

from pyspark.sql.functions import expr

cond1 = expr('locate(column_dataset_1_normalized,column_dataset_2_normalized)>0')
cond2 = expr('instr(column_dataset_2_normalized,column_dataset_1_normalized)>0')
cond3 = expr('position(column_dataset_1_normalized IN column_dataset_2_normalized)>0')

(2) use regex rlike to find column_dataset_1_normalized in column_dataset_2_normalized; this is only valid when no regex meta-characters appear in column_dataset_1_normalized (which holds here, since the normalized values contain only letters and digits):

cond4 = expr('column_dataset_2_normalized rlike column_dataset_1_normalized')

Run the following code with one of the above conditions, for example:

df1.join(df2, cond1).select('column_dataset_1').show(truncate=False)
+---------------------+
|column_dataset_1     |
+---------------------+
|W-B.7120RP1605794    |
|125858G_022BR/P070751|
+---------------------+

Edit: Per the comments, the matched sub-string might not be the same as df1.column_dataset_1, so we need to reverse-engineer the sub-string from the normalized string. Based on how the normalization is done, the following udf might help (note that it will not cover any leading/trailing non-alphanumeric characters that might belong to the matched sub-string). Basically, we iterate through the original string character by character, recording the index of each alphanumeric character, then find the start/end of the normalized string and take the corresponding sub-string:

from pyspark.sql.functions import udf

@udf('string')
def find_matched(orig, normalized):
  # n collects the alphanumeric chars of `orig`; d keeps their original indexes
  n, d = ([], [])
  for i in range(len(orig)):
    if orig[i].isalnum():
      n.append(orig[i])
      d.append(i)
  # locate the normalized string inside the normalized form of `orig`
  idx = ''.join(n).find(normalized)
  # map the match back to the original string through the saved indexes;
  # ending the slice at d[idx+len(normalized)-1]+1 also avoids an IndexError
  # when the match ends at the last alphanumeric char of `orig`
  return orig[d[idx]:d[idx+len(normalized)-1]+1] if idx >= 0 else None
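As a quick sanity check of the index arithmetic, the same logic can be run as plain Python outside Spark (find_matched_py is just an undecorated copy of the function above, used here for testing only):

def find_matched_py(orig, normalized):
  n, d = ([], [])
  for i in range(len(orig)):
    if orig[i].isalnum():
      n.append(orig[i])
      d.append(i)
  idx = ''.join(n).find(normalized)
  return orig[d[idx]:d[idx+len(normalized)-1]+1] if idx >= 0 else None

print(find_matched_py('si W-B.7120RP1605794se trata', 'WB7120RP1605794'))
# W-B.7120RP1605794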

df1.join(df2, cond3) \
   .withColumn('matched', find_matched('column_dataset_2', 'column_dataset_1_normalized')) \
   .select('column_dataset_2', 'matched', 'column_dataset_1_normalized') \
   .show(truncate=False)

+------------------------------------------------------------------------------------+-----------------------+---------------------------+
|column_dataset_2                                                                    |matched                |column_dataset_1_normalized|
+------------------------------------------------------------------------------------+-----------------------+---------------------------+
|Por ejemplo, si W-B.7120RP-1605794se trata de un archivo de texto,                  |W-B.7120RP-1605794     |WB7120RP1605794            |
|utilizados 125858G_022BR/P-070751 frecuentemente (por ejemplo, un texto que describe|125858G_022BR/P-070751 |125858G022BRP070751        |
+------------------------------------------------------------------------------------+-----------------------+---------------------------+
jxc
  • Thank you, I will test it. But I would like to extract the value of the reference from column_dataset_2, because sometimes column_dataset_2 and column_dataset_1 are different. – verojoucla Nov 29 '19 at 14:18
  • @verojoucla, just add `column_dataset_2` into the select method, is that what you need? `df.join(...).select('column_dataset_1', 'column_dataset_2')` – jxc Nov 29 '19 at 15:51
  • I should take the values of column_dataset_1_normalized one by one and search each of them in the whole column_dataset_2_normalized column. If I find one, I extract it from column_dataset_2. I already put an example about (WB7120RP1605794) in my question. – verojoucla Dec 02 '19 at 08:00
  • I hope that you understood my need. Thank you – verojoucla Dec 02 '19 at 12:47
  • you meant the sub-string you are looking for in column_dataset_2 might be different from column_dataset_1 even if the normalized strings match? If that's the case, I will have to know how you normalize these strings. – jxc Dec 02 '19 at 13:35
  • I edited my question and added the code showing how I normalized the columns. About your question: the sub-string I'm looking for in column_dataset_2 might or might not be different from column_dataset_1 even if the normalized strings match. Sometimes they match, sometimes not. – verojoucla Dec 02 '19 at 14:01
  • What I did so far: I found the column_dataset_2_normalized values that exist in column_dataset_1_normalized. I joined the two datasets and created a dataframe that contains the 4 columns. Now I should extract the column_dataset_1_normalized value from column_dataset_2, I mean extract its value without normalization. I already used your solution "cond3". – verojoucla Dec 02 '19 at 14:24
  • this might require a udf, and it will not be able to cover the leading and trailing non-alnum characters, if any exist. – jxc Dec 02 '19 at 14:34
  • OK, let's try it please; can you suggest a solution? Thank you very much. – verojoucla Dec 02 '19 at 14:41
  • 1
    BTW. for testing purpose, I added some non-alnum chars into your original example texts. – jxc Dec 02 '19 at 15:36
  • Thank you for your answer. Can you please tell me why I got this error? It concerns the withColumn step. Error message: Py4JJavaError: An error occurred while calling o6207.withColumn. : java.lang.UnsupportedOperationException: functionExists is not supported. Thank you very much. – verojoucla Dec 03 '19 at 12:07
  • this is most likely from your Spark version. I only tested it on Spark 2.4; older versions might not support the spark.udf.register syntax. I can revert this back to the previous version of the post. – jxc Dec 03 '19 at 12:12
  • I tried to change the udf using `udf_matched = udf(find_matched, 'string')`, then used the udf_matched function in withColumn, but it's not working. – verojoucla Dec 03 '19 at 13:18
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/203530/discussion-between-jxc-and-verojoucla). – jxc Dec 03 '19 at 13:58
  • I'm very sorry, but I really need your help. Can you help me with this question: https://stackoverflow.com/questions/59228817/extract-string-from-text-pyspark Have a nice weekend and thank you very much in advance :) – verojoucla Dec 07 '19 at 19:06
  • 1
    @verojoucla, I think you already had an answer there. have a good weekend. – jxc Dec 07 '19 at 22:20
  • I know I'm disturbing you, I'm sorry, but I really didn't find a solution for the performance issue when I use big data. The proposed solution doesn't work because the build took more than 5 hours and is still running. I really need your experience. Thanks a lot. I'm still talking about the question https://stackoverflow.com/questions/59228817/extract-string-from-text-pyspark/59229857?noredirect=1#comment104710107_59229857 – verojoucla Dec 09 '19 at 18:46
  • @verojoucla, there is not much we can do on the SQL end; you probably have to find the bottleneck and tune the cluster configuration. – jxc Dec 10 '19 at 03:54
  • Hi, @jxc :) Can you give a suggestion on my question, please? https://stackoverflow.com/questions/59931770/sum-of-column-values-pyspark Thanks – verojoucla Jan 27 '20 at 13:39
  • @verojoucla, added an answer; as I remember your Spark version is below 2.4, so you might need to use the explode or udf solution. – jxc Jan 27 '20 at 21:16
  • Hi @jxc, I know I'm disturbing you again, I'm sorry. I trust only you. Can you please give a suggestion on my question? Take your time until tomorrow; I will accept only your solution. Thank you very much https://stackoverflow.com/questions/60190375/compare-two-datasets-in-pyspark – verojoucla Feb 12 '20 at 14:22
  • 1
    Hi, @verojoucla, I am not online as often as I did last year due to my schedules. but I believe you can get enough help from the community anyway:) – jxc Feb 13 '20 at 04:47