1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module hunt.shiro.event.support.DefaultEventBus;
20 
21 import hunt.shiro.event.support.AnnotationEventListenerResolver;
22 import hunt.shiro.event.support.EventListener;
23 import hunt.shiro.event.support.EventListenerResolver;
24 import hunt.shiro.event.support.EventListenerComparator;
25 import hunt.shiro.event.support.SingleArgumentMethodEventListener;
26 
27 import hunt.shiro.event.EventBus;
28 
29 import hunt.collection;
30 import hunt.Exceptions;
31 import hunt.logging;
32 
33 import core.sync.rwmutex;
34 import std.array;
35 import std.concurrency : initOnce;
36 
37 // import java.util.concurrent.locks.Lock;
38 // import java.util.concurrent.locks.ReentrantReadWriteLock;
39 
40 
41 /**
42  * A default event bus implementation that synchronously publishes events to registered listeners.  Listeners can be
43  * registered or unregistered for events as necessary.
44  * <p/>
45  * An event bus enables a publish/subscribe paradigm within Shiro - components can publish or consume events they
46  * find relevant without needing to be tightly coupled to other components.  This affords great
47  * flexibility within Shiro by promoting loose coupling and high cohesion between components and a much safer
48  * pluggable architecture that is more resilient to change over time.
49  * <h2>Sending Events</h2>
50  * If a component wishes to publish events to other components:
51  * <pre>
52  *     MyEvent myEvent = createMyEvent();
53  *     eventBus.publish(myEvent);
54  * </pre>
55  * The event bus will determine the type of event and then dispatch the event to components that wish to receive
56  * events of that type.
57  * <h2>Receiving Events</h2>
58  * A component can receive events of interest by doing the following.
59  * <ol>
60  * <li>For each type of event you wish to consume, create a method that accepts a single event argument.
61  * The method argument type indicates the type of event to receive.</li>
62  * <li>Annotate each of these methods with the {@link hunt.shiro.event.Subscribe Subscribe} annotation.</li>
63  * <li>Register the component with the event bus:
64  * <pre>
65  *         eventBus.register(myComponent);
66  *     </pre>
67  * </li>
68  * </ol>
 * After registering the component, when an event of a respective type is published, the component's
70  * {@code Subscribe}-annotated method(s) will be invoked as expected.
71  *
72  * This design (and its constituent helper components) was largely influenced by
73  * Guava's <a href="http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/eventbus/EventBus.html">EventBus</a>
74  * concept, although no code was shared/imported (even though Guava is Apache 2.0 licensed and could have
75  * been used).
76  *
77  * This implementation is thread-safe and may be used concurrently.
78  *
79  * @since 1.3
80  */
class DefaultEventBus : EventBus {

    private enum string EVENT_LISTENER_ERROR_MSG = "Event listener processing failed.  Listeners should " ~
            "generally handle exceptions directly and not propagate to the event bus.";

    /**
     * Returns the process-wide comparator used to order the listeners of a subscription.
     * The instance is created lazily, exactly once, via $(D initOnce).
     */
    private static EventListenerComparator EVENT_LISTENER_COMPARATOR() {
        __gshared EventListenerComparator inst;
        return initOnce!inst(new EventListenerComparator);
    }

    /// Strategy that turns a registered object into the listeners that will receive events.
    private EventListenerResolver eventListenerResolver;

    // Subscriptions keyed by the registered instance.  A LinkedHashMap is used so that events are delivered
    // in registration order: whatever registers first is notified first.  The map itself is not thread-safe,
    // so every access goes through the reader/writer halves of a single ReadWriteMutex.
    private Map!(Object, Subscription) registry;
    private ReadWriteMutex.Reader registryReadLock;
    private ReadWriteMutex.Writer registryWriteLock;

    /**
     * Creates an event bus with an empty registry, a fresh read/write mutex guarding it, and an
     * $(D AnnotationEventListenerResolver) as the default listener resolver.
     */
    this() {
        this.registry = new LinkedHashMap!(Object, Subscription)(); //not thread safe, so we need locks:

        ReadWriteMutex rwl = new ReadWriteMutex();
        this.registryReadLock = rwl.reader();
        this.registryWriteLock = rwl.writer();
        this.eventListenerResolver = new AnnotationEventListenerResolver();
    }

    /// Returns the resolver currently used by $(D register) to discover listeners.
    EventListenerResolver getEventListenerResolver() {
        return eventListenerResolver;
    }

