Core Concepts¶
Example with the core concepts and usage of this library.
examples/python/ping_pong.py¶
1# SPDX-FileCopyrightText: Copyright (c) 2024 Aaron Silinskas for Mindwidgets
2#
3# SPDX-License-Identifier: MIT
4"""Simple example that ping-pongs between two States every 2 seconds.
5
6The example shows:
7- How to use the States class pattern to keep track of valid States
8 (see PingPongStates)
9- How to change between States (see PingState.update and
10 PongState.update)
11- How to track elapsed time within a State (see thing.time_active usage)
12- How to observe State changes (see LoggingObserver). State change
13 observers are typically only used for logging purposes. Code external
14 to the Thing should not have logic that references the internal
15 States of a Thing.
16"""
17
18import time
19from state_of_things import State, Thing, ThingObserver
20
21
class PingPongStates:
    """Namespace for the shared State instances of a Ping Pong Thing.

    Declaring the attributes here gives editors something to type-check
    and auto-complete against. The trade-off is that these are only
    annotations: each State must be instantiated and assigned to this
    class after its State class has been defined (see the comments
    marked 'NOTE:' further down).
    """

    # populated once PingState has been defined
    ping: State
    # populated once PongState has been defined
    pong: State
30
31
class PingPongThing(Thing):
    """A Thing that bounces back and forth between Ping and Pong."""

    def __init__(self):
        """Create the Thing with Ping as its initial State."""
        initial_state = PingPongStates.ping
        super().__init__(initial_state)
38
39
class PingState(State):
    """Announces "Ping!" on entry and hands over to Pong after 2 seconds."""

    def enter(self, thing: PingPongThing):
        """Log that the Ping State has been entered."""
        print("Ping!")

    def update(self, thing: PingPongThing) -> State:
        """Pick the State to use after this update.

        Returns:
            State: the Pong State once the Thing has been active in Ping
            for more than 2 seconds, otherwise this State.
        """
        ping_is_over = thing.time_active > 2
        return PingPongStates.pong if ping_is_over else self


# NOTE: every State needs this step - create one instance and store it
# in the States class so that other States can refer to it.
PingPongStates.ping = PingState()
57
58
class PongState(State):
    """Announces "Pong!" on entry and hands back to Ping after 2 seconds."""

    def enter(self, thing: PingPongThing):
        """Log that the Pong State has been entered."""
        print("Pong!")

    def update(self, thing: PingPongThing) -> State:
        """Pick the State to use after this update.

        Returns:
            State: the Ping State once the Thing has been active in Pong
            for more than 2 seconds, otherwise this State.
        """
        pong_is_over = thing.time_active > 2
        return PingPongStates.ping if pong_is_over else self


# NOTE: every State needs this step - create one instance and store it
# in the States class so that other States can refer to it.
PingPongStates.pong = PongState()
76
77
class LoggingObserver(ThingObserver):
    """Observer that reports every State change on standard output."""

    def state_changed(self, thing: Thing, old_state: State, new_state: State):
        """Print the name of the Thing and of the States it moved between."""
        message = (
            f"State of {thing.name} changed from {old_state.name} to {new_state.name}"
        )
        print(message)
85
86
def main():
    """Run a Ping Pong Thing with a logging observer for 10.5 seconds."""
    ping_pong = PingPongThing()

    # attach the observer so State changes get printed
    ping_pong.observers.attach(LoggingObserver())

    # 10.5 seconds rather than 10: the extra half second leaves room to
    # end on a Pong
    deadline = time.monotonic() + 10.5
    while True:
        if time.monotonic() >= deadline:
            break
        ping_pong.update()


if __name__ == "__main__":
    main()
Alarm¶
examples/python/alarm.py¶
1# SPDX-FileCopyrightText: Copyright (c) 2024 Aaron Silinskas for Mindwidgets
2#
3# SPDX-License-Identifier: MIT
4"""Example of an Alarm that waits for a while, loops between the
5alarm going off and snoozing a number of times, and then exits
6the application.
7
8The example shows:
9- How to pass configuration into a Thing (see AlarmThing.__init__).
10- How to pass data between States (see AlarmThing.snooze_count).
11- How to branch into multiple States (see SnoozeState).
12- How to terminate an application with States (see AlarmThing.finished).
13"""
14
15from state_of_things import State, Thing
16
17
class AlarmStates:
    """Namespace for the shared State instances of the Alarm.

    Each attribute is assigned after the corresponding State class has
    been defined.
    """

    waiting: State
    triggered: State
    snooze: State
    finished: State
