1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.RejectedExecutionHandler;
19 import io.netty.util.concurrent.RejectedExecutionHandlers;
20 import io.netty.util.concurrent.SingleThreadEventExecutor;
21 import io.netty.util.internal.ObjectUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ThreadFactory;
30
31
32
33
34
35 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
36
37 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
38 SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
39
40 private final Queue<Runnable> tailTasks;
41
42 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
43 this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
44 }
45
46 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
47 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
48 }
49
50 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
51 boolean addTaskWakesUp, int maxPendingTasks,
52 RejectedExecutionHandler rejectedExecutionHandler) {
53 super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
54 tailTasks = newTaskQueue(maxPendingTasks);
55 }
56
57 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
58 boolean addTaskWakesUp, int maxPendingTasks,
59 RejectedExecutionHandler rejectedExecutionHandler) {
60 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
61 tailTasks = newTaskQueue(maxPendingTasks);
62 }
63
64 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
65 boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
66 RejectedExecutionHandler rejectedExecutionHandler) {
67 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
68 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
69 }
70
71 @Override
72 public EventLoopGroup parent() {
73 return (EventLoopGroup) super.parent();
74 }
75
76 @Override
77 public EventLoop next() {
78 return (EventLoop) super.next();
79 }
80
81 @Override
82 public ChannelFuture register(Channel channel) {
83 return register(new DefaultChannelPromise(channel, this));
84 }
85
86 @Override
87 public ChannelFuture register(final ChannelPromise promise) {
88 ObjectUtil.checkNotNull(promise, "promise");
89 promise.channel().unsafe().register(this, promise);
90 return promise;
91 }
92
93 @Deprecated
94 @Override
95 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
96 ObjectUtil.checkNotNull(promise, "promise");
97 ObjectUtil.checkNotNull(channel, "channel");
98 channel.unsafe().register(this, promise);
99 return promise;
100 }
101
102
103
104
105
106
107 @UnstableApi
108 public final void executeAfterEventLoopIteration(Runnable task) {
109 ObjectUtil.checkNotNull(task, "task");
110 if (isShutdown()) {
111 reject();
112 }
113
114 if (!tailTasks.offer(task)) {
115 reject(task);
116 }
117
118 if (wakesUpForTask(task)) {
119 wakeup(inEventLoop());
120 }
121 }
122
123
124
125
126
127
128
129
130 @UnstableApi
131 final boolean removeAfterEventLoopIterationTask(Runnable task) {
132 return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
133 }
134
135 @Override
136 protected void afterRunningAllTasks() {
137 runAllTasksFrom(tailTasks);
138 }
139
140 @Override
141 protected boolean hasTasks() {
142 return super.hasTasks() || !tailTasks.isEmpty();
143 }
144
145 @Override
146 public int pendingTasks() {
147 return super.pendingTasks() + tailTasks.size();
148 }
149
150
151
152
153
154
155 @UnstableApi
156 public int registeredChannels() {
157 return -1;
158 }
159
160
161
162
163
164
165
166
167 @UnstableApi
168 public Iterator<Channel> registeredChannelsIterator() {
169 throw new UnsupportedOperationException("registeredChannelsIterator");
170 }
171
172 protected static final class ChannelsReadOnlyIterator<T extends Channel> implements Iterator<Channel> {
173 private final Iterator<T> channelIterator;
174
175 public ChannelsReadOnlyIterator(Iterable<T> channelIterable) {
176 this.channelIterator =
177 ObjectUtil.checkNotNull(channelIterable, "channelIterable").iterator();
178 }
179
180 @Override
181 public boolean hasNext() {
182 return channelIterator.hasNext();
183 }
184
185 @Override
186 public Channel next() {
187 return channelIterator.next();
188 }
189
190 @Override
191 public void remove() {
192 throw new UnsupportedOperationException("remove");
193 }
194
195 @SuppressWarnings("unchecked")
196 public static <T> Iterator<T> empty() {
197 return (Iterator<T>) EMPTY;
198 }
199
200 private static final Iterator<Object> EMPTY = new Iterator<Object>() {
201 @Override
202 public boolean hasNext() {
203 return false;
204 }
205
206 @Override
207 public Object next() {
208 throw new NoSuchElementException();
209 }
210
211 @Override
212 public void remove() {
213 throw new UnsupportedOperationException("remove");
214 }
215 };
216 }
217 }