123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615 |
- #/
- #
- # Note: Requires a build of micropython with ESPNOW support, which I got from
- # https://github.com/glenn20/micropython-espnow-images
- import binascii
- import esp
- import esp32
- import json
- import machine
- import math
- import network
- import random
- import _thread
- import time
- import tm1637
- from machine import Pin, PWM
# Seconds a neighbor may stay silent before it is pruned from the tables.
COMM_TIMEOUT = 5

# Lower and upper bound, in milliseconds, of the random start delay.
MIN_DELAY = 2000
MAX_DELAY = 5000
# GPIO numbers of every attached peripheral, keyed by role.
# "LED" lists the five indicator LEDs in index order; "DISPLAY" is
# [clock, data] for the TM1637 module.
PIN_NUMBERS = dict(
    BUZZER=18,
    LED=[26, 25, 23, 22, 21],
    DISPLAY=[33, 32],
    COASTER=27,
    READY=14,
    MODE=15,
)
# Configured Pin objects for each role in PIN_NUMBERS.
# LEDs are outputs that start low; the three switches are inputs with the
# internal pull-up enabled.
PINS = {
    "BUZZER": Pin(PIN_NUMBERS["BUZZER"], Pin.OUT),
    "LED": [Pin(gpio, Pin.OUT, value=0) for gpio in PIN_NUMBERS['LED']],
    "COASTER": Pin(PIN_NUMBERS['COASTER'], Pin.IN, Pin.PULL_UP),
    "READY": Pin(PIN_NUMBERS['READY'], Pin.IN, Pin.PULL_UP),
    "MODE": Pin(PIN_NUMBERS['MODE'], Pin.IN, Pin.PULL_UP),
}
# Four-digit TM1637 display: DISPLAY[0] is the clock pin, DISPLAY[1] the data pin.
display = tm1637.TM1637(
    clk=Pin(PIN_NUMBERS["DISPLAY"][0]),
    dio=Pin(PIN_NUMBERS["DISPLAY"][1]),
)

# Station-mode WLAN interface; its MAC address identifies this device.
wlan = network.WLAN(network.STA_IF)

# All-ones MAC address used as the ESP-NOW broadcast peer.
BROADCAST = b'\xff' * 6

# Messages appended by espnow_recv_handler() and drained by process_queue().
msgqueue = []

# Per-device state keyed by hex MAC string (this device included).
neighbors = {}

# Hex MAC strings of all known devices (this device included).
neighborlist = []

# Race state flags shared between the game loops.
abort = False          # set once any racer has been disqualified
winner = False         # reset at the start of every race
songfinished = False   # set by play_song() when the tune has ended
def initialize():
    """Print a hardware self-report and run a power-on LED/buzzer test.

    Reports the MAC address, CPU frequency, flash size, raw temperature and
    the current state of the three switches, then lights each LED in turn
    with a beep.
    """
    # Fetch the MAC outside the f-string: reusing the f-string's own quote
    # character inside a replacement field is a syntax error on MicroPython
    # (and on CPython before 3.12).
    mac = wlan.config('mac')

    print("Here's what I know about myself:")
    print('')
    print(f'\tMAC Address: {mac}')
    print('')
    print(f'\tFrequency: {machine.freq()}')
    print(f'\tFlash Size: {esp.flash_size()}')
    print(f'\tTemperature: {esp32.raw_temperature()}')
    print('')
    print('Current Switch status:')
    print(f'\tCoaster: { get_switch("COASTER") }')
    print(f'\t Ready: { get_switch("READY") }')
    print(f'\t Mode: { get_switch("MODE") }')
    print('')

    # Walk every configured LED rather than assuming there are exactly five.
    for i in range(len(PINS["LED"])):
        print(f'\tTesting LED {i}')
        led(i, True)
        beep()
        time.sleep(0.25)
        led(i, False)
def led(pin, value):
    """Set LED number *pin* (an index into PINS["LED"]) to *value*."""
    target = PINS["LED"][pin]
    target.value(value)
def get_switch(name):
    """Return True when the input pin called *name* reads low.

    The switch inputs are configured with pull-ups, so a low level is the
    active state.
    """
    level = PINS[name].value()
    return not level