25
26
class AlarmThing(Thing):
    """Alarm that waits, then alternates between sounding and snoozing.

    The timing configuration is supplied at construction and exposed
    through read-only properties; only the running snooze count has a
    setter, so that States can update it.
    """

    def __init__(
        self,
        seconds_until_alarm: float,
        alarm_seconds: float,
        snooze_seconds: float,
        snoozes: int,
    ):
        """Configure a new Alarm that starts in the waiting State.

        Args:
            seconds_until_alarm (float): number of seconds to wait before
                triggering the alarm
            alarm_seconds (float): number of seconds to sound the alarm
            snooze_seconds (float): number of seconds to snooze before
                triggering the alarm again
            snoozes (int): number of times the alarm should snooze
        """
        super().__init__(AlarmStates.waiting)

        # running counter, changed by the States while the alarm runs
        self.__snooze_count = 0

        # fixed configuration
        self.__snoozes = snoozes
        self.__snooze_seconds = snooze_seconds
        self.__alarm_seconds = alarm_seconds
        self.__seconds_until_alarm = seconds_until_alarm

    @property
    def seconds_until_alarm(self) -> float:
        """Configured delay, in seconds, before the alarm triggers."""
        return self.__seconds_until_alarm

    @property
    def alarm_seconds(self) -> float:
        """Configured duration, in seconds, that the alarm sounds."""
        return self.__alarm_seconds

    @property
    def snooze_seconds(self) -> float:
        """Configured duration, in seconds, of a single snooze."""
        return self.__snooze_seconds

    @property
    def snoozes(self) -> int:
        """Configured number of times the alarm should snooze."""
        return self.__snoozes

    @property
    def snooze_count(self) -> int:
        """Number of times the snooze State has been entered so far."""
        return self.__snooze_count

    @snooze_count.setter
    def snooze_count(self, count: int):
        self.__snooze_count = count

    @property
    def finished(self) -> bool:
        """True once the alarm is in the finished State."""
        active_state = self.current_state
        return active_state == AlarmStates.finished
88
89
class WaitingState(State):
    """Idles until the Alarm has been active long enough to trigger."""

    def enter(self, thing: AlarmThing):
        """Announce the wait and start a fresh snooze count."""
        print(f"Waiting for {thing.seconds_until_alarm} seconds")
        # a new wait means a new alarm cycle, so no snoozes have happened
        thing.snooze_count = 0

    def update(self, thing: AlarmThing) -> State:
        """Keep waiting, or move to triggered once the delay has elapsed."""
        if thing.time_active < thing.seconds_until_alarm:
            # not there yet, stay in this State
            return self

        return AlarmStates.triggered


AlarmStates.waiting = WaitingState()
110
111
class TriggeredState(State):
    """Sounds the alarm with a whoop per second, then moves to snooze."""

    def enter(self, thing: AlarmThing):
        """Announce the alarm and reset the whoop bookkeeping."""
        print("Alarm has triggered!")

        # triggered_last_whoop is not declared on AlarmThing; it is
        # attached here on the fly. Only use this pattern with care, and
        # only for attributes that a single State reads and writes.
        thing.triggered_last_whoop = 0

    def update(self, thing: AlarmThing) -> State:
        """Whoop when a new second has started; snooze when time is up."""
        if thing.time_active > thing.alarm_seconds:
            # the alarm has sounded long enough
            return AlarmStates.snooze

        # int() truncates the active time down to whole seconds
        whole_seconds_active = int(thing.time_active)
        if whole_seconds_active > thing.triggered_last_whoop:
            print("Whoop!")

            # the next whoop is due one second later
            thing.triggered_last_whoop += 1

        return self


