1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module hunt.shiro.event.support.DefaultEventBus; 20 21 import hunt.shiro.event.support.AnnotationEventListenerResolver; 22 import hunt.shiro.event.support.EventListener; 23 import hunt.shiro.event.support.EventListenerResolver; 24 import hunt.shiro.event.support.EventListenerComparator; 25 import hunt.shiro.event.support.SingleArgumentMethodEventListener; 26 27 import hunt.shiro.event.EventBus; 28 29 import hunt.collection; 30 import hunt.Exceptions; 31 import hunt.logging; 32 33 import core.sync.rwmutex; 34 import std.array; 35 import std.concurrency : initOnce; 36 37 // import java.util.concurrent.locks.Lock; 38 // import java.util.concurrent.locks.ReentrantReadWriteLock; 39 40 41 /** 42 * A default event bus implementation that synchronously publishes events to registered listeners. Listeners can be 43 * registered or unregistered for events as necessary. 44 * <p/> 45 * An event bus enables a publish/subscribe paradigm within Shiro - components can publish or consume events they 46 * find relevant without needing to be tightly coupled to other components. This affords great 47 * flexibility within Shiro by promoting loose coupling and high cohesion between components and a much safer 48 * pluggable architecture that is more resilient to change over time. 49 * <h2>Sending Events</h2> 50 * If a component wishes to publish events to other components: 51 * <pre> 52 * MyEvent myEvent = createMyEvent(); 53 * eventBus.publish(myEvent); 54 * </pre> 55 * The event bus will determine the type of event and then dispatch the event to components that wish to receive 56 * events of that type. 57 * <h2>Receiving Events</h2> 58 * A component can receive events of interest by doing the following. 59 * <ol> 60 * <li>For each type of event you wish to consume, create a method that accepts a single event argument. 61 * The method argument type indicates the type of event to receive.</li> 62 * <li>Annotate each of these methods with the {@link hunt.shiro.event.Subscribe Subscribe} annotation.</li> 63 * <li>Register the component with the event bus: 64 * <pre> 65 * eventBus.register(myComponent); 66 * </pre> 67 * </li> 68 * </ol> 69 * After registering the component, when when an event of a respective type is published, the component's 70 * {@code Subscribe}-annotated method(s) will be invoked as expected. 71 * 72 * This design (and its constituent helper components) was largely influenced by 73 * Guava's <a href="http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/eventbus/EventBus.html">EventBus</a> 74 * concept, although no code was shared/imported (even though Guava is Apache 2.0 licensed and could have 75 * been used). 76 * 77 * This implementation is thread-safe and may be used concurrently. 78 * 79 * @since 1.3 80 */ 81 class DefaultEventBus : EventBus { 82 83 private enum string EVENT_LISTENER_ERROR_MSG = "Event listener processing failed. Listeners should " ~ 84 "generally handle exceptions directly and not propagate to the event bus."; 85 86 //this is stateless, we can retain a static final reference: 87 private static EventListenerComparator EVENT_LISTENER_COMPARATOR() { 88 __gshared EventListenerComparator inst; 89 return initOnce!inst(new EventListenerComparator); 90 } 91 92 private EventListenerResolver eventListenerResolver; 93 94 //We want to preserve registration order to deliver events to objects in the order that they are registered 95 //with the event bus. This has the nice effect that any Shiro system-level components that are registered first 96 //(likely to happen upon startup) have precedence over those registered by end-user components later. 97 // 98 //One might think that this could have been done by just using a ConcurrentSkipListMap (which is available only on 99 //JDK 6 or later). However, this approach requires the implementation of a Comparator to sort elements, and this 100 //surfaces a problem: for any given random event listener, there isn't any guaranteed property to exist that can be 101 //inspected to determine order of registration, since registration order is an artifact of this EventBus 102 //implementation, not the listeners themselves. 103 // 104 //Therefore, we use a simple concurrent lock to wrap a LinkedHashMap - the LinkedHashMap retains insertion order 105 //and the lock provides thread-safety in probably a much simpler mechanism than attempting to write a 106 //EventBus-specific Comparator. This technique is also likely to be faster than a ConcurrentSkipListMap, which 107 //is about 3-5 times slower than a standard ConcurrentMap. 108 private Map!(Object, Subscription) registry; 109 private ReadWriteMutex.Reader registryReadLock; 110 private ReadWriteMutex.Writer registryWriteLock; 111 112 this() { 113 this.registry = new LinkedHashMap!(Object, Subscription)(); //not thread safe, so we need locks: 114 115 ReadWriteMutex rwl = new ReadWriteMutex(); 116 this.registryReadLock = rwl.reader(); 117 this.registryWriteLock = rwl.writer(); 118 this.eventListenerResolver = new AnnotationEventListenerResolver(); 119 } 120 121 EventListenerResolver getEventListenerResolver() { 122 return eventListenerResolver; 123 } 124 125 void setEventListenerResolver(EventListenerResolver eventListenerResolver) { 126 this.eventListenerResolver = eventListenerResolver; 127 } 128 129 void publish(Object event) { 130 if (event is null) { 131 info("Received null event for publishing. Ignoring and returning."); 132 return; 133 } 134 135 registryReadLock.lock(); 136 try { 137 //performing the entire iteration within the lock will be a slow operation if the registry has a lot of 138 //contention. However, it is expected that the very large majority of cases the registry will be 139 //read-mostly with very little writes (registrations or removals) occurring during a typical application 140 //lifetime. 141 // 142 //The alternative would be to copy the registry.values() collection to a new LinkedHashSet within the lock 143 //only and the iteration on this new collection could be outside the lock. This has the performance penalty 144 //however of always creating a new collection every time an event is published, which could be more 145 //costly for the majority of applications, especially if the number of listeners is large. 146 // 147 //Finally, the read lock is re-entrant, so multiple publish calls will be 148 //concurrent without penalty since publishing is a read-only operation on the registry. 149 150 foreach (Subscription subscription ; this.registry.values()) { 151 subscription.onEvent(event); 152 } 153 } finally { 154 registryReadLock.unlock(); 155 } 156 } 157 158 void register(Object instance) { 159 if (instance is null) { 160 info("Received null instance for event listener registration. Ignoring registration request."); 161 return; 162 } 163 164 unregister(instance); 165 EventListener[] listeners = getEventListenerResolver().getEventListeners(instance); 166 167 if (listeners.empty()) { 168 warningf("Unable to resolve event listeners for subscriber instance [%s]. Ignoring registration request.", 169 instance.toString()); 170 return; 171 } 172 173 Subscription subscription = new Subscription(listeners); 174 175 this.registryWriteLock.lock(); 176 try { 177 this.registry.put(instance, subscription); 178 } finally { 179 this.registryWriteLock.unlock(); 180 } 181 } 182 183 void unregister(Object instance) { 184 if (instance is null) { 185 return; 186 } 187 this.registryWriteLock.lock(); 188 try { 189 this.registry.remove(instance); 190 } finally { 191 this.registryWriteLock.unlock(); 192 } 193 } 194 195 private class Subscription { 196 197 private List!EventListener listeners; 198 199 this(EventListener[] listeners) { 200 List!EventListener toSort = new ArrayList!EventListener(listeners); 201 // Collections.sort(toSort, EVENT_LISTENER_COMPARATOR); 202 toSort.sort(EVENT_LISTENER_COMPARATOR); 203 this.listeners = toSort; 204 } 205 206 void onEvent(Object event) { 207 208 Set!Object delivered = new HashSet!Object(); 209 210 implementationMissing(false); 211 foreach (EventListener listener ; this.listeners) { 212 Object target = cast(Object)listener; 213 // SingleArgumentMethodEventListener singleArgListener = cast(SingleArgumentMethodEventListener) listener; 214 // if (singleArgListener !is null) { 215 // target = singleArgListener.getTarget(); 216 // } 217 if (listener.accepts(event) && !delivered.contains(target)) { 218 try { 219 listener.onEvent(event); 220 } catch (Throwable t) { 221 warning(EVENT_LISTENER_ERROR_MSG, t); 222 } 223 delivered.add(target); 224 } 225 } 226 } 227 } 228 }