def am_ready():
    """Return True when this device is ready to race.

    Ready means both the COASTER and READY switches are active and every
    other known neighbor has reported a neighbor list equal to ours.
    """
    if not (get_switch("COASTER") and get_switch("READY")):
        return False

    for n in neighbors.values():
        if n['self'] is True:
            continue
        # A neighbor first heard through a non-beacon message has no
        # 'neighborlist' entry yet; treat that as "not in agreement"
        # instead of raising KeyError.
        if n.get('neighborlist') != neighborlist:
            return False
    return True
- #######################
- # Buzzer Functions
def tone(freq, delay):
    """Sound the buzzer at *freq* Hz for *delay* milliseconds (blocking)."""
    beeper = PWM(PINS["BUZZER"], freq=freq, duty=512)
    try:
        time.sleep_ms(delay)
    finally:
        # Always release the PWM channel, even if the sleep is interrupted,
        # so the buzzer cannot be left sounding.
        beeper.deinit()
def buzz():
    """Sound a low 300 Hz tone for 800 ms."""
    tone(300, 800)
def beep():
    """Sound a short 2000 Hz tone for 200 ms."""
    tone(2000, 200)
def test_buzzer():
    """Sweep the buzzer from 100 Hz to 5400 Hz in 100 Hz steps, 50 ms each."""
    # Start at 100 Hz: 0 Hz is not a usable PWM frequency.
    for freq in range(100, 5500, 100):
        tone(freq=freq, delay=50)
def play_song():
    """Play a randomly chosen tune on the buzzer, then set ``songfinished``.

    Each note is a ``[name, beats]`` pair; a name of ``' '`` is a rest.
    A note lasts ``beats * tempo`` milliseconds and is followed by a short
    gap of ``tempo / 10`` milliseconds.

    ``songfinished`` is set to True when playback ends, including when it
    ends because of an exception, so a caller polling the flag cannot wait
    forever.
    """
    global songfinished

    frequencies = {
        'c': 262,
        'd': 294,
        'e': 330,
        'f': 349,
        'g': 392,
        'a': 440,
        'b': 494,
        'C': 523,
    }
    songs = {
        'song1': {
            'tempo': 118,
            'notes': [
                ['c', 1], ['d', 1], ['f', 1], ['d', 1], ['a', 1], [' ', 1],
                ['a', 4], ['g', 4], [' ', 2],
                ['c', 1], ['d', 1], ['f', 1], ['d', 1], ['g', 1], [' ', 1],
                ['g', 4], ['f', 4], [' ', 2],
            ],
        },
        'jingle_bells': {
            'tempo': 118,
            'notes': [
                ['e', 1], ['e', 1], ['e', 2],
                ['e', 1], ['e', 1], ['e', 2],
                ['e', 1], ['g', 1], ['c', 1], ['d', 1], ['e', 4], [' ', 1],
                ['f', 1], ['f', 1], ['f', 1], ['f', 1], ['f', 1],
                ['e', 1], ['e', 1], ['e', 1], ['e', 1],
                ['d', 1], ['d', 1], ['e', 1], ['d', 2], ['g', 2],
            ],
        },
    }

    try:
        # randint is inclusive at both ends, so this is a 1-in-11 chance.
        if random.randint(0, 10) == 10:
            s = 'jingle_bells'
        else:
            s = 'song1'
        print(f'Randomly chose song { s }')

        tempo = songs[s]['tempo']
        gap = math.floor(tempo / 10)
        for name, beats in songs[s]['notes']:
            duration = beats * tempo
            if name == ' ':
                time.sleep_ms(duration)
            else:
                tone(frequencies[name], duration)
            # Brief silence so consecutive notes are distinguishable.
            time.sleep_ms(gap)
    finally:
        songfinished = True