AlarmStates.triggered = TriggeredState()
140
141
class SnoozeState(State):
    """Pauses the alarm, then branches to either triggered or finished.

    The snooze count is incremented on entry and checked on every update,
    so once the count exceeds the configured number of snoozes the very
    next update moves to finished without waiting.
    """

    def enter(self, thing: AlarmThing):
        """Announce the snooze and count it."""
        print(f"Alarm snoozing for {thing.snooze_seconds}")

        # one more snooze has started
        thing.snooze_count += 1

    def update(self, thing: AlarmThing) -> State:
        """Choose between finishing, re-triggering, and waiting longer."""
        if thing.snooze_count > thing.snoozes:
            # snoozed enough times, the alarm is done
            next_state = AlarmStates.finished
        elif thing.time_active > thing.snooze_seconds:
            # snoozed long enough, sound the alarm again
            next_state = AlarmStates.triggered
        else:
            # still snoozing
            next_state = self

        return next_state


AlarmStates.snooze = SnoozeState()
165
166
class FinishedState(State):
    """Terminal State of the alarm.

    Only enter is overridden; updates fall through to the inherited
    State.update (presumably it keeps the current State - confirm against
    the library).
    """

    def enter(self, thing: Thing):
        """Announce that the alarm is done."""
        print("Finished!")


AlarmStates.finished = FinishedState()
175
176
def main():
    """Configure an Alarm and keep updating it until it reports finished."""
    thing = AlarmThing(
        seconds_until_alarm=5,
        alarm_seconds=4,
        snooze_seconds=3,
        snoozes=2,
    )

    # the finished property is how the States signal the loop to end
    while True:
        if thing.finished:
            break
        thing.update()


if __name__ == "__main__":
    main()
Traffic Lights¶
examples/python/traffic_lights.py¶
1# SPDX-FileCopyrightText: Copyright (c) 2024 Aaron Silinskas for Mindwidgets
2#
3# SPDX-License-Identifier: MIT
4"""Traffic light example that supports caution mode and external
5control.
6
7This example shows:
8- How to externally control a Thing, but still allow the Thing to
9 manage its State and transitions (see TrafficLightThing.should_go
10 and TrafficLightThing.caution_mode).
11- How to provide custom strongly typed observers (see
12 TrafficLightObserver).
13"""
14
15import time
16from state_of_things import State, Thing, ThingObserver
17
18
class TrafficLightStates:
    """Namespace for the shared State instances of the traffic light.

    Each attribute is assigned after the corresponding State class has
    been defined.
    """

    stop: State
    go: State
    slow: State
    caution: State
24
25
class TrafficLightThing(Thing):
    """Traffic light steered from outside through requests.

    Callers only record what they want (stop, go, caution mode); the
    States read those requests and decide the actual transitions.
    Caution mode overrides the other States.
    """

    def __init__(self, slow_seconds: float):
        """Construct a new traffic light that starts in the stop State.

        Args:
            slow_seconds (float): number of seconds to remain in slow
                State before changing to stop.
        """
        super().__init__(TrafficLightStates.stop)
        self.__caution_mode = False
        self.__should_go = False
        self.__slow_seconds = slow_seconds

    def stop(self):
        """Record a request to change to stop."""
        self.__should_go = False
        print("Stop requested.")

    def go(self):
        """Record a request to change to go."""
        self.__should_go = True
        print(f"Go requested (caution mode={self.caution_mode}).")

    @property
    def slow_seconds(self) -> float:
        """Number of seconds to remain in slow before stop."""
        return self.__slow_seconds

    @property
    def should_go(self) -> bool:
        """True if go was the most recent request (caution mode still
        takes precedence in the States)."""
        return self.__should_go

    @property
    def caution_mode(self) -> bool:
        """True while caution mode is enabled."""
        return self.__caution_mode

    @caution_mode.setter
    def caution_mode(self, enabled: bool):
        """Enable or disable caution mode and log the new setting."""
        self.__caution_mode = enabled
        print(f"Caution mode set to {enabled}")
