2

I would like to be able to flash an LED continuously while my main while loop continues. I understand that in the following code when the function led_flash() gets called, the script will stop until the while loop defined in the function ends. This prohibits the remainder of the code from running.

import RPi.GPIO as GPIO
import time

GPIO.setmode(GPIO.BCM)
GPIO.setup(25, GPIO.OUT)

def led_flash():
    while TRUE:
        GPIO.output(25, ON)
        time.sleep(1)
        GPIO.output(25, OFF)
        time.sleep(1)

while True:
    if x=1
        led_flash()
        ...do other stuff

I have read that threading will work in this instance, however I have not found an example simple enough for me to grasp. Additionally, if threading, how would I be able to end the led_flash() function thread later in my main while loop?

DNA
  • 42,007
  • 12
  • 107
  • 146
SpaceCase
  • 148
  • 2
  • 10

2 Answers2

3

Based on answers from here and here you can start a thread like this:

import threading
while True:
    if x = 1:
        flashing_thread = threading.Thread(target=led_flash)
        flashing_thread.start()
        #continue doing stuff

Since, in your case, you want to stop the thread (I assume if x doesn't equal 1), then you can create a thread stopping class like so:

import threading
import sys

class StopThread(StopIteration): pass

threading.SystemExit = SystemExit, StopThread

class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace

And call it like: flashing_thread.stop()

Put it all together to get:

import threading
import sys
import RPi.GPIO as GPIO
import time

class StopThread(StopIteration): pass

threading.SystemExit = SystemExit, StopThread

class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace
#############################################################

GPIO.setmode(GPIO.BCM)
GPIO.setup(25, GPIO.OUT)

def led_flash():
    while TRUE:
        GPIO.output(25, ON)
        time.sleep(1)
        GPIO.output(25, OFF)
        time.sleep(1)

# x gets defined somewhere

while True:
    if x == 1:
        flashing_thread = Thread2(target=led_flash)
        flashing_thread.start()
        #continue doing stuff
    else:
        if flashing_thread and flashing_thread.isAlive():
            flashing_thread.stop()
Community
  • 1
  • 1
Ryan Schuster
  • 494
  • 4
  • 15
  • @cricket_007 It's part of the asker's code. The asker has defined it in their code and Ryan retained it for clarity (to the asker). – Aaron3468 Apr 06 '16 at 18:09
  • @Aaron3468 word. cricket_007 I added a comment for clarity regarding x. – Ryan Schuster Apr 06 '16 at 18:14
  • 1
    Thank you, this is the most simple I have seen it explained. I assume I could also pass an argument to the thread to change the state of the LED (on, off, flashing) instead of stopping the thread. – SpaceCase Apr 06 '16 at 18:24
  • @SpaceCase yes. A new thread with input arguments looks like: `flashing_thread = threading.Thread(target=led_flash, args=my_args)` where `my_args` is a tuple like `(arg1,arg2,)`. – Ryan Schuster Apr 06 '16 at 18:27
  • Also, if this answer worked for you, do you mind accepting it? – Ryan Schuster Apr 06 '16 at 19:55
  • @SpaceCase Very good observation! If you do choose to have a thread like that, you'll still need some way to end the (now rogue) thread when your program finishes/crashes or you will end up with a resource leak. Then your led will flash constantly and running the program again will result in a fight over led access. Check out [this question](http://stackoverflow.com/questions/1635080/terminate-a-multi-thread-python-program). Don't forget to accept Ryan's answer :) – Aaron3468 Apr 06 '16 at 22:35
  • @ Ryan Schuster when I execute the code I get the error: flashing_thread = threading.Thread2(target=flash_led) AttributeError: 'module' object has no attribute 'Thread2' – SpaceCase Apr 06 '16 at 22:46
  • @SpaceCase Typo on my account! It should be `Thread2(target=flash_led)` not `threading.Thread2(target=flash_led)` seeing as you've already defined the `Thread2` class yourself. – Ryan Schuster Apr 06 '16 at 23:13
  • Ah, makes sense. Thanks! – SpaceCase Apr 07 '16 at 00:57
  • I am implementing this code on my PiBorg wheeled robot with a Raspberry Pi 3 board to switch on the yellow blinking light and keep moving. I had to change GPIO.output(25,ON) to GPIO.output(25, True), and also while TRUE to while True. So now the code works fine to switch on the blinking. However, I cannot stop blinking. The line of code: flashing_thread.stop() does not stop it for some reason. I have to restart Raspberry Pi 3 to stop it blinking. Python is a new language for me, and I cannot figure out so far how I can stop a blinking thread. I tried about everything already to no avail. – Oleksiy Muzalyev Mar 25 '17 at 08:35
  • I solved the problem with blinking light on PiBorg & Raspberry Pi 3 wheeled robot with the code provided by Aaron3468 below. I added +1 to both solutions, as I still use .isAlive() from yours. – Oleksiy Muzalyev Mar 25 '17 at 09:30
1

A simple example after looking at the documentation:

from threading import Thread
import time

def main_loop():
    mythread = LedThread()
    mythread.start()

    time.sleep(20) # execute while loop for 20 seconds
    mythread.stop()

class LedThread(Thread):

    def __init__(self):
        super(LedThread, self).__init__()
        self._keepgoing = True

    def run(self):
        while (self._keepgoing):
            print 'Blink'
            time.sleep(0.5)


    def stop(self):
        self._keepgoing = False

main_loop()

Ideally, you should be using the target parameter from threading.Thread.__init__, because it allows you to push your function into the thread. Ryan Schuster's example is the more robust of the two, though I hope this one can help you understand what threads are by using only the basics necessary to run one.

Aaron3468
  • 1,734
  • 16
  • 29
  • +1 This code works just fine on my PiBorg & Raspberry Pi 3 wheeled robot. I can switch on the blinking light, keep moving, and switch off blinking. – Oleksiy Muzalyev Mar 25 '17 at 09:28