A brief explanation
Before you start coding, I recommend reading this good article, which explains the difference between the publish-subscribe pattern and the observer pattern. If you don't want to read the article, I think this graphic will be enough.
As we can see in the graphic, a subscriber (a callback function) is attached to an event. Once that step is complete, a publisher adds a new event to the event channel, which fires all the subscribers attached to that event.
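Before wrapping it in a class, the two steps described above can be sketched with a plain dictionary. This is only a rough illustration: the names `channel`, `on_greeting`, `received` and the `"greeting"` event are made up for the example and are not part of the class we will write.

```python
from collections import defaultdict

# Hypothetical stand-in for the event channel: event name -> list of callbacks.
channel = defaultdict(list)
received = []


def on_greeting(message):
    # Subscriber: just records the payload it was called with.
    received.append(message)


# Step 1: attach the subscriber to an event.
channel["greeting"].append(on_greeting)

# Step 2: the publisher sends an event and every attached subscriber fires.
for subscriber in channel["greeting"]:
    subscriber("Hello")

print(received)  # ['Hello']
```

The publisher never refers to `on_greeting` directly. It only knows the event name, and that is the decoupling the pattern is about.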
Our class
    class EventChannel:
        def __init__(self):
            # Maps each event name to the list of callbacks subscribed to it.
            self.subscribers = {}

        def unsubscribe(self, event, callback):
            # Ignore empty or unknown events instead of raising KeyError.
            if event is not None and event != "" and event in self.subscribers:
                self.subscribers[event] = [
                    cb for cb in self.subscribers[event] if cb is not callback
                ]

        def subscribe(self, event, callback):
            if not callable(callback):
                raise ValueError("callback must be callable")
            if event is None or event == "":
                raise ValueError("event can't be empty")
            if event not in self.subscribers:
                self.subscribers[event] = [callback]
            else:
                self.subscribers[event].append(callback)

        def publish(self, event, args):
            # Call every subscriber of this event with the published payload.
            if event in self.subscribers:
                for callback in self.subscribers[event]:
                    callback(args)
Testing
    event_channel = EventChannel()
    callback = lambda x: print(x)

    event_channel.subscribe("myevent", callback)
    event_channel.publish("myevent", "Hello, world!")  # out: "Hello, world!"

    event_channel.unsubscribe("myevent", callback)
    event_channel.publish("myevent", "Hello, world!")  # No output
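It may also be worth checking a few more cases: several subscribers on the same event, publishing to an event nobody listens to, and subscribing something that is not callable. The sketch below repeats a condensed copy of the class so that it runs by itself. The copy guards `unsubscribe` with a plain membership check, which is my assumption about the intended behaviour, and the helper names `first`, `second`, `log` and `rejected` are invented for the example.

```python
class EventChannel:
    # Condensed copy of the article's class so this snippet runs on its own.
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event, callback):
        if not callable(callback):
            raise ValueError("callback must be callable")
        if event is None or event == "":
            raise ValueError("event can't be empty")
        self.subscribers.setdefault(event, []).append(callback)

    def unsubscribe(self, event, callback):
        # Assumption: unknown events are ignored rather than raising.
        if event in self.subscribers:
            self.subscribers[event] = [
                cb for cb in self.subscribers[event] if cb is not callback
            ]

    def publish(self, event, args):
        for callback in self.subscribers.get(event, []):
            callback(args)


channel = EventChannel()
log = []


def first(payload):
    log.append(("first", payload))


def second(payload):
    log.append(("second", payload))


channel.subscribe("myevent", first)
channel.subscribe("myevent", second)
channel.publish("myevent", "ping")     # both subscribers fire, in subscription order

channel.unsubscribe("myevent", first)
channel.publish("myevent", "pong")     # only `second` is still attached

channel.publish("unknown", "ignored")  # no subscribers, so nothing happens
channel.unsubscribe("unknown", first)  # unknown event, silently ignored

try:
    channel.subscribe("myevent", "not a function")
    rejected = False
except ValueError:
    rejected = True

print(log)
print(rejected)
```

Collecting calls in a list instead of printing makes the behaviour easy to assert on if you later move this into a real test suite.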
Links
Which things would you improve? Thank you for reading, and write any thoughts below :D