74
75
class StopState(State):
    """This State will transition to go if requested, or to caution if
    caution mode is enabled."""

    def enter(self, thing: TrafficLightThing):
        """Notify observers that the light changed to stop."""
        # observers declare changed_to_stop(thing), so hand them the Thing
        # rather than this State instance
        thing.observers.notify("changed_to_stop", thing)

    def update(self, thing: TrafficLightThing) -> State:
        """Return the next State; caution mode wins over a go request."""
        if thing.caution_mode:
            # caution mode is enabled, change to caution State
            return TrafficLightStates.caution

        if thing.should_go:
            # go State requested
            return TrafficLightStates.go

        return self


TrafficLightStates.stop = StopState()
97
98
class GoState(State):
    """This State will transition to slow when stop is requested, or to
    caution if caution mode is enabled."""

    def enter(self, thing: TrafficLightThing):
        """Notify observers that the light changed to go."""
        # observers declare changed_to_go(thing), so hand them the Thing
        # rather than this State instance
        thing.observers.notify("changed_to_go", thing)

    def update(self, thing: TrafficLightThing) -> State:
        """Return the next State; caution mode wins over a stop request."""
        if thing.caution_mode:
            # caution mode is enabled, change to caution State
            return TrafficLightStates.caution

        if not thing.should_go:
            # stop State requested, but need to slow first
            return TrafficLightStates.slow

        return self


TrafficLightStates.go = GoState()
120
121
class SlowState(State):
    """After a number of seconds, this State will transition to go or
    stop based on the last traffic light request. If caution mode is
    enabled, it will immediately switch to caution State."""

    def enter(self, thing: TrafficLightThing):
        """Notify observers that the light changed to slow."""
        # observers declare changed_to_slow(thing), so hand them the Thing
        # rather than this State instance
        thing.observers.notify("changed_to_slow", thing)

    def update(self, thing: TrafficLightThing) -> State:
        """Return the next State once the slow period is over."""
        if thing.caution_mode:
            # caution mode is enabled, immediately change to caution
            # State
            return TrafficLightStates.caution

        if thing.time_active < thing.slow_seconds:
            # always wait in this State before proceeding to go or stop
            return self

        if thing.should_go:
            # go requested, change to it
            return TrafficLightStates.go

        # default to stop State after slow
        return TrafficLightStates.stop


TrafficLightStates.slow = SlowState()
150
151
class CautionState(State):
    """Stays in this State as long as caution mode is enabled, sending a
    blink notification every second. If caution mode is disabled, it
    always changes to stop State even if go was requested."""

    def enter(self, thing: TrafficLightThing):
        """Notify observers of caution and reset the blink bookkeeping."""
        # observers declare changed_to_caution(thing), so hand them the
        # Thing rather than this State instance
        thing.observers.notify("changed_to_caution", thing)

        # local attributes only used within this State
        thing.caution_next_blink = 0
        thing.caution_blink_count = 0

    def update(self, thing: TrafficLightThing) -> State:
        """Blink once per second while caution mode stays enabled."""
        if not thing.caution_mode:
            # caution mode is disabled; stop() clears any go request made
            # in the meantime so that the light stays on stop
            thing.stop()
            return TrafficLightStates.stop

        # read the clock once so the check and the schedule agree
        now = time.monotonic()
        if now > thing.caution_next_blink:
            # it is time to blink again, and set next blink to 1 second
            # in the future.
            thing.caution_next_blink = now + 1
            thing.caution_blink_count += 1

            # observers declare caution_blink(thing, blink_count)
            thing.observers.notify("caution_blink", thing, thing.caution_blink_count)

        return self


TrafficLightStates.caution = CautionState()
184
185
class TrafficLightObserver(ThingObserver):
    """Strongly typed observer for Traffic Light Thing events.

    Every handler is a no-op here; subclasses override the events they
    are interested in.
    """

    def changed_to_go(self, thing: TrafficLightThing):
        """Handle the traffic light changing to go."""

    def changed_to_stop(self, thing: TrafficLightThing):
        """Handle the traffic light changing to stop."""

    def changed_to_slow(self, thing: TrafficLightThing):
        """Handle the traffic light changing to slow."""

    def changed_to_caution(self, thing: TrafficLightThing):
        """Handle the traffic light changing to caution mode."""

    def caution_blink(self, thing: TrafficLightThing, blink_count: int):
        """Handle a blink of the traffic light while in caution mode."""
