I have this code:

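import glob

# Collect the matching files from every subfolder, then sort by the last five characters (final digit plus ".csv")
list_files = glob.glob("/t/main_folder/*/file_*[0-9].csv")
test = sorted(list_files, key=lambda x: x[-5:])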

This code finds the files I need to work with: five CSV files in different folders. The next step uses the code below. I need to apply a full outer join with each file in turn: first main_folder/folder1/file1.csv, then main_folder/folder2/file2, and so on, one by one until the last file found. That is why I need a loop.

  
df_deltas = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",";").load(test)
df_mirror = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",",").load("/t/org_file.csv").cache()
df_deltas.createOrReplaceTempView("deltas")
df_mirror.createOrReplaceTempView("mirror")
df_mir2=spark.sql("""select 
coalesce (deltas.DATA_ACTUAL_DATE,mirror.DATA_ACTUAL_DATE) as DATA_ACTUAL_DATE,
coalesce (deltas.DATA_ACTUAL_END_DATE,mirror.DATA_ACTUAL_END_DATE) as DATA_ACTUAL_END_DATE,
coalesce (deltas.ACCOUNT_RK,mirror.ACCOUNT_RK) as ACCOUNT_RK,
coalesce (deltas.ACCOUNT_NUMBER,mirror.ACCOUNT_NUMBER) as ACCOUNT_NUMBER,
coalesce (deltas.CHAR_TYPE,mirror.CHAR_TYPE) as CHAR_TYPE,
coalesce (deltas.CURRENCY_RK,mirror.CURRENCY_RK) as CURRENCY_RK,
coalesce (deltas.CURRENCY_CODE,mirror.CURRENCY_CODE) as CURRENCY_CODE,
coalesce (deltas.CLIENT_ID,mirror.CLIENT_ID) as CLIENT_ID,
coalesce (deltas.BRANCH_ID,mirror.BRANCH_ID) as BRANCH_ID,
coalesce (deltas.OPEN_IN_INTERNET,mirror.OPEN_IN_INTERNET) as OPEN_IN_INTERNET
from mirror
full outer join deltas on
deltas.ACCOUNT_RK=mirror.ACCOUNT_RK

                """)

df_deltas = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",";").load(test)  # here I'm using my code to fill .load() with the files

How can I run this in a loop, first for the first file found, then for the second, and so on?

nox8315

1 Answer

You can use a for loop to do that:

for idx, file in enumerate(test):
    globals()[f"df_{idx}"] = spark.read.format("csv").schema(schema).option("header","true").option("delimiter",";").load(file)

This creates DataFrames in the global namespace, named df_0 for the first file, df_1 for the second, and so on. You can then use each DataFrame however you need.
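
Since the question asks for the full outer join to be applied to one file at a time, it may be simpler to fold each file into the mirror inside the loop than to create variables through `globals()`. The following is only a sketch, assuming `spark`, `schema`, `test` and `df_mirror` exist as in the question and that every file shares the mirror's columns. `merge_delta` and `apply_deltas` are hypothetical helper names, not part of PySpark.

```python
def merge_delta(mirror, delta, key="ACCOUNT_RK"):
    """Full outer join on `key`; for every column prefer the delta value."""
    # Imported here so the helpers can be defined without Spark installed.
    from pyspark.sql import functions as F

    m, d = mirror.alias("m"), delta.alias("d")
    joined = m.join(d, F.col(f"m.{key}") == F.col(f"d.{key}"), "full_outer")
    # Same idea as the coalesce(deltas.X, mirror.X) list in the question's SQL.
    return joined.select(
        [F.coalesce(F.col(f"d.{c}"), F.col(f"m.{c}")).alias(c) for c in mirror.columns]
    )


def apply_deltas(mirror, paths, load, merge=merge_delta):
    """Merge the files into `mirror` one by one, in the order given."""
    for path in paths:
        mirror = merge(mirror, load(path))
    return mirror


# Possible usage with the names from the question (assumed to exist):
# load = lambda p: (spark.read.format("csv").schema(schema)
#                   .option("header", "true").option("delimiter", ";").load(p))
# df_mirror = apply_deltas(df_mirror, test, load)
```

Passing `load` and `merge` as arguments keeps the loop itself independent of Spark, so the file order can be checked with plain Python objects before running it on real data.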

Tushar Patil
  • This helped me, thanks a lot. Do you know how I can avoid reusing files that were already processed? For example, the first time I run this script I fill my table with data from my files. How can I make the code understand that it should skip, say, the 3 files that were already used and take only new files that arrived today or yesterday? Maybe I need to create a new file, log.csv, and record every DataFrame I used there together with its file name? Is that possible? – nox8315 Feb 20 '23 at 08:04
  • Hmm, you can use the file's creation time for this. For example, you had 3 files yesterday and processed them, and today you have 2 more. Apply a filter that keeps a file only if its creation time is today or later and skips it otherwise. See this answer for how to get a file's creation time: https://stackoverflow.com/a/39501288/11713502 – Tushar Patil Feb 20 '23 at 08:13
  • But what if, right after the script finishes, I immediately create a new folder with a new file and run the script again? As far as I can see, that won't work. But if the file is already in log.csv, the script won't use it. Is that possible? – nox8315 Feb 20 '23 at 08:27
  • If the file name doesn't exist in the filename column of log.csv, use it in the loop, otherwise skip it. Is something like this possible? :) – nox8315 Feb 20 '23 at 09:41
  • I don't understand your flow. How do the files arrive, how are you processing them, and how are you storing the results? And how often do new files arrive? – Tushar Patil Feb 20 '23 at 10:00
  • I have files in folders, and I need to apply the joins to all files that exist in the path, as I did with your help. But if I start the script again, it will do the same work again. What I need is, for example, just a message from the loop saying there are no more files to add if all files have already been used, or, if I create a new file and start the loop, for the loop to process only the new file and not all files again. So I think I can keep an extra CSV file where I write every path that was used, and from this file the script should understand that a file has already been used and needs to be skipped. – nox8315 Feb 20 '23 at 11:24
  • Yes, then you can use the `log.csv` you mentioned. After all the processing is done, store the names of the processed files in `log.csv`, and next time check whether a file name is already in `log.csv` before reading it. – Tushar Patil Feb 20 '23 at 12:54
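
The `log.csv` idea from the comments above could look roughly like this. It is a sketch under assumptions: the log sits on a local filesystem and has a single `filename` column (the column name mentioned in the comments), and `load_processed`, `record_processed` and `select_new` are hypothetical helper names.

```python
import csv
import os


def load_processed(log_path):
    """Return the set of file paths already recorded in the log."""
    if not os.path.exists(log_path):
        return set()
    with open(log_path, newline="") as f:
        return {row["filename"] for row in csv.DictReader(f)}


def record_processed(log_path, paths):
    """Append processed paths to the log, writing the header on first use."""
    is_new = not os.path.exists(log_path) or os.path.getsize(log_path) == 0
    with open(log_path, "a", newline="") as f:
        writer = csv.writer(f)
        if is_new:
            writer.writerow(["filename"])
        writer.writerows([p] for p in paths)


def select_new(found, log_path):
    """Keep only the files that are not in the log, preserving their order."""
    done = load_processed(log_path)
    return [p for p in found if p not in done]


# Possible usage with the `test` list from the question (assumed to exist):
# new_files = select_new(test, "log.csv")
# if not new_files:
#     print("No new files to process")
# else:
#     ...process new_files...
#     record_processed("log.csv", new_files)
```

Recording the paths only after processing succeeds means a failed run is retried on the next start instead of being skipped.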