#/ # # Note: Requires a build of micropython with ESPNOW support, which I got from # https://github.com/glenn20/micropython-espnow-images import binascii import esp import esp32 import json import machine import math import network import random import _thread import time import tm1637 from machine import Pin, PWM COMM_TIMEOUT = 5 # Prune neighbor if not heard from in 10 seconds MIN_DELAY = 2000 # Minimum time for a random start MAX_DELAY = 5000 # Maximum time for a random start PIN_NUMBERS = { "BUZZER": 18, "LED": [ 26, 25, 23, 22, 21 ], "DISPLAY": [ 33, 32 ], "COASTER": 27, "READY": 14, "MODE": 15 } PINS = { "BUZZER": Pin(PIN_NUMBERS["BUZZER"], Pin.OUT), "LED": [ Pin(PIN_NUMBERS['LED'][0], Pin.OUT, value=0), Pin(PIN_NUMBERS['LED'][1], Pin.OUT, value=0), Pin(PIN_NUMBERS['LED'][2], Pin.OUT, value=0), Pin(PIN_NUMBERS['LED'][3], Pin.OUT, value=0), Pin(PIN_NUMBERS['LED'][4], Pin.OUT, value=0) ], "COASTER": Pin(PIN_NUMBERS['COASTER'], Pin.IN, Pin.PULL_UP), "READY": Pin(PIN_NUMBERS['READY'], Pin.IN, Pin.PULL_UP), "MODE": Pin(PIN_NUMBERS['MODE'], Pin.IN, Pin.PULL_UP) } display = tm1637.TM1637(clk=Pin(PIN_NUMBERS["DISPLAY"][0]), dio=Pin(PIN_NUMBERS["DISPLAY"][1])) wlan = network.WLAN(network.STA_IF) BROADCAST = b'\xff\xff\xff\xff\xff\xff' msgqueue = [] neighbors = {} neighborlist = [] abort = False winner = False songfinished = False def initialize(): print("Here's what I know about myself:") print('') print(f'\tMAC Address: {wlan.config('mac')}') print('') print(f'\tFrequency: {machine.freq()}') print(f'\tFlash Size: {esp.flash_size()}') print(f'\tTemperature: {esp32.raw_temperature()}') print('') print('Current Switch status:') print(f'\tCoaster: { get_switch("COASTER") }') print(f'\t Ready: { get_switch("READY") }') print(f'\t Mode: { get_switch("MODE") }') print('') for i in range(5): print(f'\tTesting LED {i}') led(i, True) beep() time.sleep(0.25) led(i, False) def led(pin, value): PINS["LED"][pin].value(value) def get_switch(name): return not PINS[name].value() def am_ready(): buttons_ready = get_switch("COASTER") and get_switch("READY") if buttons_ready is False: return False for k, n in neighbors.items(): if n['self'] is True: continue if n['neighborlist'] != neighborlist: return False return True ####################### # Buzzer Functions def tone(freq, delay): beeper = PWM(PINS["BUZZER"], freq=freq, duty=512) time.sleep_ms(delay) beeper.deinit() def buzz(): tone(freq=300, delay=800) def beep(): tone(freq=2000, delay=200) def test_buzzer(): for freq in range(0, 5500, 100): tone(freq=freq, delay=50) def play_song(): global songfinished frequencies = { 'c': 262, 'd': 294, 'e': 330, 'f': 349, 'g': 392, 'a': 440, 'b': 494, 'C': 523 } songs = { 'song1': { 'tempo': 118, 'notes': [ ['c', 1], ['d', 1], ['f', 1], ['d', 1], ['a', 1], [' ', 1], ['a', 4], ['g', 4], [' ', 2], ['c', 1], ['d', 1], ['f', 1], ['d', 1], ['g', 1], [' ', 1], ['g', 4], ['f', 4], [' ', 2], ] }, 'jingle_bells': { 'tempo': 118, 'notes': [ ['e', 1], ['e', 1], ['e', 2], ['e', 1], ['e', 1], ['e', 2], ['e', 1], ['g', 1], ['c', 1], ['d', 1], ['e', 4], [' ', 1], ['f', 1], ['f', 1], ['f', 1], ['f', 1], ['f', 1], ['e', 1], ['e', 1], ['e', 1], ['e', 1], ['d', 1], ['d', 1], ['e', 1], ['d', 2], ['g', 2] ] } } #s = random.choice(list(songs.keys())) if random.randint(0, 10) == 10: # one in 10 chance, for now s = 'jingle_bells' else: s = 'song1' print(f'Randomly chose song { s }') for note in songs[s]['notes']: duration = note[1] * songs[s]['tempo'] if note[0] == ' ': #print(f'Rest for { duration }') time.sleep_ms(duration) else: #print(f'Note { note[0] } at { frequencies[note[0]] } for { duration }') tone(frequencies[note[0]], duration) time.sleep_ms(math.floor(songs[s]['tempo'] / 10)) songfinished = True #int songLength2 = 26; #char notes2[] = "eeeeeeegcde fffffeeeeddedg"; #int beats2[] = { 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2}; #const int tempo2 = 130; ####################### # Display Functions def display_counts(): nr = count_ready() nc = count_neighbors() digits = [ 0, 0, 0, 0 ] if nr > 19 or nc > 19: display.show('OVER') return if nr < 10: digits[0] = display.encode_digit(nr) else: digits[0] = display.encode_digit(0x01) digits[1] = display.encode_digit(nr % 10) if nc < 10: digits[3] = display.encode_digit(nc) else: digits[2] = display.encode_digit(0x01) digits[3] = display.encode_digit(nc % 10) display.write(digits) ####################### # ESPNow Functions def initialize_network(): global wlan global espnow global neighbors neighbors[mac_to_hex(wlan.config('mac'))] = { 'self': True } # we're our own neighbor neighborlist.append(mac_to_hex(wlan.config('mac'))) print('Activating WLAN') wlan.active(True) print('Activating ESPNow') espnow = esp.espnow.ESPNow() espnow.init() espnow.config(on_recv=espnow_recv_handler) espnow.add_peer(BROADCAST) def espnow_recv_handler(e): # We should add this to a processing queue global msgqueue while(e.poll()): data = e.irecv(0) #neighbor = binascii.hexlify(data[0]).decode('ascii') msg = data[1].decode() msgqueue.append({ 'neighbor': data[0], 'msg': msg }) #print(f'Received from { neighbor }: {msg}') def process_queue(): global msgqueue global neighbors global neighborlist global abort while(len(msgqueue)>0): item = msgqueue.pop(0) mac = mac_to_hex(item['neighbor']) process_neighbor(mac) try: neighbor_json = json.loads(item["msg"]) except: print(f'ERROR: Could not decode message from { mac_to_hex(item["neighbor"]) }: { item["msg"] }') msg_type = neighbor_json.get('type', None) if msg_type is None: print(f'ERROR: No message type from { mac_to_hex(item["neighbor"]) }: { item["msg"] }') return False if msg_type == 'GOGOGO': print(f"I'm a peon, and master (or somebody--I don't check) said TIME TO GO!") return True # time to gO! if msg_type == 'ABORT': print(f'Neighbor { mac } disqualified.') abort = True return False if msg_type == 'FINISH': print(f'Neighbor { mac } finished.') neighbors[mac]['finished'] = True neighbors[mac]['time'] = neighbor_json['time'] return False if msg_type == 'beacon': neighbors[mac]['neighborlist'] = list(neighbor_json['neighborlist']) neighbors[mac]['mode'] = neighbor_json['mode'] # Only neighbors in the same mode as us can be ready if neighbors[mac]['mode'] != get_switch('MODE'): neighbors[mac]['ready'] = False else: neighbors[mac]['ready'] = neighbor_json['ready'] doc = json.dumps(neighbor_json) print(f'Received from { mac_to_hex(item["neighbor"]) } : { doc }') return False # Not time to go def process_neighbor(mac): if not(mac in neighbors.keys()): print(f'NEW NEIGHBOR: { mac }') neighbors[mac] = { 'self': False, 'lastseen': time.time(), 'mode': get_switch('MODE'), # Good a guess as any 'ready': False } neighborlist.append(mac) neighborlist.sort() neighbors[mac]['lastseen'] = time.time() def prune_neighbors(): for n in list(neighborlist): if neighbors[n]['self'] is True: continue if time.time() - neighbors[n]['lastseen'] > COMM_TIMEOUT: print(f'LOST NEIGHBOR: { n }') neighborlist.remove(n) neighbors.pop(n) def mac_to_hex(mac): return binascii.hexlify(mac).decode('ascii') def am_master(): # We're the master if we're the first one on the list return neighbors[neighborlist[0]]['self'] last_beacon = time.time() def broadcast_beacon(): global last_beacon if time.time() - last_beacon < 1: return False last_beacon = time.time() json_beacon = { 'type': 'beacon', 'neighborlist': neighborlist, 'mode': get_switch("MODE"), 'ready': am_ready() } beacon = json.dumps(json_beacon, separators=(',', ':')) #, 'neighbors': neighbors } if(len(beacon)) > 250: display.show('MANY') else: print(f'Sending JSON Broadcast ({len(beacon)} bytes): {beacon}') espnow.send(BROADCAST, beacon, False) return True def count_neighbors(): return len(neighborlist) def count_ready(): count = 0 for m, i in neighbors.items(): if i['self'] and am_ready(): count += 1 elif i.get('ready', False): count += 1 # For testing with only 1: #if count == 1: # return 2 return count ##### Sub-Ready Loops def call_regularly(): broadcast_beacon() time_to_go = process_queue() prune_neighbors() def get_ready_loop(should_display_counts): print(f'***** Stage 1: Get Ready Loop') time_to_go = False while time_to_go is False: call_regularly() if should_display_counts or get_switch('READY'): # we only display counts on the first run or once ready # has been pressed. On other runs # we wnat the time and/or disqualification to show should_display_counts = True display_counts() if(get_switch('COASTER')): led(4, 1) else: led(4, 0) # Time to go? if am_master() and count_neighbors() == count_ready(): print(f"I'm the master and it's TIME TO GO!") espnow.send(BROADCAST, json.dumps({ 'type': 'GOGOGO' }), False) time_to_go = True #time.sleep(1.0) def sleep_with_coaster_checks(ms): ''' Returns True if there was a disqualifying event ''' start = time.ticks_ms() while(time.ticks_diff(time.ticks_ms(), start) < ms): if not get_switch('COASTER'): return True return False def light_countdown(): ''' Countdown the lights, returns True if there was a disqualifying event ''' print(f'Mode is Drag Race. Running lights...') # Fix for early disqulaification if not get_switch('COASTER'): print('DISQUALIFIED') return True for i in range(3): print(f'\t GO LED {i}') led(i, True) beep() if(sleep_with_coaster_checks(500)): print('DISQUALIFIED') return True led(i, False) call_regularly() led(3, 1) # Turn on green led(4, 0) # turn off red return False def random_countdown(): delay = random.randint(MIN_DELAY, MAX_DELAY) print(f'Mode is random delay, waiting { delay } ms') start_ticks = time.ticks_ms() led(0, 1) led(1, 1) led(2, 1) led(3, 0) led(4, 1) while( time.ticks_diff(time.ticks_ms(), start_ticks) < delay ): call_regularly() if not get_switch('COASTER'): print('DISQUALIFIED') return True led(0, 0) led(1, 0) led(2, 0) led(3, 1) led(4, 0) return False def handle_disqualification(): global abort espnow.send(BROADCAST, json.dumps({ 'type': 'ABORT' }), False) for i in range(5): led(4, 1) display.show('FAUL') buzz() call_regularly() led(4, 0) display.show(' ') time.sleep(0.5) call_regularly() led(4,1) abort = True def is_everybody_finished(): for mac, state in neighbors.items(): if state['self'] is True: continue if state['finished'] is not True: return False return True def did_we_win(time_diff): for mac, state in neighbors.items(): if state['self'] is True: continue if state['time'] < time_diff: # somebody else did better return False return True def time_to_go_loop(): global abort global winner global songfinished display.show(" GO ") beep(); beep(); beep() # Reset State of the Game abort = False winner = False for mac, state in neighbors.items(): state['finished'] = False state['time'] = 0 time.sleep(1.0) call_regularly() if(get_switch('MODE')): disqualified = light_countdown() else: disqualified = random_countdown() if disqualified: handle_disqualification() led(0, 1) led(1, 1) led(2, 1) led(3, 0) led(4, 1) return elif abort: # somebody else disqualified led(0, 1) led(1, 0) led(2, 0) led(3, 0) led(4, 1) beep(); beep(); beep(); return # Soembody disqualified call_regularly() # Start go tone: beeper = PWM(PINS["BUZZER"], freq=2000, duty=512) start_time = time.ticks_ms() done = False coaster_lifted = False time_diff = 0 seconds = 0 ms = 0 while not done: call_regularly() time_diff = time.ticks_diff(time.ticks_ms(), start_time) if time_diff > 1000: beeper.deinit() seconds = int(time_diff / 1000) ms = int( (time_diff % 1000) / 10 ) #print(f' Time is {seconds}.{ms}') display.numbers(seconds, ms) # Can't win until coaster's been lifted if not get_switch('COASTER'): coaster_lifted = True if coaster_lifted and get_switch('COASTER'): # We've finished, and may or may not have won done = True led(3,0) # turn off the green light when finished espnow.send(BROADCAST, json.dumps({ 'type': 'FINISH', 'time': time_diff }), False) while not is_everybody_finished(): call_regularly() # Determine winner # If winner, blink a few times and play a song # If loser, play a sadder song if did_we_win(time_diff): songfinished = False songthread = _thread.start_new_thread(play_song, []) while not songfinished: # thread will change end condition display.show(' ') time.sleep_ms(200) display.numbers(seconds, ms) time.sleep_ms(200) call_regularly() else: buzz() ########################################################################## ########################################################################## ########################################################################## if __name__ == "__main__": initialize_network() initialize() should_display_counts = True while True: get_ready_loop(should_display_counts) time_to_go_loop() #test_buzzer() should_display_counts = False #while True: # ################################################################### # # Loop code goes inside the loop here, this is called repeatedly: # # ################################################################### # print(i) # i = 1 # time.sleep(1.0) # Delay for 1 second.