209
210
class TrafficLoggingObserver(TrafficLightObserver):
    """Observer that prints a line for every traffic light event."""

    def changed_to_go(self, thing: TrafficLightThing):
        """Report the go event."""
        print("-> Green Light")

    def changed_to_stop(self, thing: TrafficLightThing):
        """Report the stop event."""
        print("-> Red Light")

    def changed_to_slow(self, thing: TrafficLightThing):
        """Report the slow event."""
        print("-> Yellow Light")

    def changed_to_caution(self, thing: TrafficLightThing):
        """Report the switch to caution."""
        print("-> Caution!")

    def caution_blink(self, thing: TrafficLightThing, blink_count: int):
        """Report a caution blink together with its running count."""
        print(f"-> Blink Yellow (count {blink_count})")
228
229
def update_thing(thing: Thing, seconds_to_update: float):
    """Repeatedly update a Thing, blocking until the time is used up.

    Args:
        thing (Thing): the Thing to update.
        seconds_to_update (float): number of seconds to update.
    """
    deadline = time.monotonic() + seconds_to_update
    while True:
        if time.monotonic() >= deadline:
            return
        thing.update()
240
241
def main():
    """Drive a traffic light through a scripted sequence of requests."""
    traffic_light = TrafficLightThing(slow_seconds=3)
    traffic_light.observers.attach(TrafficLoggingObserver())

    def run_for(seconds: float):
        """Keep updating the traffic light for the given number of seconds."""
        update_thing(traffic_light, seconds_to_update=seconds)

    # a first update changes to the initial State of stop
    traffic_light.update()

    # ask for go
    traffic_light.go()
    run_for(2)

    # ask for stop
    traffic_light.stop()
    run_for(5)

    # switch caution mode on
    traffic_light.caution_mode = True
    run_for(2)

    # the request is recorded, but the light stays in caution
    traffic_light.go()
    run_for(2)

    # switching caution mode off puts the light on stop, not on the go
    # that was requested above
    traffic_light.caution_mode = False
    run_for(2)

    # ask for go again
    traffic_light.go()
    run_for(2)

    # caution mode also breaks out of the go State
    traffic_light.caution_mode = True
    run_for(3)

    print("Done.")


if __name__ == "__main__":
    main()
Stand-alone Observers¶
examples/python/echo_observer.py¶
1# SPDX-FileCopyrightText: Copyright (c) 2024 Aaron Silinskas for Mindwidgets
2#
3# SPDX-License-Identifier: MIT
4"""Example using Observers as a stand-alone feature.
5
6The example shows:
7- How to direct events to a set of observers
8"""
9
10from state_of_things import Observers
11
12
class InputObserver:
    """Base observer for input events; subclasses override input_received."""

    def input_received(self, input_string: str):
        """Handle one received input string; this base version ignores it."""
18
19
class LoggingObserver(InputObserver):
    """Observer that echoes every input it observes."""

    def input_received(self, input_string: str):
        """Print the observed input string."""
        line = f"Observed: {input_string}"
        print(line)
25
26
class WordCountObserver(InputObserver):
    """Observer that reports how many words each input contains."""

    def input_received(self, input_string: str):
        """Print the number of whitespace-separated words in the input."""
        words = input_string.split()
        print(f"Word Count: {len(words)}")
32
33
def main():
    """Read lines of input and forward each one to the observers.

    The loop ends after the line "exit" has been forwarded.
    """
    # every observer attached here receives every input event
    observers = Observers()
    for observer in (LoggingObserver(), WordCountObserver()):
        observers.attach(observer)

    # tell the user how to use and how to leave the program
    print(
        'Input text and press enter to cause an event. Input "exit" to terminate this application.'
    )

    while True:
        # show the prompt and block until a line has been entered
        print("> ", end="")
        input_string = input()

        # observers see every line, including the final "exit"
        observers.notify("input_received", input_string)

        if input_string == "exit":
            break


if __name__ == "__main__":
    main()