- #int songLength2 = 26;
- #char notes2[] = "eeeeeeegcde fffffeeeeddedg";
- #int beats2[] = { 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2};
- #const int tempo2 = 130;
- #######################
- # Display Functions
def display_counts():
    """Show the ready count (left pair) and neighbor count (right pair).

    Counts above 19 cannot be rendered and show 'OVER' instead. A
    single-digit ready count goes in position 0 and a single-digit neighbor
    count in position 3; the unused position of each pair keeps segment
    value 0.
    """
    ready = count_ready()
    total = count_neighbors()

    if ready > 19 or total > 19:
        display.show('OVER')
        return

    segments = [0, 0, 0, 0]

    if ready >= 10:
        segments[0] = display.encode_digit(0x01)
        segments[1] = display.encode_digit(ready % 10)
    else:
        segments[0] = display.encode_digit(ready)

    if total >= 10:
        segments[2] = display.encode_digit(0x01)
        segments[3] = display.encode_digit(total % 10)
    else:
        segments[3] = display.encode_digit(total)

    display.write(segments)
- #######################
- # ESPNow Functions
def initialize_network():
    """Register this device as its own neighbor and bring up ESP-NOW.

    Creates the module-level ``espnow`` object, installs
    espnow_recv_handler() as its receive callback and adds the broadcast
    address as a peer.
    """
    global espnow

    # We are our own neighbor: record our MAC before anything else.
    own_mac = mac_to_hex(wlan.config('mac'))
    neighbors[own_mac] = {'self': True}
    neighborlist.append(own_mac)

    print('Activating WLAN')
    wlan.active(True)

    print('Activating ESPNow')
    espnow = esp.espnow.ESPNow()
    espnow.init()
    espnow.config(on_recv=espnow_recv_handler)
    espnow.add_peer(BROADCAST)
def espnow_recv_handler(e):
    """ESP-NOW receive callback: move every pending message into msgqueue.

    Each queued item is ``{'neighbor': <sender MAC bytes>, 'msg': <str>}``.
    Parsing is left to process_queue().
    """
    while e.poll():
        data = e.irecv(0)
        if not data or data[0] is None:
            # Nothing was actually available; stop draining.
            break
        # irecv() is documented to return callee-owned buffers that are
        # reused by the next call, so take a private copy of the sender MAC
        # before queueing it. Otherwise every queued item would end up
        # showing the most recent sender.
        sender = bytes(data[0])
        msg = data[1].decode()
        msgqueue.append({'neighbor': sender, 'msg': msg})
def process_queue():
    """Handle queued ESP-NOW messages and update neighbor state.

    Messages are taken from ``msgqueue`` in arrival order. Processing stops
    early (leaving later messages queued for the next call) after a GOGOGO,
    ABORT or FINISH message, or after a message with no usable type.

    Returns:
        True if a GOGOGO message was handled, otherwise False.
    """
    global abort

    while len(msgqueue) > 0:
        item = msgqueue.pop(0)
        mac = mac_to_hex(item['neighbor'])
        # Any traffic at all proves the sender is alive.
        process_neighbor(mac)

        try:
            neighbor_json = json.loads(item["msg"])
        except ValueError:
            print(f'ERROR: Could not decode message from { mac }: { item["msg"] }')
            # Skip this message; there is nothing to interpret.
            continue

        # Valid JSON that is not an object carries no message type.
        if isinstance(neighbor_json, dict):
            msg_type = neighbor_json.get('type', None)
        else:
            msg_type = None

        if msg_type is None:
            print(f'ERROR: No message type from { mac }: { item["msg"] }')
            return False

        if msg_type == 'GOGOGO':
            print(f"I'm a peon, and master (or somebody--I don't check) said TIME TO GO!")
            return True  # time to go

        if msg_type == 'ABORT':
            print(f'Neighbor { mac } disqualified.')
            abort = True
            return False

        if msg_type == 'FINISH':
            print(f'Neighbor { mac } finished.')
            neighbors[mac]['finished'] = True
            neighbors[mac]['time'] = neighbor_json['time']
            return False

        if msg_type == 'beacon':
            neighbor = neighbors[mac]
            neighbor['neighborlist'] = list(neighbor_json['neighborlist'])
            neighbor['mode'] = neighbor_json['mode']
            # Only neighbors in the same mode as us can be ready.
            if neighbor['mode'] != get_switch('MODE'):
                neighbor['ready'] = False
            else:
                neighbor['ready'] = neighbor_json['ready']

        doc = json.dumps(neighbor_json)
        print(f'Received from { mac } : { doc }')

    return False  # Not time to go
