I'm new to python, but I'm trying to learn it by myself for a university course. Therefore I have to program a pygame window that represents a drone fly zone (The drone should fly automated like a search helicopter in a given area and on the window a visual representation of what happens should be displayed...).
How an older version of the pygame window look's like when the virtual drone flies.
So far so good.
Now I have to link the movements of the game-represented drone with the real Tello drone. I figured out that using threading for the movement commands to the drone is the way to go when the pygame window should run the whole time.
But now I got stuck because I have to delay the automated movements in pygame in order to wait for the drone flight time and response.
I update the pygame window with a clock
set to tick(60)
in the loop of the game (therefore I created a Clock clock = pygame.time.Clock()
in the initialization of the pygame program), but for the automated drone movements I created an Userevent.
The drone movements is trigger like a Snake game via a Userevent witch should be triggered every second.
Therefor I implemented a timer:
self.SCREEN_UPDATE = pygame.USEREVENT
self.screen_update = pygame.time.set_timer(self.SCREEN_UPDATE, 1000).
But now I have to delay the automated movements in pygame every time the drone should do a movement. Otherwise, the pygame program run's too fast for the real-world problem. (The User event gets starter again before the real drone has finished his command)
I also tried to implement break functions for that, but they do not work.
def break_rotation(self):
print("start waiting")
self.screen_update.time.wait(2000)
#self.screen_update = pygame.time.set_timer(self.SCREEN_UPDATE, 2000)
#pygame.time.wait(2000)
print("end waiting")``
def break_move(self):
print("start waiting")
self.screen_update.time.wait(3000)
#self.screen_update = pygame.time.set_timer(self.SCREEN_UPDATE, 3000)
#pygame.time.wait(3000)
print("end waiting")
I also tried the comment lines. pygame.time.wait()
makes the whole pygame wait. But You should still be able to use your keyboard for emergency landings and other stuff.
So I only have to delay the next Userevent, where the hole flypath for the next block / section get's tested for the automated flypath and the program decides to fly forward or should it turn.
Does anyone have an Idear how I could achieve that delay on the timer? Sadly I also couldn't find some similar problems.
EDIT 1:
@CmdCoder858
I think this code may hold the update back for a couple of seconds.
When using the and
operator combined with a passed variable.
if event.type == SCREEN_UPDATE and x == 1:
I also tried using multiple events but that didn't work probably.
if event.type == SCREEN_UPDATE and event.type == drone_task_finished:
Interestingly it ignored the hole and operator this way.
import pygame
import threading
import _thread
import time
def task():
global x
# placeholder for drone task
print('Test thread start')
time.sleep(5) # simulate delay in the task
x = x+1
print('Thread ends')
pygame.event.post(pygame.event.Event(drone_task_finished))
if __name__ == '__main__':
pygame.init()
x = 0
drone_task_finished = pygame.USEREVENT # an integer representing the first user event
drone_task = threading.Thread(target=task) # create a thread with a task function
drone_task.start()
pygame.display.set_caption("Overview of the Search Area")
window = pygame.display.set_mode((500, 300))
SCREEN_UPDATE = pygame.USEREVENT
screen_update = pygame.time.set_timer(SCREEN_UPDATE, 1000)
print('Start event loop')
while 1:
for event in pygame.event.get():
if event.type == pygame.QUIT:
exit()
if event.type == SCREEN_UPDATE and x == 1:
print('Hura Timer paused for 5 sec')
x = 0
drone_task = threading.Thread(target=task) # create a thread with a task function
drone_task.start()
if event.type == SCREEN_UPDATE:
print('Update nach 1sec')
if event.type == drone_task_finished: # detect if a drone_task_finished event has been posted
print('Drone task has finished')
But is there any reason not to use import _thread
?
With that, I could call the function multiple times in new threads.
Therefore I have to change the task function to def task(threadName):
Now I could simply use _thread.start_new_thread(task, ("",))
every time the task should be used later on.