1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.util.internal.ConcurrentHashMap;
19
20 import java.net.SocketAddress;
21 import java.util.Random;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24
25
26
27
28 public abstract class AbstractChannel implements Channel {
29
30 static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
31
32 private static final Random random = new Random();
33
34 private static Integer allocateId(Channel channel) {
35 Integer id = random.nextInt();
36 for (;;) {
37
38
39 if (allChannels.putIfAbsent(id, channel) == null) {
40
41 return id;
42 } else {
43
44 id = id.intValue() + 1;
45 }
46 }
47 }
48
49 private final Integer id;
50 private final Channel parent;
51 private final ChannelFactory factory;
52 private final ChannelPipeline pipeline;
53 private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
54 private final ChannelCloseFuture closeFuture = new ChannelCloseFuture();
55 private volatile int interestOps = OP_READ;
56
57
58 private boolean strValConnected;
59 private String strVal;
60 private volatile Object attachment;
61
62 private static final AtomicIntegerFieldUpdater<AbstractChannel> UNWRITABLE_UPDATER;
63 @SuppressWarnings("UnusedDeclaration")
64 private volatile int unwritable;
65
66 static {
67 UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "unwritable");
68 }
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 protected AbstractChannel(
84 Channel parent, ChannelFactory factory,
85 ChannelPipeline pipeline, ChannelSink sink) {
86
87 this.parent = parent;
88 this.factory = factory;
89 this.pipeline = pipeline;
90
91 id = allocateId(this);
92
93 pipeline.attach(this, sink);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 protected AbstractChannel(
111 Integer id,
112 Channel parent, ChannelFactory factory,
113 ChannelPipeline pipeline, ChannelSink sink) {
114
115 this.id = id;
116 this.parent = parent;
117 this.factory = factory;
118 this.pipeline = pipeline;
119 pipeline.attach(this, sink);
120 }
121
122 public final Integer getId() {
123 return id;
124 }
125
126 public Channel getParent() {
127 return parent;
128 }
129
130 public ChannelFactory getFactory() {
131 return factory;
132 }
133
134 public ChannelPipeline getPipeline() {
135 return pipeline;
136 }
137
138
139
140
141 protected ChannelFuture getSucceededFuture() {
142 return succeededFuture;
143 }
144
145
146
147
148
149 protected ChannelFuture getUnsupportedOperationFuture() {
150 return new FailedChannelFuture(this, new UnsupportedOperationException());
151 }
152
153
154
155
156 @Override
157 public final int hashCode() {
158 return id;
159 }
160
161
162
163
164
165 @Override
166 public final boolean equals(Object o) {
167 return this == o;
168 }
169
170
171
172
173 public final int compareTo(Channel o) {
174 return getId().compareTo(o.getId());
175 }
176
177 public boolean isOpen() {
178 return !closeFuture.isDone();
179 }
180
181
182
183
184
185
186
187
188
189 protected boolean setClosed() {
190
191
192 allChannels.remove(id);
193
194 return closeFuture.setClosed();
195 }
196
197 public ChannelFuture bind(SocketAddress localAddress) {
198 return Channels.bind(this, localAddress);
199 }
200
201 public ChannelFuture unbind() {
202 return Channels.unbind(this);
203 }
204
205 public ChannelFuture close() {
206 ChannelFuture returnedCloseFuture = Channels.close(this);
207 assert closeFuture == returnedCloseFuture;
208 return closeFuture;
209 }
210
211 public ChannelFuture getCloseFuture() {
212 return closeFuture;
213 }
214
215 public ChannelFuture connect(SocketAddress remoteAddress) {
216 return Channels.connect(this, remoteAddress);
217 }
218
219 public ChannelFuture disconnect() {
220 return Channels.disconnect(this);
221 }
222
223 public int getInterestOps() {
224 if (!isOpen()) {
225 return Channel.OP_WRITE;
226 }
227
228 int interestOps = getInternalInterestOps() & ~OP_WRITE;
229 if (!isWritable()) {
230 interestOps |= OP_WRITE;
231 }
232 return interestOps;
233 }
234
235 public ChannelFuture setInterestOps(int interestOps) {
236 return Channels.setInterestOps(this, interestOps);
237 }
238
239 protected int getInternalInterestOps() {
240 return interestOps;
241 }
242
243
244
245
246
247
248 protected void setInternalInterestOps(int interestOps) {
249 this.interestOps = interestOps;
250 }
251
252 public boolean isReadable() {
253 return (getInternalInterestOps() & OP_READ) != 0;
254 }
255
256 public boolean isWritable() {
257 return unwritable == 0;
258 }
259
260 public final boolean getUserDefinedWritability(int index) {
261 return (unwritable & writabilityMask(index)) == 0;
262 }
263
264 public final void setUserDefinedWritability(int index, boolean writable) {
265 if (writable) {
266 setUserDefinedWritability(index);
267 } else {
268 clearUserDefinedWritability(index);
269 }
270 }
271
272 private void setUserDefinedWritability(int index) {
273 final int mask = ~writabilityMask(index);
274 for (;;) {
275 final int oldValue = unwritable;
276 final int newValue = oldValue & mask;
277 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
278 if (oldValue != 0 && newValue == 0) {
279 getPipeline().sendUpstream(
280 new UpstreamChannelStateEvent(
281 this, ChannelState.INTEREST_OPS, getInterestOps()));
282 }
283 break;
284 }
285 }
286 }
287
288 private void clearUserDefinedWritability(int index) {
289 final int mask = writabilityMask(index);
290 for (;;) {
291 final int oldValue = unwritable;
292 final int newValue = oldValue | mask;
293 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
294 if (oldValue == 0 && newValue != 0) {
295 getPipeline().sendUpstream(
296 new UpstreamChannelStateEvent(
297 this, ChannelState.INTEREST_OPS, getInterestOps()));
298 }
299 break;
300 }
301 }
302 }
303
304 private static int writabilityMask(int index) {
305 if (index < 1 || index > 31) {
306 throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
307 }
308 return 1 << index;
309 }
310
311 protected boolean setWritable() {
312 for (;;) {
313 final int oldValue = unwritable;
314 final int newValue = oldValue & ~1;
315 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
316 if (oldValue != 0 && newValue == 0) {
317 return true;
318 }
319 break;
320 }
321 }
322 return false;
323 }
324
325 protected boolean setUnwritable() {
326 for (;;) {
327 final int oldValue = unwritable;
328 final int newValue = oldValue | 1;
329 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
330 if (oldValue == 0 && newValue != 0) {
331 return true;
332 }
333 break;
334 }
335 }
336 return false;
337 }
338
339 public ChannelFuture setReadable(boolean readable) {
340 if (readable) {
341 return setInterestOps(getInterestOps() | OP_READ);
342 } else {
343 return setInterestOps(getInterestOps() & ~OP_READ);
344 }
345 }
346
347 public ChannelFuture write(Object message) {
348 return Channels.write(this, message);
349 }
350
351 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
352 return Channels.write(this, message, remoteAddress);
353 }
354
355 public Object getAttachment() {
356 return attachment;
357 }
358
359 public void setAttachment(Object attachment) {
360 this.attachment = attachment;
361 }
362
363
364
365
366
367
368 @Override
369 public String toString() {
370 boolean connected = isConnected();
371 if (strValConnected == connected && strVal != null) {
372 return strVal;
373 }
374
375 StringBuilder buf = new StringBuilder(128);
376 buf.append("[id: 0x");
377 buf.append(getIdString());
378
379 SocketAddress localAddress = getLocalAddress();
380 SocketAddress remoteAddress = getRemoteAddress();
381 if (remoteAddress != null) {
382 buf.append(", ");
383 if (getParent() == null) {
384 buf.append(localAddress);
385 buf.append(connected? " => " : " :> ");
386 buf.append(remoteAddress);
387 } else {
388 buf.append(remoteAddress);
389 buf.append(connected? " => " : " :> ");
390 buf.append(localAddress);
391 }
392 } else if (localAddress != null) {
393 buf.append(", ");
394 buf.append(localAddress);
395 }
396
397 buf.append(']');
398
399 String strVal = buf.toString();
400 this.strVal = strVal;
401 strValConnected = connected;
402 return strVal;
403 }
404
405 private String getIdString() {
406 String answer = Integer.toHexString(id.intValue());
407 switch (answer.length()) {
408 case 0:
409 answer = "00000000";
410 break;
411 case 1:
412 answer = "0000000" + answer;
413 break;
414 case 2:
415 answer = "000000" + answer;
416 break;
417 case 3:
418 answer = "00000" + answer;
419 break;
420 case 4:
421 answer = "0000" + answer;
422 break;
423 case 5:
424 answer = "000" + answer;
425 break;
426 case 6:
427 answer = "00" + answer;
428 break;
429 case 7:
430 answer = '0' + answer;
431 break;
432 }
433 return answer;
434 }
435
436 private final class ChannelCloseFuture extends DefaultChannelFuture {
437
438 ChannelCloseFuture() {
439 super(AbstractChannel.this, false);
440 }
441
442 @Override
443 public boolean setSuccess() {
444
445 return false;
446 }
447
448 @Override
449 public boolean setFailure(Throwable cause) {
450
451 return false;
452 }
453
454 boolean setClosed() {
455 return super.setSuccess();
456 }
457 }
458 }