def process_neighbor(mac):
    """Record that *mac* was just heard from, registering it if unknown.

    A new neighbor is added to ``neighbors`` and ``neighborlist`` (which is
    kept sorted); in every case its 'lastseen' timestamp is refreshed.
    """
    if mac not in neighbors:
        print(f'NEW NEIGHBOR: { mac }')
        entry = {
            'self': False,
            'lastseen': time.time(),
            'mode': get_switch('MODE'),  # assume our own mode until told otherwise
            'ready': False,
        }
        neighbors[mac] = entry
        neighborlist.append(mac)
        neighborlist.sort()

    neighbors[mac]['lastseen'] = time.time()
def prune_neighbors():
    """Forget every neighbor not heard from for more than COMM_TIMEOUT seconds."""
    # Walk a snapshot because entries are removed during the scan.
    for mac in list(neighborlist):
        entry = neighbors[mac]
        if entry['self'] is True:
            continue  # never prune ourselves

        silent_for = time.time() - entry['lastseen']
        if silent_for > COMM_TIMEOUT:
            print(f'LOST NEIGHBOR: { mac }')
            neighborlist.remove(mac)
            neighbors.pop(mac)
def mac_to_hex(mac):
    """Return the bytes-like *mac* as a lowercase hexadecimal string."""
    hex_bytes = binascii.hexlify(mac)
    return hex_bytes.decode('ascii')
def am_master():
    """Return True when this device is the first entry of ``neighborlist``.

    The device at the head of the list acts as master.
    """
    first_mac = neighborlist[0]
    return neighbors[first_mac]['self']
# Time (time.time()) at which the most recent beacon was broadcast.
last_beacon = time.time()


def broadcast_beacon():
    """Broadcast this device's state, at most once per second.

    The beacon is a compact JSON object carrying our neighbor list, mode
    switch state and readiness. A beacon longer than 250 characters is not
    sent; the display shows 'MANY' instead.

    Returns:
        False when called less than a second after the previous beacon,
        True otherwise (whether or not the beacon was actually sent).
    """
    global last_beacon

    elapsed = time.time() - last_beacon
    if elapsed < 1:
        return False
    last_beacon = time.time()

    beacon = json.dumps(
        {
            'type': 'beacon',
            'neighborlist': neighborlist,
            'mode': get_switch("MODE"),
            'ready': am_ready(),
        },
        separators=(',', ':'),
    )

    if len(beacon) > 250:
        # Too many neighbors for the message to stay within 250 characters.
        display.show('MANY')
        return True

    print(f'Sending JSON Broadcast ({len(beacon)} bytes): {beacon}')
    espnow.send(BROADCAST, beacon, False)
    return True
def count_neighbors():
    """Return how many devices are known, this one included."""
    known = len(neighborlist)
    return known
def count_ready():
    """Return how many known devices are ready, this one included.

    Our own readiness comes from am_ready(); a neighbor's comes from the
    'ready' flag stored for it (absent counts as not ready).
    """
    ready_total = 0
    for entry in neighbors.values():
        if (entry['self'] and am_ready()) or entry.get('ready', False):
            ready_total += 1
    return ready_total
- ##### Sub-Ready Loops
def call_regularly():
    """Run the periodic housekeeping: beacon, message handling, pruning.

    Returns:
        The result of process_queue(): True when a GOGOGO message was
        handled during this call, otherwise False.
    """
    broadcast_beacon()
    time_to_go = process_queue()
    prune_neighbors()
    # Hand the result back so a non-master can react to GOGOGO; callers
    # that do not care can simply ignore it.
    return time_to_go


