1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.AbstractReferenceCountedByteBuf;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.PromiseCombiner;
22 import io.netty.util.internal.ObjectPool;
23 import io.netty.util.internal.ObjectPool.ObjectCreator;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.SystemPropertyUtil;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29
30
31
32
33
34 public final class PendingWriteQueue {
35 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
36
37
38
39
40 private static final int PENDING_WRITE_OVERHEAD =
41 SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
42
43 private final ChannelOutboundInvoker invoker;
44 private final EventExecutor executor;
45 private final PendingBytesTracker tracker;
46
47
48 private PendingWrite head;
49 private PendingWrite tail;
50 private int size;
51 private long bytes;
52
53 public PendingWriteQueue(ChannelHandlerContext ctx) {
54 tracker = PendingBytesTracker.newTracker(ctx.channel());
55 this.invoker = ctx;
56 this.executor = ctx.executor();
57 }
58
59 public PendingWriteQueue(Channel channel) {
60 tracker = PendingBytesTracker.newTracker(channel);
61 this.invoker = channel;
62 this.executor = channel.eventLoop();
63 }
64
65
66
67
68 public boolean isEmpty() {
69 assert executor.inEventLoop();
70 return head == null;
71 }
72
73
74
75
76 public int size() {
77 assert executor.inEventLoop();
78 return size;
79 }
80
81
82
83
84
85 public long bytes() {
86 assert executor.inEventLoop();
87 return bytes;
88 }
89
90 private int size(Object msg) {
91
92
93 int messageSize = tracker.size(msg);
94 if (messageSize < 0) {
95
96 messageSize = 0;
97 }
98 return messageSize + PENDING_WRITE_OVERHEAD;
99 }
100
101
102
103
104 public void add(Object msg, ChannelPromise promise) {
105 assert executor.inEventLoop();
106 ObjectUtil.checkNotNull(msg, "msg");
107 ObjectUtil.checkNotNull(promise, "promise");
108
109
110 int messageSize = size(msg);
111
112 PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
113 PendingWrite currentTail = tail;
114 if (currentTail == null) {
115 tail = head = write;
116 } else {
117 currentTail.next = write;
118 tail = write;
119 }
120 size ++;
121 bytes += messageSize;
122 tracker.incrementPendingOutboundBytes(write.size);
123
124
125
126
127 if (msg instanceof AbstractReferenceCountedByteBuf) {
128 ((AbstractReferenceCountedByteBuf) msg).touch();
129 } else {
130 ReferenceCountUtil.touch(msg);
131 }
132 }
133
134
135
136
137
138
139
140
141 public ChannelFuture removeAndWriteAll() {
142 assert executor.inEventLoop();
143
144 if (isEmpty()) {
145 return null;
146 }
147
148 ChannelPromise p = invoker.newPromise();
149 PromiseCombiner combiner = new PromiseCombiner(executor);
150 try {
151
152
153 for (PendingWrite write = head; write != null; write = head) {
154 head = tail = null;
155 size = 0;
156 bytes = 0;
157
158 while (write != null) {
159 PendingWrite next = write.next;
160 Object msg = write.msg;
161 ChannelPromise promise = write.promise;
162 recycle(write, false);
163 if (!(promise instanceof VoidChannelPromise)) {
164 combiner.add(promise);
165 }
166 invoker.write(msg, promise);
167 write = next;
168 }
169 }
170 combiner.finish(p);
171 } catch (Throwable cause) {
172 p.setFailure(cause);
173 }
174 assertEmpty();
175 return p;
176 }
177
178
179
180
181
182 public void removeAndFailAll(Throwable cause) {
183 assert executor.inEventLoop();
184 ObjectUtil.checkNotNull(cause, "cause");
185
186
187 for (PendingWrite write = head; write != null; write = head) {
188 head = tail = null;
189 size = 0;
190 bytes = 0;
191 while (write != null) {
192 PendingWrite next = write.next;
193 ReferenceCountUtil.safeRelease(write.msg);
194 ChannelPromise promise = write.promise;
195 recycle(write, false);
196 safeFail(promise, cause);
197 write = next;
198 }
199 }
200 assertEmpty();
201 }
202
203
204
205
206
207 public void removeAndFail(Throwable cause) {
208 assert executor.inEventLoop();
209 ObjectUtil.checkNotNull(cause, "cause");
210
211 PendingWrite write = head;
212 if (write == null) {
213 return;
214 }
215 ReferenceCountUtil.safeRelease(write.msg);
216 ChannelPromise promise = write.promise;
217 safeFail(promise, cause);
218 recycle(write, true);
219 }
220
221 private void assertEmpty() {
222 assert tail == null && head == null && size == 0;
223 }
224
225
226
227
228
229
230
231
232 public ChannelFuture removeAndWrite() {
233 assert executor.inEventLoop();
234 PendingWrite write = head;
235 if (write == null) {
236 return null;
237 }
238 Object msg = write.msg;
239 ChannelPromise promise = write.promise;
240 recycle(write, true);
241 return invoker.write(msg, promise);
242 }
243
244
245
246
247
248
249
250 public ChannelPromise remove() {
251 assert executor.inEventLoop();
252 PendingWrite write = head;
253 if (write == null) {
254 return null;
255 }
256 ChannelPromise promise = write.promise;
257 ReferenceCountUtil.safeRelease(write.msg);
258 recycle(write, true);
259 return promise;
260 }
261
262
263
264
265 public Object current() {
266 assert executor.inEventLoop();
267 PendingWrite write = head;
268 if (write == null) {
269 return null;
270 }
271 return write.msg;
272 }
273
274 private void recycle(PendingWrite write, boolean update) {
275 final PendingWrite next = write.next;
276 final long writeSize = write.size;
277
278 if (update) {
279 if (next == null) {
280
281
282 head = tail = null;
283 size = 0;
284 bytes = 0;
285 } else {
286 head = next;
287 size --;
288 bytes -= writeSize;
289 assert size > 0 && bytes >= 0;
290 }
291 }
292
293 write.recycle();
294 tracker.decrementPendingOutboundBytes(writeSize);
295 }
296
297 private static void safeFail(ChannelPromise promise, Throwable cause) {
298 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
299 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
300 }
301 }
302
303
304
305
306 static final class PendingWrite {
307 private static final ObjectPool<PendingWrite> RECYCLER = ObjectPool.newPool(new ObjectCreator<PendingWrite>() {
308 @Override
309 public PendingWrite newObject(ObjectPool.Handle<PendingWrite> handle) {
310 return new PendingWrite(handle);
311 }
312 });
313
314 private final ObjectPool.Handle<PendingWrite> handle;
315 private PendingWrite next;
316 private long size;
317 private ChannelPromise promise;
318 private Object msg;
319
320 private PendingWrite(ObjectPool.Handle<PendingWrite> handle) {
321 this.handle = handle;
322 }
323
324 static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
325 PendingWrite write = RECYCLER.get();
326 write.size = size;
327 write.msg = msg;
328 write.promise = promise;
329 return write;
330 }
331
332 private void recycle() {
333 size = 0;
334 next = null;
335 msg = null;
336 promise = null;
337 handle.recycle(this);
338 }
339 }
340 }