查看本类的 API文档 | 回源码主页 | 即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.ArrayDeque;
23  import java.util.Queue;
24  import java.util.concurrent.TimeUnit;
25  
26  /**
27   * Executes {@link Runnable} objects in the caller's thread. If the {@link #execute(Runnable)} is reentrant it will be
28   * queued until the original {@link Runnable} finishes execution.
29   * <p>
30   * All {@link Throwable} objects thrown from {@link #execute(Runnable)} will be swallowed and logged. This is to ensure
31   * that all queued {@link Runnable} objects have the chance to be run.
32   */
33  public final class ImmediateEventExecutor extends AbstractEventExecutor {
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ImmediateEventExecutor.class);
35      public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();
36      /**
37       * A Runnable will be queued if we are executing a Runnable. This is to prevent a {@link StackOverflowError}.
38       */
39      private static final FastThreadLocal<Queue<Runnable>> DELAYED_RUNNABLES = new FastThreadLocal<Queue<Runnable>>() {
40          @Override
41          protected Queue<Runnable> initialValue() throws Exception {
42              return new ArrayDeque<Runnable>();
43          }
44      };
45      /**
46       * Set to {@code true} if we are executing a runnable.
47       */
48      private static final FastThreadLocal<Boolean> RUNNING = new FastThreadLocal<Boolean>() {
49          @Override
50          protected Boolean initialValue() throws Exception {
51              return false;
52          }
53      };
54  
55      private final Future<?> terminationFuture = new FailedFuture<Object>(
56              GlobalEventExecutor.INSTANCE, new UnsupportedOperationException());
57  
58      private ImmediateEventExecutor() { }
59  
60      @Override
61      public boolean inEventLoop() {
62          return true;
63      }
64  
65      @Override
66      public boolean inEventLoop(Thread thread) {
67          return true;
68      }
69  
70      @Override
71      public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
72          return terminationFuture();
73      }
74  
75      @Override
76      public Future<?> terminationFuture() {
77          return terminationFuture;
78      }
79  
80      @Override
81      @Deprecated
82      public void shutdown() { }
83  
84      @Override
85      public boolean isShuttingDown() {
86          return false;
87      }
88  
89      @Override
90      public boolean isShutdown() {
91          return false;
92      }
93  
94      @Override
95      public boolean isTerminated() {
96          return false;
97      }
98  
99      @Override
100     public boolean awaitTermination(long timeout, TimeUnit unit) {
101         return false;
102     }
103 
104     @Override
105     public void execute(Runnable command) {
106         ObjectUtil.checkNotNull(command, "command");
107         if (!RUNNING.get()) {
108             RUNNING.set(true);
109             try {
110                 command.run();
111             } catch (Throwable cause) {
112                 logger.info("Throwable caught while executing Runnable {}", command, cause);
113             } finally {
114                 Queue<Runnable> delayedRunnables = DELAYED_RUNNABLES.get();
115                 Runnable runnable;
116                 while ((runnable = delayedRunnables.poll()) != null) {
117                     try {
118                         runnable.run();
119                     } catch (Throwable cause) {
120                         logger.info("Throwable caught while executing Runnable {}", runnable, cause);
121                     }
122                 }
123                 RUNNING.set(false);
124             }
125         } else {
126             DELAYED_RUNNABLES.get().add(command);
127         }
128     }
129 
130     @Override
131     public <V> Promise<V> newPromise() {
132         return new ImmediatePromise<V>(this);
133     }
134 
135     @Override
136     public <V> ProgressivePromise<V> newProgressivePromise() {
137         return new ImmediateProgressivePromise<V>(this);
138     }
139 
140     static class ImmediatePromise<V> extends DefaultPromise<V> {
141         ImmediatePromise(EventExecutor executor) {
142             super(executor);
143         }
144 
145         @Override
146         protected void checkDeadLock() {
147             // No check
148         }
149     }
150 
151     static class ImmediateProgressivePromise<V> extends DefaultProgressivePromise<V> {
152         ImmediateProgressivePromise(EventExecutor executor) {
153             super(executor);
154         }
155 
156         @Override
157         protected void checkDeadLock() {
158             // No check
159         }
160     }
161 }