def get_ready_loop(should_display_counts):
    """Stage 1: wait until the race is started.

    The loop ends when this device is master and every known device is
    ready (it then broadcasts GOGOGO), or when a GOGOGO message arrives
    from another device.

    Args:
        should_display_counts: when true, show the ready/neighbor counts
            immediately. Otherwise the display is left alone (so the
            previous result stays visible) until READY is pressed.
    """
    print(f'***** Stage 1: Get Ready Loop')
    time_to_go = False
    while time_to_go is False:
        # Compare against True so that only an explicit "go" ends the loop.
        if call_regularly() is True:
            time_to_go = True

        if should_display_counts or get_switch('READY'):
            # Counts are shown on the first run or once READY has been
            # pressed; on later runs the previous time or disqualification
            # stays on the display until then.
            should_display_counts = True
            display_counts()

        # LED 4 mirrors the coaster switch.
        if get_switch('COASTER'):
            led(4, 1)
        else:
            led(4, 0)

        # Time to go?
        if am_master() and count_neighbors() == count_ready():
            print(f"I'm the master and it's TIME TO GO!")
            espnow.send(BROADCAST, json.dumps({ 'type': 'GOGOGO' }), False)
            time_to_go = True
def sleep_with_coaster_checks(ms):
    """Busy-wait for *ms* milliseconds while watching the coaster switch.

    Returns True (a disqualifying event) as soon as the COASTER switch is
    found inactive, or False if the full time passes without that.
    """
    began = time.ticks_ms()
    while True:
        if time.ticks_diff(time.ticks_ms(), began) >= ms:
            return False
        if not get_switch('COASTER'):
            return True
def light_countdown():
    """Run the three-light countdown.

    LEDs 0-2 are lit one after another, each with a beep and a 500 ms hold.
    Returns True if the coaster switch became inactive before or during the
    countdown (a disqualifying event). Otherwise turns the green LED on and
    the red LED off and returns False.
    """
    print(f'Mode is Drag Race. Running lights...')

    # Already off the coaster before the lights start: immediate foul.
    if not get_switch('COASTER'):
        print('DISQUALIFIED')
        return True

    for step in (0, 1, 2):
        print(f'\t GO LED {step}')
        led(step, True)
        beep()
        jumped = sleep_with_coaster_checks(500)
        if jumped:
            print('DISQUALIFIED')
            return True
        led(step, False)
        call_regularly()

    led(3, 1)  # green on
    led(4, 0)  # red off
    return False
def random_countdown():
    """Wait a random time between MIN_DELAY and MAX_DELAY ms before "go".

    During the wait every LED except number 3 is lit. Returns True if the
    coaster switch became inactive during the wait (a disqualifying event).
    Otherwise leaves only LED 3 lit and returns False.
    """
    delay = random.randint(MIN_DELAY, MAX_DELAY)
    print(f'Mode is random delay, waiting { delay } ms')
    start_ticks = time.ticks_ms()

    # Waiting pattern: everything on except LED 3.
    for index, state in enumerate((1, 1, 1, 0, 1)):
        led(index, state)

    while True:
        if time.ticks_diff(time.ticks_ms(), start_ticks) >= delay:
            break
        call_regularly()
        if not get_switch('COASTER'):
            print('DISQUALIFIED')
            return True

    # Go pattern: only LED 3 on.
    for index, state in enumerate((0, 0, 0, 1, 0)):
        led(index, state)
    return False
def handle_disqualification():
    """Announce our own disqualification and flash the foul indication.

    Broadcasts an ABORT message, then five times over shows 'FAUL' with
    LED 4 lit and a buzz, followed by a half-second pause with LED 4 off.
    Finishes with LED 4 lit and the ``abort`` flag set.
    """
    global abort

    espnow.send(BROADCAST, json.dumps({ 'type': 'ABORT' }), False)

    flashes_left = 5
    while flashes_left > 0:
        # Foul phase.
        led(4, 1)
        display.show('FAUL')
        buzz()
        call_regularly()
        # Pause phase.
        led(4, 0)
        display.show(' ')
        time.sleep(0.5)
        call_regularly()
        flashes_left -= 1

    led(4, 1)
    abort = True
def is_everybody_finished():
    """Return True when every other racer has reported finishing.

    Neighbors with no 'finished' entry are ignored: they were first heard
    from after the race state was reset, so they are not part of this race,
    and looking the key up directly would raise KeyError.
    """
    for state in neighbors.values():
        if state['self'] is True:
            continue
        if 'finished' not in state:
            continue
        if state['finished'] is not True:
            return False
    return True
