1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.DefaultChannelPromise;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.internal.ObjectUtil;
27
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30 import java.util.concurrent.TimeUnit;
31
32 final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
33
34
35
36
37
38
39
40 private long startTime = initialNanoTime();
41
42
43
44 private long frozenTimestamp;
45
46
47
48 private boolean timeFrozen;
49
50 private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
51
52 @Override
53 public EventLoopGroup parent() {
54 return (EventLoopGroup) super.parent();
55 }
56
57 @Override
58 public EventLoop next() {
59 return (EventLoop) super.next();
60 }
61
62 @Override
63 public void execute(Runnable command) {
64 tasks.add(ObjectUtil.checkNotNull(command, "command"));
65 }
66
67 void runTasks() {
68 for (;;) {
69 Runnable task = tasks.poll();
70 if (task == null) {
71 break;
72 }
73
74 task.run();
75 }
76 }
77
78 boolean hasPendingNormalTasks() {
79 return !tasks.isEmpty();
80 }
81
82 long runScheduledTasks() {
83 long time = getCurrentTimeNanos();
84 for (;;) {
85 Runnable task = pollScheduledTask(time);
86 if (task == null) {
87 return nextScheduledTaskNano();
88 }
89
90 task.run();
91 }
92 }
93
94 long nextScheduledTask() {
95 return nextScheduledTaskNano();
96 }
97
98 @Override
99 protected long getCurrentTimeNanos() {
100 if (timeFrozen) {
101 return frozenTimestamp;
102 }
103 return System.nanoTime() - startTime;
104 }
105
106 void advanceTimeBy(long nanos) {
107 if (timeFrozen) {
108 frozenTimestamp += nanos;
109 } else {
110
111 startTime -= nanos;
112 }
113 }
114
115 void freezeTime() {
116 if (!timeFrozen) {
117 frozenTimestamp = getCurrentTimeNanos();
118 timeFrozen = true;
119 }
120 }
121
122 void unfreezeTime() {
123 if (timeFrozen) {
124
125
126
127 startTime = System.nanoTime() - frozenTimestamp;
128 timeFrozen = false;
129 }
130 }
131
132 @Override
133 protected void cancelScheduledTasks() {
134 super.cancelScheduledTasks();
135 }
136
137 @Override
138 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
139 throw new UnsupportedOperationException();
140 }
141
142 @Override
143 public Future<?> terminationFuture() {
144 throw new UnsupportedOperationException();
145 }
146
147 @Override
148 @Deprecated
149 public void shutdown() {
150 throw new UnsupportedOperationException();
151 }
152
153 @Override
154 public boolean isShuttingDown() {
155 return false;
156 }
157
158 @Override
159 public boolean isShutdown() {
160 return false;
161 }
162
163 @Override
164 public boolean isTerminated() {
165 return false;
166 }
167
168 @Override
169 public boolean awaitTermination(long timeout, TimeUnit unit) {
170 return false;
171 }
172
173 @Override
174 public ChannelFuture register(Channel channel) {
175 return register(new DefaultChannelPromise(channel, this));
176 }
177
178 @Override
179 public ChannelFuture register(ChannelPromise promise) {
180 ObjectUtil.checkNotNull(promise, "promise");
181 promise.channel().unsafe().register(this, promise);
182 return promise;
183 }
184
185 @Deprecated
186 @Override
187 public ChannelFuture register(Channel channel, ChannelPromise promise) {
188 channel.unsafe().register(this, promise);
189 return promise;
190 }
191
192 @Override
193 public boolean inEventLoop() {
194 return true;
195 }
196
197 @Override
198 public boolean inEventLoop(Thread thread) {
199 return true;
200 }
201 }