# 스위치 하나를 누르면 여러 개의 스위치에서 신호를 보냅니다.
from pythonosc.osc_server import AsyncIOOSCUDPServer
from pythonosc.udp_client import SimpleUDPClient
from pythonosc.dispatcher import Dispatcher
import RPi.GPIO as GPIO
import datetime
import asyncio
# import orjson
import time
# Header pins wired to the input switches (BOARD numbering, see initialize()).
SWITCHES = [13, 19, 29, 33, 40]
# Header pins driving the relay channels (BOARD numbering, see initialize()).
RELAY = [11, 12, 15, 16]

# Last known state of every switch (s0-s4) followed by every relay (r0-r3).
# Key order matters: the values are sent positionally in '/switch' messages.
STATUS = dict.fromkeys(
    ('s0', 's1', 's2', 's3', 's4', 'r0', 'r1', 'r2', 'r3'),
    0,
)

# Destination of the outgoing '/switch' OSC status messages.
SERVER_IP = '192.168.1.101'
SERVER_PORT = 1234
client = SimpleUDPClient(SERVER_IP, SERVER_PORT)
def relay_handler(addr, args):
    """Handle an OSC '/relay' message and publish the resulting state.

    Args:
        addr: OSC address the message arrived on.
        args: Relay command passed straight through to relay().
    """
    print(f"{addr}, {args}")
    relay(args)
    print(STATUS)
    # Report the complete switch + relay state, in STATUS key order.
    snapshot = list(STATUS.values())
    client.send_message('/switch', snapshot)
def default_handler(addr, *args):
    """Log any OSC message that has no dedicated handler.

    Args:
        addr: OSC address the message arrived on.
        *args: Arguments carried by the message; printed as a tuple.
    """
    line = f"{addr}, {args}"
    print(line)
def toggle(channel):
    """GPIO edge callback: store the switch's current level and publish it.

    Args:
        channel: Pin number that triggered the event; must be in SWITCHES.
    """
    # A HIGH reading is stored as 0, any other reading as 1.
    reads_high = GPIO.input(channel) == GPIO.HIGH
    key = f's{SWITCHES.index(channel)}'
    STATUS[key] = 0 if reads_high else 1
    print(STATUS)
    client.send_message('/switch', list(STATUS.values()))
def initialize():
    """Configure the GPIO pins and register the switch callbacks.

    Switch pins become pulled-up inputs, relay pins become outputs driven
    HIGH, and toggle() is attached to every switch pin.
    """
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(SWITCHES, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    GPIO.setup(RELAY, GPIO.OUT)
    GPIO.output(RELAY, GPIO.HIGH)  # turn it off
    for pin in SWITCHES:
        # toggle() records both levels (HIGH -> 0, otherwise 1), so it must be
        # called on both edges. With RISING only, the pin has already returned
        # to HIGH when the callback runs, and the LOW state was only ever seen
        # by accident through contact bounce.
        GPIO.add_event_detect(pin, GPIO.BOTH, callback=toggle, bouncetime=200)
def relay(args):
    """Switch one relay according to a '<name>_<action>' command string.

    The last character of <name> selects the relay index and <action> is
    '0' or '1'. Action '0' drives the pin HIGH and records 0; action '1'
    drives it LOW and records 1. Unknown relay indexes and unknown actions
    are ignored silently.

    Args:
        args: Command string such as 'r2_1'.

    Returns:
        None. A malformed command prints 'WRONG MESSAGE' and changes nothing.
    """
    try:
        target, action = args.split('_')
        # Parse the index inside the try so a non-digit or empty relay name
        # is reported as a wrong message instead of raising into the caller.
        index = int(target[-1])
    except (AttributeError, TypeError, ValueError, IndexError):
        print('WRONG MESSAGE')
        return

    key = f'r{index}'
    if key not in STATUS:
        return

    if action == '0':
        STATUS[key] = 0
        GPIO.output(RELAY[index], GPIO.HIGH)
    elif action == '1':
        STATUS[key] = 1
        GPIO.output(RELAY[index], GPIO.LOW)
async def loop():
    """Keep the program alive so the event loop can keep serving.

    Never returns on its own; it ends only when the task is cancelled.
    """
    while True:
        # Sleep for a real interval: sleep(0) reschedules immediately and
        # turns this into a busy loop that pegs a CPU core.
        await asyncio.sleep(1)
async def init_main():
    """Set up GPIO, start the OSC server on UDP port 1337 and run forever.

    '/relay' messages go to relay_handler and everything else to
    default_handler. On exit, GPIO is cleaned up and the server endpoint is
    closed if it was created.
    """
    # Stays None if start-up fails before the endpoint exists, so the cleanup
    # below cannot raise UnboundLocalError and mask the real error.
    transport = None
    try:
        print('Start Switch')
        initialize()
        dispatcher = Dispatcher()
        dispatcher.map('/relay', relay_handler)
        dispatcher.set_default_handler(default_handler)
        server = AsyncIOOSCUDPServer(
            ('0.0.0.0', 1337), dispatcher, asyncio.get_running_loop()
        )
        # Create datagram endpoint and start serving
        transport, protocol = await server.create_serve_endpoint()
        await loop()  # Enter main loop of program
    finally:
        GPIO.cleanup()
        print('Finishing')
        if transport is not None:
            transport.close()  # Clean up serve endpoint
# Script entry point: run init_main() on a new asyncio event loop until it exits.
asyncio.run(init_main())