I am now struggling since weeks on one Problem. I have a code for a very simple Alarm Device. It receives TCP Notifications from my Axis Cameras and plays a sound and also lights up my Hella Rotary LED Signal Lamps. For this purpose I took a raspberry with a touchscreen and built it into a housing with a relay-output via GPIO.
I also have some Kivy buttons that should enable/disable the alarms. A kivy button is executing Functions ; the functions set a variable. BUT the variable is ONLY being changed IN the class with the Kivy app but not outside.
I also made a seperate Variant where the complete code is in one class - also not working.
The variable power
is never beeing changed and always keeps its value 1
in function do_pseudo_blinken
. Please - has somebody an idea? Thank you!
Variant 2 - Everything in one class MainApp
:
#! /usr/bin/env python2
# Beedo v110 8.4.2016
# mit Sound!
import sys
import time
import datetime
import socket
from multiprocessing import Process, Queue
#from kivy.core.window import Window
from kivy.lang import Builder
from kivy.app import App
#import var
import kivy
import os
#--------------------------------------------
#import RPi.GPIO as GPIO # import GPIO library - auskommentiert fuer Tests auf Windows
class MainApp(App):
global power
power = 1
global timestamp
def build(self):
return Builder.load_file("beedo_111_test.kv")
#---------------------------------------------------
# Kivy Callback functions
#---------------------------------------------------
def callback_power_0(self):
global power
power = 0
print "Power =", power
def callback_power_1(self):
global power
power = 1
print "Power =", power
#------------------------------------------------
def callback_terrasse_0(self):
terrasse=0
print "Terrasse =", terrasse
def callback_terrasse_1(self):
terrasse=1
print "Terrasse =", terrasse
#------------------------------------------------
def callback_umseck_0(self):
umseck=0
print "umseck =", umseck
def callback_umseck_1(self):
umseck=1
print "umseck =", umseck
#------------------------------------------------
def callback_eingang_0(self):
eingang=0
print "eingang =", eingang
def callback_eingang_1(self):
eingang=1
print "eingang =", eingang
#------------------------------------------------
def callback_strasse_0(self):
strasse=0
print "strasse =", strasse
def callback_strasse_1(self):
strasse=1
print "strasse =", strasse
#------------------------------------------------
def callback_pieplang_0(self):
pieplang=0
print "pieplang =", pieplang
def callback_pieplang_1(self):
pieplang=1
print "pieplang =", pieplang
#------------------------------------------------
#===============================================================================
def timestamp():
fmt = "%H:%M:%S (%d.%m.%Y)" # 14:16:23 (24.03.2016)
#return time.strftime(fmt, time.gmtime(time.time())) #hat falsche Zeit ausgegeben
return time.strftime(fmt, time.localtime(time.time()))
#-------------------------------------------------------------------------------
def socket_server(self, host, port, event_queue, num):
global zeit
#localtime = time.asctime( time.localtime(time.time()) )
print "socketserver startet ..", port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((host, port))
except socket.error as msg:
print("Binding to the port %d failed." % port)
print("Error Code: " + str(msg[0]) + " Message " + msg[1])
sys.exit()
s.listen(10)
print "listening on Port", port
while True:
conn, addr = s.accept()
event = (addr[0], timestamp(), num)
event_queue.put(event)
conn.close()
#-------------------------------------------------------------------------------
def do_blinken (): #laesst Rundumlicht und LED aufleuchten, und gibt kurzen Piepston
GPIO.output(23, True)
GPIO.setup(24, GPIO.OUT)
GPIO.output(24, True)
GPIO.output(25, True)
time.sleep(0.508)
GPIO.output(25, False)
time.sleep(4)
GPIO.output(24, False)
GPIO.setup(24, GPIO.IN)
GPIO.output(23, False)
return
#-------------------------------------------------------------------------------
def do_pseudo_blinken (self): #Pseudo-Blinkfunktion auf (Windows)
print "power =", power
print "blinken beginnt"
time.sleep(2)
print "blinken endet"
return
#-------------------------------------------------------------------------------
def do_sound (self, n): #spielt sound ab der in "IP_NAME" definiert ist
print "spiele sound ab fuer", n
#os.system("omxplayer -o local --no-osd --vol -1200 %s >/dev/null" %(n))
print "sound abgespielt fuer ", n
return
#-------------------------------------------------------------------------------
def event_handler(self, event_queue, ip_name):
print ("handler startet..")
#GPIO.cleanup() #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setwarnings(False) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setmode(GPIO.BCM) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setup(23, GPIO.OUT) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setup(24, GPIO.IN) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setup(25, GPIO.OUT) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
print ("handler gestartet")
while True:
try:
event = event_queue.get(timeout=10)
camera, n = ip_name[event[0]][event[2]-1]
#---------------------------------------------------
print("%s ---> %s" % (camera, event[1]))
blinken1 = Process (target=MainApp().do_pseudo_blinken, args=()) #ruft Funktion fuer Beedo-blinken auf (RAspberry)
sound1 = Process (target=MainApp().do_sound, args=(n,))
#blinken1 = Process (target=do_pseudo_blinken, args=()) #ruft Pseudo-Blinkfunktion auf (Windows)
blinken1.start()
sound1.start()
time.sleep(0.6)
sound1.terminate()
#print MainApp.power
#---------------------------------------------------
except:
pass
#===============================================================================
if __name__ == "__main__":
#Window.size = (800, 480)
#------------------- config data ---------------
HOST = "0.0.0.0"
PORT1 = 47120
PORT2 = 47121
IP_NAME = {
"192.168.1.202" : [("Umseck ", "umseck.mp3")],
"192.168.1.203" : [("Eingang ", "eingang4.mp3"), ("Strasse", "strasse.mp3")],
"192.168.1.204" : [("Terrasse", "terrasse.mp3")],
"192.168.1.100" : [("Test ", "umseck.mp3"), ("Test2 ", "terrasse.mp3")],
"192.168.1.200" : [("Test ", "eingang2.mp3"), ("Test2 ", "strasse.mp3")],
"192.168.1.201" : ["CCU2"],
"127.0.0.1" : ["localhost"]
}
#-----------------------------------------------
EVENTS = Queue()
#-----------------------------------------------
server1 = Process (target=MainApp().socket_server, args=(HOST, PORT1, EVENTS, 1))
server2 = Process (target=MainApp().socket_server, args=(HOST, PORT2, EVENTS, 2))
handler = Process (target=MainApp().event_handler, args=(EVENTS, IP_NAME))
server1.start()
server2.start()
handler.start()
MainApp().run()
server1.join()
server2.join()
handler.join()
Variant 1 (original Code) - All function "outside" of the Kivy Class MainApp
#! /usr/bin/env python2
# Beedo v110 8.4.2016
# mit Sound!
import sys
import time
import socket
from multiprocessing import Process, Queue
#from kivy.core.window import Window
from kivy.lang import Builder
from kivy.app import App
#import var
import kivy
import os
#--------------------------------------------
#import RPi.GPIO as GPIO # import GPIO library - auskommentiert fuer Tests auf Windows
#===============================================================================
def timestamp():
fmt = "%H:%M:%S (%d.%m.%Y)" # 14:16:23 (24.03.2016)
#return time.strftime(fmt, time.gmtime(time.time())) #hat falsche Zeit ausgegeben
return time.strftime(fmt, time.localtime(time.time()))
#-------------------------------------------------------------------------------
def socket_server(host, port, event_queue, num):
print "socketserver startet ..", port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((host, port))
except socket.error as msg:
print("Binding to the port %d failed." % port)
print("Error Code: " + str(msg[0]) + " Message " + msg[1])
sys.exit()
s.listen(10)
print "listening on Port", port
while True:
conn, addr = s.accept()
event = (addr[0], timestamp(), num)
event_queue.put(event)
conn.close()
#-------------------------------------------------------------------------------
def do_blinken (): #laesst Rundumlicht und LED aufleuchten, und gibt kurzen Piepston
GPIO.output(23, True)
GPIO.setup(24, GPIO.OUT)
GPIO.output(24, True)
GPIO.output(25, True)
time.sleep(0.508)
GPIO.output(25, False)
time.sleep(4)
GPIO.output(24, False)
GPIO.setup(24, GPIO.IN)
GPIO.output(23, False)
return
#-------------------------------------------------------------------------------
def do_pseudo_blinken (): #Pseudo-Blinkfunktion auf (Windows)
print "power =", MainApp.power
print "blinken beginnt"
time.sleep(5)
print "blinken endet"
return
#-------------------------------------------------------------------------------
def do_sound (n): #spielt sound ab der in "IP_NAME" definiert ist
print "spiele sound ab fuer", n
#os.system("omxplayer -o local --no-osd --vol -1200 %s >/dev/null" %(n))
print "sound abgespielt fuer ", n
return
#-------------------------------------------------------------------------------
def event_handler(event_queue, ip_name):
print ("handler startet..")
#GPIO.cleanup() #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setwarnings(False) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setmode(GPIO.BCM) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setup(23, GPIO.OUT) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setup(24, GPIO.IN) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
#GPIO.setup(25, GPIO.OUT) #setzt generelle GPIO Parameter - auskommentiert fuer Tests auf Windows
print ("handler gestartet")
while True:
try:
event = event_queue.get(timeout=10)
camera, n = ip_name[event[0]][event[2]-1]
print("%s ---> %s" % (camera, event[1]))
#---------------------------------------------------
blinken1 = Process (target=do_pseudo_blinken, args=()) #ruft Funktion fuer Beedo-blinken auf (RAspberry)
sound1 = Process (target=do_sound, args=(n,))
#blinken1 = Process (target=do_pseudo_blinken, args=()) #ruft Pseudo-Blinkfunktion auf (Windows)
blinken1.start()
sound1.start()
time.sleep(0.6)
sound1.terminate()
print MainApp.power
#---------------------------------------------------
except:
pass
#===============================================================================
class MainApp(App):
power = 0
def build(self):
return Builder.load_file("beedo_111.kv")
#---------------------------------------------------
# Kivy Callback functions
#---------------------------------------------------
def callback_power_0(self):
power = 0
print "Power =", power
def callback_power_1(self):
power = 1
print "Power =", power
#------------------------------------------------
def callback_terrasse_0(self):
terrasse=0
print "Terrasse =", terrasse
def callback_terrasse_1(self):
terrasse=1
print "Terrasse =", terrasse
#------------------------------------------------
def callback_umseck_0(self):
umseck=0
print "umseck =", umseck
def callback_umseck_1(self):
umseck=1
print "umseck =", umseck
#------------------------------------------------
def callback_eingang_0(self):
eingang=0
print "eingang =", eingang
def callback_eingang_1(self):
eingang=1
print "eingang =", eingang
#------------------------------------------------
def callback_strasse_0(self):
strasse=0
print "strasse =", strasse
def callback_strasse_1(self):
strasse=1
print "strasse =", strasse
#------------------------------------------------
def callback_pieplang_0(self):
pieplang=0
print "pieplang =", pieplang
def callback_pieplang_1(self):
pieplang=1
print "pieplang =", pieplang
#------------------------------------------------
if __name__ == "__main__":
#Window.size = (800, 480)
#------------------- config data ---------------
HOST = "0.0.0.0"
PORT1 = 47120
PORT2 = 47121
IP_NAME = {
"192.168.1.202" : [("Umseck ", "umseck.mp3")],
"192.168.1.203" : [("Eingang ", "eingang4.mp3"), ("Strasse", "strasse.mp3")],
"192.168.1.204" : [("Terrasse", "terrasse.mp3")],
"192.168.1.100" : [("Test ", "umseck.mp3"), ("Test2 ", "terrasse.mp3")],
"192.168.1.200" : [("Test ", "eingang2.mp3"), ("Test2 ", "strasse.mp3")],
"192.168.1.201" : ["CCU2"],
"127.0.0.1" : ["localhost"]
}
#-----------------------------------------------
EVENTS = Queue()
#-----------------------------------------------
server1 = Process (target=socket_server, args=(HOST, PORT1, EVENTS, 1))
server2 = Process (target=socket_server, args=(HOST, PORT2, EVENTS, 2))
handler = Process (target=event_handler, args=(EVENTS, IP_NAME))
server1.start()
server2.start()
handler.start()
MainApp().run()
server1.join()
server2.join()
handler.join()
Kivy Lang File:
#:kivy 1.8.0
#:import ListItemButton kivy.uix.listview.ListItemButton
#:import ListAdapter kivy.adapters.listadapter.ListAdapter
#:import Rectangle kivy.graphics.vertex_instructions.Rectangle
<ListItemButton>:
selected_color: 0, 0, 1, 1
deselected_color: 0, 0, 0, 1
GridLayout:
spacing: 7
padding: 7
cols: 2
rows: 1
canvas:
Color:
rgba: 0.4, 0.8, 0, 1
Rectangle:
pos: self.pos
size: self.size
Label
text: "Labelein"
GridLayout:
size_hint_x: 0.6
spacing: 1
padding: 1
cols: 1
rows: 6
ToggleButton:
id: button1
background_normal: "button4.png"
background_down: "button7.png"
text: "AUS" if self.state == 'normal' else 'AN'
bold: True
on_state:
if self.state == 'normal': app.callback_power_0()
else: app.callback_power_1()
border: 0,0,0,0
ToggleButton:
id: button2
background_normal: "button4.png"
background_down: "button7.png"
text: "Terrasse" if self.state == 'normal' else 'Terrasse'
bold: True
on_state:
if self.state == 'normal': app.callback_terrasse_0()
else: app.callback_terrasse_1()
border: 0,0,0,0
ToggleButton:
id: button3
background_normal: "button4.png"
background_down: "button7.png"
text: "Umseck" if self.state == 'normal' else 'Umseck'
bold: True
on_state:
if self.state == 'normal': app.callback_umseck_0()
else: app.callback_umseck_1()
border: 0,0,0,0
ToggleButton:
id: button4
background_normal: "button4.png"
background_down: "button7.png"
text: "Eingang" if self.state == 'normal' else 'Eingang'
bold: True
on_state:
if self.state == 'normal': app.callback_eingang_0()
else: app.callback_eingang_1()
border: 0,0,0,0
ToggleButton:
id: button5
background_normal: "button4.png"
background_down: "button7.png"
text: "Straße" if self.state == 'normal' else 'Straße'
bold: True
on_state:
if self.state == 'normal': app.callback_strasse_0()
else: app.callback_strasse_1()
border: 0,0,0,0
ToggleButton:
id: button6
background_normal: "button4.png"
background_down: "button7.png"
text: "Piep kurz" if self.state == 'normal' else 'Piep lang'
bold: True
on_state:
if self.state == 'normal': app.callback_pieplang_0()
else: app.callback_pieplang_1()
border: 0,0,0,0
SOLUTION
Problem can be solved by using multiprocessing.manager
. Another way - thats how I did it, is to use shard memory for the variable by using n.value
:
if __name__ == "__main__":
power = Value('b', 1)
....
handler = Process (target=event_handler, args=(power, ... #handing over to process
Kivy Class is using *.value to set variables in shared memory
class MainApp(App):
def build(self):
return Builder.load_file("beedo_111_manager.kv")
def callback_power_0(self):
power.value = 0
print "Power =", power.value
def callback_power_1(self):
power.value = 1
print "Power =", power.value
....