    /// Replaces the resolver used by subsequent $(D register) calls.
    void setEventListenerResolver(EventListenerResolver eventListenerResolver) {
        this.eventListenerResolver = eventListenerResolver;
    }

    /**
     * Synchronously offers $(D event) to every registered subscription, in registration order.
     *
     * A null event is logged and ignored.  The whole iteration runs while holding the registry read lock.
     *
     * Params:
     *     event = the event object to dispatch; may be null (ignored).
     */
    void publish(Object event) {
        if (event is null) {
            info("Received null event for publishing.  Ignoring and returning.");
            return;
        }

        // The read lock is held for the entire dispatch rather than copying the subscriptions first: the
        // registry is expected to be read-mostly, so avoiding a per-publish copy was judged the better
        // trade-off.
        // NOTE(review): the upstream Java implementation relied on its read lock being re-entrant for nested
        // publish calls - confirm core.sync.rwmutex offers the same guarantee.
        registryReadLock.lock();
        scope (exit) registryReadLock.unlock();

        foreach (Subscription subscription; this.registry.values()) {
            subscription.onEvent(event);
        }
    }

    /**
     * Registers $(D instance) so that its resolved listeners receive future events.
     *
     * Any previous registration of the same instance is removed first.  If the resolver yields no
     * listeners, a warning is logged and the instance is left unregistered.
     *
     * Params:
     *     instance = the subscriber object; may be null (logged and ignored).
     */
    void register(Object instance) {
        if (instance is null) {
            info("Received null instance for event listener registration.  Ignoring registration request.");
            return;
        }

        // Drop any earlier subscription so re-registering moves the instance to the end of the order.
        unregister(instance);
        EventListener[] listeners = getEventListenerResolver().getEventListeners(instance);

        if (listeners.empty()) {
            warningf("Unable to resolve event listeners for subscriber instance [%s]. Ignoring registration request.",
                    instance.toString());
            return;
        }

        // Build (and sort) the subscription before taking the write lock to keep the critical section short.
        Subscription subscription = new Subscription(listeners);

        this.registryWriteLock.lock();
        scope (exit) this.registryWriteLock.unlock();
        this.registry.put(instance, subscription);
    }

    /**
     * Removes $(D instance) from the registry, if present.
     *
     * Params:
     *     instance = the subscriber object to remove; null is silently ignored.
     */
    void unregister(Object instance) {
        if (instance is null) {
            return;
        }

        this.registryWriteLock.lock();
        scope (exit) this.registryWriteLock.unlock();
        this.registry.remove(instance);
    }

    /**
     * The sorted set of listeners that belong to one registered instance.
     *
     * Declared $(D static) because it only uses the enclosing class's static comparator and constant
     * message, never the enclosing instance itself.
     */
    private static class Subscription {

        private List!EventListener listeners;

        /**
         * Copies $(D listeners) into a list and sorts it with the shared listener comparator.
         *
         * Params:
         *     listeners = the listeners resolved for a single registered instance.
         */
        this(EventListener[] listeners) {
            List!EventListener toSort = new ArrayList!EventListener(listeners);
            toSort.sort(EVENT_LISTENER_COMPARATOR());
            this.listeners = toSort;
        }

        /**
         * Delivers $(D event) to each listener that accepts it, at most once per delivery target.
         *
         * Exceptions thrown by a listener are logged and do not stop delivery to the remaining listeners.
         *
         * Params:
         *     event = the (non-null) event being published.
         */
        void onEvent(Object event) {

            Set!Object delivered = new HashSet!Object();

            // Marks the incomplete port below; `false` is passed so that it does not throw.
            implementationMissing(false);
            foreach (EventListener listener; this.listeners) {
                // TODO: upstream de-duplicates by the listener's *target* object, so that one subscriber with
                // several matching methods is notified only once:
                //     auto singleArgListener = cast(SingleArgumentMethodEventListener) listener;
                //     if (singleArgListener !is null) target = singleArgListener.getTarget();
                // Until that is ported, the listener itself serves as the de-duplication key.
                Object target = cast(Object) listener;

                if (listener.accepts(event) && !delivered.contains(target)) {
                    try {
                        listener.onEvent(event);
                    } catch (Exception ex) {
                        // Only Exception is swallowed: a D Error (assertion failure, range violation,
                        // out-of-memory, ...) signals an unrecoverable condition and must keep propagating.
                        warning(EVENT_LISTENER_ERROR_MSG, ex);
                    }
                    delivered.add(target);
                }
            }
        }
    }
}