I have a specific ETL problem: my data comes from a paginated resource and can be fetched in pages of fixed size, so each API call specifies page 1, page 2, page 3, etc.
In my plan, each page is then transformed and loaded.
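Without any orchestration, the per-page plan amounts to a simple loop. The following is a plain-Python sketch, not Prefect code. `fetch_page`, `transform` and `load` are hypothetical callables that stand in for the real API call and ETL steps. It also assumes that an empty page marks the end of the resource.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def etl_paginated(
    fetch_page: Callable[[int], List[Row]],
    transform: Callable[[List[Row]], List[Row]],
    load: Callable[[List[Row]], None],
    start_page: int = 1,
) -> int:
    """Fetch, transform and load page 1, 2, 3, ... until an empty page is returned.

    Returns the number of the first empty page, i.e. where a later run could resume.
    """
    page = start_page
    while True:
        rows = fetch_page(page)
        if not rows:  # assumption: an empty page marks the end of the resource
            return page
        load(transform(rows))
        page += 1


# Hypothetical in-memory "API": 7 records served in fixed pages of 3.
DATA = [{"id": i} for i in range(7)]
PAGE_SIZE = 3


def fake_fetch(page: int) -> List[Row]:
    start = (page - 1) * PAGE_SIZE
    return DATA[start:start + PAGE_SIZE]


loaded: List[Row] = []
resume_at = etl_paginated(
    fake_fetch,
    transform=lambda rows: [{**r, "seen": True} for r in rows],
    load=loaded.extend,
)
print(resume_at, len(loaded))  # 4 7
```

You could stop as soon as a page comes back shorter than the page size, which saves the final empty request. This sketch waits for an empty page so that it makes no assumption about whether the last page is full.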
Thus the ETL of a single page is an `etl_batch` flow (made of atomic (page)tasks). However, I would like to run the ETL over the whole paginated resource, up to the last available page. My idea was to solve this with a flow (scheduled, say, every hour) that loops over `etl_batch` flows (scheduled every couple of seconds). Based on the references below, I realized the model has to be cast as a flow of a task of (page)flows of (page)tasks.
docs.prefect.io/core/advanced_tutorials/task-looping.html
stackoverflow.com/questions/68103561/looping-tasks-in-prefect
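A few lines of plain Python can model the task-looping pattern from these references and make its data flow explicit. This is a simplified stand-in for Prefect 1.x's `LOOP` signal and `context["task_loop_result"]`. It is not Prefect's actual task runner, and `Loop` and `run_looping_task` are hypothetical names. In this model, the page counter advances only if each per-page run returns a value that is then fed back as the loop result.

```python
from typing import Any, Callable, Dict, List


class Loop(Exception):
    """Stand-in for prefect.engine.signals.LOOP: carries the value for the next pass."""

    def __init__(self, result: Any) -> None:
        super().__init__(result)
        self.result = result


def run_looping_task(fn: Callable[[Dict[str, Any]], Any]) -> Any:
    """Call fn repeatedly, feeding each Loop result back via ctx["task_loop_result"].

    A simplified model of task looping, not Prefect's real task runner.
    """
    ctx: Dict[str, Any] = {}
    while True:
        try:
            return fn(ctx)
        except Loop as signal:
            ctx["task_loop_result"] = signal.result


visited: List[int] = []


def paginate(ctx: Dict[str, Any]) -> int:
    page = ctx.get("task_loop_result", 1)  # 1 on the first pass
    visited.append(page)
    nr = page + 1  # stands in for running the per-page etl_batch flow
    if nr > 3:
        return nr
    raise Loop(nr)


final = run_looping_task(paginate)
print(visited, final)  # [1, 2, 3] 4
```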
Here's the issue I encountered: if I specify a schedule for the `etl_batch` flow, its state appears to be reset.
But if I comment out the `etl_batch` flow's schedule `schedule_every_second`, the loop works correctly, i.e. `page` is incremented (please see the snippet below). However, for this ETL pipeline it is important to run the `etl_batch` flow and its (page)tasks on a schedule, because otherwise the API could become saturated.
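Pacing page requests on the client side is one way to limit the load on the API. The following plain-Python sketch enforces a minimum gap between successive calls. `Throttle` and `FakeClock` are hypothetical helpers, not part of Prefect. The sketch assumes that a fixed minimum interval between requests is enough to respect the API's limits.

```python
import time
from typing import Callable, List, Optional


class Throttle:
    """Client-side pacing: wait() returns no sooner than min_interval seconds after the previous wait()."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> None:
        now = self._clock()
        if self._last is not None:
            remaining = self.min_interval - (now - self._last)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last = now


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self) -> None:
        self.t = 0.0

    def now(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.t += seconds


clock = FakeClock()
throttle = Throttle(1.0, clock=clock.now, sleep=clock.sleep)
call_times: List[float] = []
for page in range(1, 4):
    throttle.wait()
    call_times.append(clock.now())  # a hypothetical fetch_page(page) would go here
print(call_times)  # [0.0, 1.0, 2.0]
```

With the real clock, `Throttle(1.0)` would space page requests at least one second apart within a single run. This keeps the request rate bounded without a separate schedule on the per-page work. Whether one second is the right gap depends on the API's actual limits.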
I assume the problem is that the cache is being cleared when the `etl_batch` flow runs on a schedule (I tried inserting `cache_for=` here and there, to no avail). Do you have any suggestions?
```python
from datetime import timedelta

from prefect import task, Flow, context, Parameter
from prefect.schedules import IntervalSchedule
from prefect.engine.signals import LOOP

# Schedule for the outer "etl" flow: every 3 seconds
schedule = IntervalSchedule(
    start_date=None,
    interval=timedelta(seconds=3),
)

# Intended schedule for the inner "etl_batch" flow: every second
schedule_every_second = IntervalSchedule(
    start_date=None,
    interval=timedelta(seconds=1),
)


@task
def next_page(page):
    print(f" {page}")
    return page + 1


@task
def paginate():
    # Page to process in this loop iteration (1 on the first pass)
    current_page = context.get("task_loop_result", 1)
    with Flow('etl_batch',
              # schedule=schedule_every_second
              ) as flow:
        x = Parameter('x')
        add = next_page(x)
    state = flow.run(parameters={'x': current_page})
    nr = state.result[add]._result.value
    if nr > 3:
        return nr
    # Re-run this task with task_loop_result = nr
    raise LOOP(result=nr)


if __name__ == "__main__":
    with Flow("etl", schedule=schedule) as etl_flow:
        r = paginate()
    f = etl_flow.run()
```