Since Django loads its code dynamically, I am currently facing the problem of not being able to use class variables.
My intention: I wanted to run a few threads that repetitively do the same thing over and over again. However, the threads should only exist once. That's why I collected them in a dictionary.
Problem: The dictionary is reset again and again, because Django constantly reloads code.
The question now is: How can I solve this cleverly?
This is the Wrapper module with thread & abstract parent class. If you want a spcecific task you have to inherit from here:
class TaskThread(Thread):
def __init__(self, delay_time: timedelta = None, repeat_time: timedelta = None, task: Callable = None, thread_name: str = None, daemon=None, *args, **kwargs):
super().__init__(name=thread_name, daemon=daemon, args=args, kwargs=kwargs)
self.sleep_time = delay_time
if repeat_time is None:
raise ValueError('No repeat interval given')
else:
self.repeat_time = repeat_time
if task is None:
raise ValueError('No task given')
else:
self.task = task
self.next_execution: Optional[datetime.datetime] = None
def run(self):
if self.sleep_time:
time.sleep(self.sleep_time.total_seconds())
del self.sleep_time
self.execute_task()
def execute_task(self):
self.next_execution = self.__calculate_next_execution__()
time.sleep(self.repeat_time.total_seconds())
self.task()
self.execute_task()
def get_next_execution(self) -> datetime.datetime:
return self.next_execution
def __calculate_next_execution__(self) -> datetime.datetime:
current_time = timezone.now()
return current_time + self.repeat_time
REPEATING_TASKS: Dict[str, TaskThread] = dict()
class RepeatingTasks(ABC):
TIME_ZONE = pytz.timezone(LOCAL_TIME_ZONE)
@classmethod
def start(cls):
print('NAME: ', cls.__name__)
print('DICT: ', REPEATING_TASKS)
if not REPEATING_TASKS.get(cls.__name__):
print(f'NEW THREAD')
if not cls.start_immediately():
delay = cls.__calculate_first_start_time__(cls.start_first_time_clock(), cls.start_first_time_delay())
task_thread = TaskThread(delay_time=delay, repeat_time=cls.repeat_time(), task=cls.task, thread_name=cls.task_name(), daemon=True)
else:
task_thread = TaskThread(repeat_time=cls.repeat_time(), task=cls.task, thread_name=cls.task_name(), daemon=True)
REPEATING_TASKS[cls.__name__] = task_thread
task_thread.start()
else:
print(f'What do we say to the god of death? NOT TODAY!!!')
@classmethod
@abstractmethod
def task(cls):
pass
@classmethod
@abstractmethod
def task_name(cls) -> str:
pass
@classmethod
def start_immediately(cls) -> bool:
return True
@classmethod
def repeat_time(cls) -> timedelta:
return timedelta(weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)
@classmethod
def start_first_time_clock(cls) -> datetime.time:
return datetime.time(hour=0, minute=0, second=0, microsecond=0)
@classmethod
def start_first_time_delay(cls) -> timedelta:
return timedelta(weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)
@classmethod
def next_execution(cls) -> datetime:
if REPEATING_TASKS.get(cls.__name__):
return REPEATING_TASKS.get(cls.__name__).next_execution
else:
raise ValueError(f'Thread ({cls.task_name()}) does not exist. It may never have started.')
@classmethod
def __calculate_first_start_time__(cls, start_time_clock: datetime.time, start_time_date: timedelta) -> timedelta:
current_time = timezone.now()
start_thread_time: datetime = datetime.datetime(year=current_time.year, month=current_time.month, day=current_time.day)
if start_time_clock.tzinfo:
start_thread_time = pytz.timezone(start_time_clock.tzinfo.__str__()).localize(start_thread_time)
else:
start_thread_time = cls.TIME_ZONE.localize(start_thread_time)
start_thread_time = start_thread_time.replace(hour=start_time_clock.hour, minute=start_time_clock.minute, second=start_time_clock.second, microsecond=start_time_clock.microsecond)
if current_time > start_thread_time:
start_thread_time = start_thread_time + timedelta(days=1)
return (start_thread_time - current_time) + start_time_date
This is one child class with a task:
class GarbageCollectorDB(RepeatingTasks):
NAME = 'GarbageCollectorDB'
START_IMMEDIATELY = False
START_TIME = time(hour=12, minute=0, second=0, microsecond=0)
START_DELAY = timedelta(weeks=0, days=3, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)
REPEATING_TIME = timedelta(weeks=0, days=7, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)
@classmethod
def task_name(cls) -> str:
return cls.NAME
@classmethod
def start_immediately(cls) -> bool:
return cls.START_IMMEDIATELY
@classmethod
def repeat_time(cls) -> timedelta:
return cls.REPEATING_TIME
@classmethod
def start_first_time_clock(cls) -> time:
return cls.START_TIME
@classmethod
def start_first_time_delay(cls) -> timedelta:
return cls.START_DELAY
@classmethod
def task(cls):
cls.__delete_all_unverified_users__()
@classmethod
def __delete_all_unverified_users__(cls):
current_time = timezone.now()
allowed_delay = timedelta(seconds=EMAIL_TOKEN_LIFE_IN_SEC)
all_users = User.objects.all()
for user in all_users:
if not user.verified_email and (current_time - user.date_joined) > allowed_delay:
user.delete()
Here is the class loaded:
class UserConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'user'
def ready(self):
from user.models import User, Profile
from user.signals import __create_profile__, __save_profile__, __delete_profile_picture__
post_save.connect(__create_profile__, sender=User)
post_save.connect(__save_profile__, sender=User)
post_delete.connect(__delete_profile_picture__, sender=Profile)
if CLEAN_DB_FROM_INVALID_USERS:
from user.service import GarbageCollectorDB
GarbageCollectorDB.start()
Problem here: The dictionary ("REPEATING_TASKS") is emptied again and again. Therefore I have multiple threads, which do the same.