def did_we_win(time_diff):
    """Return True when no other racer reported a time below *time_diff*.

    A tie counts as a win. Neighbors with no recorded 'time' (first heard
    from after the race state was reset) are ignored rather than raising
    KeyError.
    """
    for state in neighbors.values():
        if state['self'] is True:
            continue
        their_time = state.get('time')
        if their_time is None:
            continue
        if their_time < time_diff:
            # Somebody else did better.
            return False
    return True
def time_to_go_loop():
    """Stage 2: run one race from countdown to result.

    Sequence:
      1. Show " GO ", beep three times and reset the race state.
      2. Run the countdown selected by the MODE switch. On a foul (ours or
         a neighbor's) show the matching LED pattern and return.
      3. Sound the go tone and run the timer until the coaster switch has
         been released and pressed again.
      4. Broadcast our time, wait for the other racers, then celebrate
         (blinking time plus a song) or buzz.
    """
    global abort
    global winner
    global songfinished

    display.show(" GO ")
    for _ in range(3):
        beep()

    # Reset the state of the game.
    abort = False
    winner = False
    for state in neighbors.values():
        state['finished'] = False
        state['time'] = 0
    time.sleep(1.0)
    call_regularly()

    if get_switch('MODE'):
        disqualified = light_countdown()
    else:
        disqualified = random_countdown()

    if disqualified:
        handle_disqualification()
        led(0, 1)
        led(1, 1)
        led(2, 1)
        led(3, 0)
        led(4, 1)
        return
    elif abort:
        # Somebody else was disqualified.
        led(0, 1)
        led(1, 0)
        led(2, 0)
        led(3, 0)
        led(4, 1)
        for _ in range(3):
            beep()
        return
    call_regularly()

    # Start the go tone; it is stopped one second into the race.
    beeper = PWM(PINS["BUZZER"], freq=2000, duty=512)
    beeping = True
    start_time = time.ticks_ms()
    done = False
    coaster_lifted = False
    time_diff = 0
    seconds = 0
    ms = 0
    try:
        while not done:
            call_regularly()
            time_diff = time.ticks_diff(time.ticks_ms(), start_time)
            if beeping and time_diff > 1000:
                beeper.deinit()
                beeping = False
            seconds = int(time_diff / 1000)
            ms = int((time_diff % 1000) / 10)  # hundredths of a second
            display.numbers(seconds, ms)
            # Can't win until the coaster has been lifted.
            if not get_switch('COASTER'):
                coaster_lifted = True
            if coaster_lifted and get_switch('COASTER'):
                # We've finished, and may or may not have won.
                done = True
    finally:
        # A race shorter than one second (or an error) must not leave the
        # go tone sounding.
        if beeping:
            beeper.deinit()

    led(3, 0)  # turn off the green light when finished
    espnow.send(BROADCAST, json.dumps({ 'type': 'FINISH', 'time': time_diff }), False)
    while not is_everybody_finished():
        call_regularly()

    # Winner: blink the time while a song plays. Loser: buzz.
    if did_we_win(time_diff):
        songfinished = False
        _thread.start_new_thread(play_song, ())
        while not songfinished:
            # play_song() sets songfinished from its own thread.
            display.show(' ')
            time.sleep_ms(200)
            display.numbers(seconds, ms)
            time.sleep_ms(200)
            call_regularly()
    else:
        buzz()
- ##########################################################################
- ##########################################################################
- ##########################################################################
if __name__ == "__main__":
    # Bring up networking first, then report on and exercise the hardware.
    initialize_network()
    initialize()

    # Counts are shown straight away only before the first race; after that
    # the previous result stays on the display until READY is pressed.
    should_display_counts = True
    while True:
        get_ready_loop(should_display_counts)
        time_to_go_loop()
        should_display_counts = False
- #while True:
- # ###################################################################
- # # Loop code goes inside the loop here, this is called repeatedly: #
- # ###################################################################
- # print(i)
- # i = 1
- # time.sleep(1.0) # Delay for 1 second.
|