The Observer pattern is used to create a relationship between two objects. The relationship is usually unidirectional, with the observer waiting for notices from the observed. The relationship is a loose one: the observed only needs to know of the observer's interest, and the observer only needs to know the set and ordering of the notices the observed will send. A downside of this looseness is that all notices pass through a single observer method for further dispatching.
package com.andrewgilmartin.common.observation;

import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
The Observation class is only used to group all the related interfaces and classes into one file. In a typical development environment each interface and class would be in its own file.
public class Observation {
The Observable is the object that is being watched. As changes are prepared and then made, the observable sends consent and information notices to the observers. The notices are normally specialized classes that hold some context about the change. For example, if the notice concerns the addition of new inventory to the warehouse then the notice's class could have a method for enumerating the new inventory.
    public interface Observable {
If it is necessary for the observable to have approval before making a change then a consent notification is sent to the observers. The observers are notified and any observer that opposes the change must return false. The current thread is used to notify the observers, so the observed must wait for the observers to respond.
        boolean consentNotification(Object notice);
An information notification is normally sent after a change. Since the change has already occurred the notices are typically sent asynchronously by a background thread. The order of the notices is preserved, however.
        void informationNotification(Object notice);
    }
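The warehouse example mentioned earlier can be sketched as a small notice class. This is only an illustration: the name InventoryAddedNotice, its constructor, and getItemIds() are assumptions and not part of the original code. The sketch makes the notice immutable because information notices are handed to a background thread and then to several observers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical notice class; the name and its methods are illustrative only.
final class InventoryAddedNotice {

    private final List<String> itemIds;

    InventoryAddedNotice(List<String> itemIds) {
        // Copy defensively so the notice cannot change after it is sent.
        this.itemIds = Collections.unmodifiableList(new ArrayList<String>(itemIds));
    }

    // Enumerates the inventory that was added by the change.
    List<String> getItemIds() {
        return itemIds;
    }
}

class InventoryAddedNoticeDemo {

    public static void main(String[] args) {
        List<String> added = new ArrayList<String>();
        added.add("sku-1");
        added.add("sku-2");
        InventoryAddedNotice notice = new InventoryAddedNotice(added);
        added.clear(); // later changes by the sender do not affect the notice
        for (String id : notice.getItemIds()) {
            System.out.println("added " + id);
        }
    }
}
```

An observable would pass an instance of such a class to consentNotification() or informationNotification().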
The observer is the object that is notified by the observable. There is no typed relationship beyond the Observable and Observer interfaces. As mentioned earlier, the notices are usually of specialized classes where each notice instance holds data relevant to the change.
    public interface Observer {

        boolean notice(Observable observable, Object notice);
    }
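Because every notice arrives through the single notice() method, an observer typically dispatches on the notice's class. The following is a minimal sketch of that dispatching. The interfaces are repeated so the sketch compiles on its own, and PriceChangeRequested, PriceChanged, and PriceWatcher are hypothetical names introduced only for illustration.

```java
// Minimal copies of the interfaces above so the sketch compiles on its own.
interface Observable {
    boolean consentNotification(Object notice);
    void informationNotification(Object notice);
}

interface Observer {
    boolean notice(Observable observable, Object notice);
}

// Hypothetical consent notice: sent before the price is changed.
final class PriceChangeRequested {
    final int newPrice;
    PriceChangeRequested(int newPrice) { this.newPrice = newPrice; }
}

// Hypothetical information notice: sent after the price was changed.
final class PriceChanged {
    final int price;
    PriceChanged(int price) { this.price = price; }
}

class PriceWatcher implements Observer {

    int lastPrice = -1;

    @Override
    public boolean notice(Observable observable, Object notice) {
        // Dispatch on the notice's class to a specialized handler.
        if (notice instanceof PriceChangeRequested) {
            return onRequested((PriceChangeRequested) notice);
        }
        if (notice instanceof PriceChanged) {
            onChanged((PriceChanged) notice);
            return true;
        }
        return true; // unknown notices are never opposed
    }

    // Consent handler: opposes negative prices.
    private boolean onRequested(PriceChangeRequested request) {
        return request.newPrice >= 0;
    }

    // Information handler: records the new price.
    private void onChanged(PriceChanged changed) {
        lastPrice = changed.price;
    }
}
```

Returning true for unrecognized notices keeps the observer from accidentally vetoing changes it knows nothing about.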
A registry is the means of establishing the relationship between the observable and the observer. This interface is distinct from Observable as it is sometimes useful to indirectly register an observer via, for example, a registrar.
    public interface Registry {
The order of the observers is undefined.
        Set<Observer> getObservers();
Adds the observer to the set of observers. Returns true if the observer was added.
        boolean addObserver(Observer observer);
Removes the observer from the set of observers. Returns true if the observer was among the observers and was removed.
        boolean deleteObserver(Observer observer);
    }
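One way to picture the indirect registration mentioned above is a registrar that looks up a registry by name on behalf of the observer. This is a sketch under assumptions: the Registrar class, its publish() and register() methods, the topic names, and SimpleRegistry are all hypothetical and do not appear in the original code. The interfaces are repeated so the sketch compiles on its own.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Minimal copies of the interfaces above so the sketch compiles on its own.
interface Observable {
    boolean consentNotification(Object notice);
    void informationNotification(Object notice);
}

interface Observer {
    boolean notice(Observable observable, Object notice);
}

interface Registry {
    Set<Observer> getObservers();
    boolean addObserver(Observer observer);
    boolean deleteObserver(Observer observer);
}

// Bare-bones Registry used only for this demonstration.
class SimpleRegistry implements Registry {
    private final Set<Observer> observers = new CopyOnWriteArraySet<Observer>();
    public Set<Observer> getObservers() { return observers; }
    public boolean addObserver(Observer observer) { return observers.add(observer); }
    public boolean deleteObserver(Observer observer) { return observers.remove(observer); }
}

// Hypothetical registrar: observers register by topic name and never
// need a direct reference to the observed object.
class Registrar {

    private final Map<String, Registry> registries = new ConcurrentHashMap<String, Registry>();

    // Makes a registry available under a topic name.
    void publish(String topic, Registry registry) {
        registries.put(topic, registry);
    }

    // Returns true if the topic exists and the observer was newly added.
    boolean register(String topic, Observer observer) {
        Registry registry = registries.get(topic);
        return registry != null && registry.addObserver(observer);
    }
}
```

Keeping Registry separate from Observable is what allows the registrar to hold only the registration half of the observed object.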
There is often very little difference between implementations of Observable and Registry and so this base implementation can be widely employed by any class that wants to be observed.
When extending this class make sure to document the set of notices, whether each is a consent or an information notice, and their ordering.
    public static class ObservableBase implements Observable, Registry {
The management of the set of observers needs to be thread-safe. The set is expected to be mostly stable over the life of the observed and so copy-on-write semantics is appropriate here.
        private final Set<Observer> observers = new CopyOnWriteArraySet<Observer>();
Information notification notices will be sent by a background thread. A blocking queue will be used to coordinate the passing of notices from the observed to this background thread.
        private final BlockingQueue<Object> informationNotices = new LinkedBlockingQueue<Object>();

        public ObservableBase() {
This implementation of the information notification background thread is quite simple and so uses an anonymous class for the implementation.
            Thread informationEventsDispatcher = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
Here the thread waits for a new notice on the queue and then sends it to each of the current observers.
                        for (;;) {
                            Object notice = informationNotices.take();
                            for (Observer observer : observers) {
                                observer.notice(ObservableBase.this, notice);
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        // empty
                    }
                }
            });
            informationEventsDispatcher.setDaemon(true);
            informationEventsDispatcher.start();
        }

        @Override
        public boolean consentNotification(Object notice) {
As mentioned earlier, consent notifications are performed on the observed's thread. As soon as any observer opposes the change the loop stops and the observed must reject the change.
            for (Observer observer : observers) {
                if (!observer.notice(this, notice)) {
                    return false;
                }
            }
            return true;
        }

        @Override
        public void informationNotification(Object notice) {
Pass along the notice to the background thread.
            informationNotices.add(notice);
        }

        @Override
        public Set<Observer> getObservers() {
            return observers;
        }

        @Override
        public boolean addObserver(Observer observer) {
            return observers.add(observer);
        }

        @Override
        public boolean deleteObserver(Observer observer) {
            return observers.remove(observer);
        }
    }
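A useful consequence of the copy-on-write set is that its iterators work on a snapshot, so an observer may remove itself while a notification pass is in progress without disturbing that pass. The following sketch shows this behaviour in isolation. It uses plain Runnable objects as stand-ins for observers, and the class name SnapshotIterationDemo and method notifyEach() are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

class SnapshotIterationDemo {

    // Runs every listener once and returns how many were run.
    static int notifyEach(Set<Runnable> listeners) {
        int notified = 0;
        for (Runnable listener : listeners) {
            listener.run();
            notified++;
        }
        return notified;
    }

    public static void main(String[] args) {
        final Set<Runnable> listeners = new CopyOnWriteArraySet<Runnable>();

        // This listener removes itself the first time it is run.
        listeners.add(new Runnable() {
            @Override
            public void run() {
                listeners.remove(this);
            }
        });

        // This listener stays registered.
        listeners.add(new Runnable() {
            @Override
            public void run() {
                // nothing to do
            }
        });

        // The first pass iterates over a snapshot taken before the removal.
        System.out.println(notifyEach(listeners)); // prints 2
        // The removal is visible once the pass has finished.
        System.out.println(listeners.size()); // prints 1
    }
}
```

With a plain HashSet the same removal during iteration would normally fail with a ConcurrentModificationException.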
Here is a small example of using the observation interfaces and classes.
    public static void main(String... args) throws Exception {

        class Notice {

            private int senderId;
            private int sequenceNumber;

            public Notice(int senderId, int sequenceNumber) {
                this.senderId = senderId;
                this.sequenceNumber = sequenceNumber;
            }

            public int getSenderId() {
                return senderId;
            }

            public int getSequenceNumber() {
                return sequenceNumber;
            }
        }
The Sender is an observed. All that it does is send a stream of notices consisting of sender-id and sequence-number pairs.
        class Sender extends ObservableBase implements Runnable {

            private int senderId;

            public Sender(int id) {
                this.senderId = id;
            }

            @Override
            public void run() {
                for (int sequenceNumber = 0;; sequenceNumber++) {
                    informationNotification(new Notice(senderId, sequenceNumber));
                    sleep(); // add some randomness to the processing.
                }
            }
        }
The Receiver is an observer. All that it does is print the notice's facts. In this example all notices are information notifications and so the notice() return value does not matter. However, as a matter of course, notice() should always return true unless the observer is sure of the consequences of opposing the change.
        class Receiver implements Observer {

            private int receiverId;

            public Receiver(int id) {
                this.receiverId = id;
            }

            @Override
            public boolean notice(Observable observable, Object notice) {
                if (notice instanceof Notice) {
                    Notice n = (Notice) notice;
                    System.out.printf("notice %d %d %d\n", receiverId, n.getSenderId(), n.getSequenceNumber());
                    sleep(); // add some randomness to the processing.
                }
                return true;
            }
        }
Create a few senders.
        Sender[] senders = new Sender[5];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Sender(i);
        }
Create a few receivers.
        Receiver[] receivers = new Receiver[3];
        for (int i = 0; i < receivers.length; i++) {
            receivers[i] = new Receiver(i);
        }
Have each receiver observe each sender.
        for (Receiver r : receivers) {
            for (Sender s : senders) {
                s.addObserver(r);
            }
        }
Start up the senders.
        for (Sender s : senders) {
            Thread t = new Thread(s);
            t.setDaemon(false);
            t.start();
        }
    }
Add some sleep of random duration to the current thread.
    static void sleep() {
        try {
            Thread.sleep(Math.round(Math.random() * 25));
        }
        catch (InterruptedException e) {
            // empty
        }
    }
}
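The example above only sends information notifications. A consent notification might be used as in the following sketch, which asks for approval, makes the change, and then informs the observers. To stay self-contained it does not use ObservableBase: the Account class is a simplified stand-in that informs its observers synchronously rather than through a background thread. Account, LimitGuard, WithdrawalRequested, and WithdrawalMade are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal copies of the interfaces above so the sketch compiles on its own.
interface Observable {
    boolean consentNotification(Object notice);
    void informationNotification(Object notice);
}

interface Observer {
    boolean notice(Observable observable, Object notice);
}

// Hypothetical consent notice, sent before the change.
final class WithdrawalRequested {
    final int amount;
    WithdrawalRequested(int amount) { this.amount = amount; }
}

// Hypothetical information notice, sent after the change.
final class WithdrawalMade {
    final int amount;
    WithdrawalMade(int amount) { this.amount = amount; }
}

// Simplified observed object; it notifies synchronously for brevity.
class Account implements Observable {

    private final List<Observer> observers = new ArrayList<Observer>();
    private int balance;

    Account(int balance) { this.balance = balance; }

    void addObserver(Observer observer) { observers.add(observer); }

    int getBalance() { return balance; }

    @Override
    public boolean consentNotification(Object notice) {
        for (Observer observer : observers) {
            if (!observer.notice(this, notice)) {
                return false; // one opposing observer is enough
            }
        }
        return true;
    }

    @Override
    public void informationNotification(Object notice) {
        for (Observer observer : observers) {
            observer.notice(this, notice);
        }
    }

    // Asks for consent, makes the change, then informs the observers.
    boolean withdraw(int amount) {
        if (!consentNotification(new WithdrawalRequested(amount))) {
            return false; // change rejected, nothing was modified
        }
        balance -= amount;
        informationNotification(new WithdrawalMade(amount));
        return true;
    }
}

// Opposes any withdrawal larger than its limit.
class LimitGuard implements Observer {

    private final int limit;

    LimitGuard(int limit) { this.limit = limit; }

    @Override
    public boolean notice(Observable observable, Object notice) {
        if (notice instanceof WithdrawalRequested) {
            return ((WithdrawalRequested) notice).amount <= limit;
        }
        return true; // never oppose notices this observer does not understand
    }
}
```

With a LimitGuard of 50 registered on an Account holding 100, withdraw(30) succeeds and withdraw(60) is rejected without changing the balance.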
To build and run this from the command line, first compile using
javac -d /tmp ./src/com/andrewgilmartin/common/observation/Observation
And then run using
java -classpath /tmp com.andrewgilmartin.common.observation.Observation
You can convince yourself that the notices are ordered by doing a stable sort of the output on the receiver id. For each receiver, the sequence numbers from any one sender then appear in ascending order.
java -classpath /tmp com.andrewgilmartin.common.observation.Observation | head -20 | sort -k 2 -n -s
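The same check can be made mechanically instead of by eye. The following is a minimal sketch, assuming the output lines have the "notice receiver sender sequence" form printed by the Receiver; the class name OrderCheck and method isOrdered() are hypothetical. It reads the captured lines from standard input and verifies that, for every receiver and sender pair, the sequence numbers only increase.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderCheck {

    // Returns true if, for every (receiver, sender) pair, the sequence
    // numbers appear in strictly increasing order.
    static boolean isOrdered(List<String> lines) {
        Map<String, Integer> lastSeen = new HashMap<String, Integer>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length != 4 || !fields[0].equals("notice")) {
                continue; // skip anything that is not a notice line
            }
            String key = fields[1] + "/" + fields[2]; // receiver/sender
            int sequenceNumber = Integer.parseInt(fields[3]);
            Integer previous = lastSeen.get(key);
            if (previous != null && sequenceNumber <= previous) {
                return false;
            }
            lastSeen.put(key, sequenceNumber);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<String>();
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (String line = in.readLine(); line != null; line = in.readLine()) {
            lines.add(line);
        }
        System.out.println(isOrdered(lines) ? "ordered" : "out of order");
    }
}
```

Since the example program never exits on its own, its output would need to be truncated, for instance with head, before being piped into this check.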