1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.internal.ObjectUtil;
20
21 import java.util.IdentityHashMap;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
26
27 import static io.netty.channel.ChannelOption.ALLOCATOR;
28 import static io.netty.channel.ChannelOption.AUTO_CLOSE;
29 import static io.netty.channel.ChannelOption.AUTO_READ;
30 import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
31 import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
32 import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
33 import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
34 import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
35 import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
36 import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
37 import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
38 import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
39 import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
40 import static io.netty.util.internal.ObjectUtil.checkNotNull;
41 import static io.netty.util.internal.ObjectUtil.checkPositive;
42 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
43
44
45
46
47 public class DefaultChannelConfig implements ChannelConfig {
48 private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
49
50 private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
51
52 private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
53 AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
54 private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
55 AtomicReferenceFieldUpdater.newUpdater(
56 DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
57
58 protected final Channel channel;
59
60 private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
61 private volatile RecvByteBufAllocator rcvBufAllocator;
62 private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
63
64 private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
65 private volatile int writeSpinCount = 16;
66 private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
67
68 @SuppressWarnings("FieldMayBeFinal")
69 private volatile int autoRead = 1;
70 private volatile boolean autoClose = true;
71 private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
72 private volatile boolean pinEventExecutor = true;
73
/**
 * Creates a configuration for {@code channel} that uses a freshly created
 * {@link AdaptiveRecvByteBufAllocator} as its receive-buffer allocator.
 *
 * @param channel the channel this configuration belongs to
 */
public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

/**
 * Creates a configuration for {@code channel} with an explicit receive-buffer allocator.
 * <p>
 * If {@code allocator} is a {@link MaxMessagesRecvByteBufAllocator}, its
 * {@code maxMessagesPerRead} is initialised from
 * {@code channel.metadata().defaultMaxMessagesPerRead()} before it is installed.
 *
 * @param channel   the channel this configuration belongs to
 * @param allocator the receive-buffer allocator to install; rejected by {@code checkNotNull} if {@code null}
 */
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    // Assign the channel first: installing the allocator ends up calling the overridable
    // setRecvByteBufAllocator(RecvByteBufAllocator), and a subclass override must not observe
    // a still-null 'channel' field while this constructor is running.
    this.channel = channel;
    setRecvByteBufAllocator(allocator, channel.metadata());
}
82
83 @Override
84 @SuppressWarnings("deprecation")
85 public Map<ChannelOption<?>, Object> getOptions() {
86 return getOptions(
87 null,
88 CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
89 ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
90 WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
91 SINGLE_EVENTEXECUTOR_PER_GROUP, MAX_MESSAGES_PER_WRITE);
92 }
93
94 protected Map<ChannelOption<?>, Object> getOptions(
95 Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
96 if (result == null) {
97 result = new IdentityHashMap<ChannelOption<?>, Object>();
98 }
99 for (ChannelOption<?> o: options) {
100 result.put(o, getOption(o));
101 }
102 return result;
103 }
104
105 @SuppressWarnings("unchecked")
106 @Override
107 public boolean setOptions(Map<ChannelOption<?>, ?> options) {
108 ObjectUtil.checkNotNull(options, "options");
109
110 boolean setAllOptions = true;
111 for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
112 if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
113 setAllOptions = false;
114 }
115 }
116
117 return setAllOptions;
118 }
119
120 @Override
121 @SuppressWarnings({ "unchecked", "deprecation" })
122 public <T> T getOption(ChannelOption<T> option) {
123 ObjectUtil.checkNotNull(option, "option");
124
125 if (option == CONNECT_TIMEOUT_MILLIS) {
126 return (T) Integer.valueOf(getConnectTimeoutMillis());
127 }
128 if (option == MAX_MESSAGES_PER_READ) {
129 return (T) Integer.valueOf(getMaxMessagesPerRead());
130 }
131 if (option == WRITE_SPIN_COUNT) {
132 return (T) Integer.valueOf(getWriteSpinCount());
133 }
134 if (option == ALLOCATOR) {
135 return (T) getAllocator();
136 }
137 if (option == RCVBUF_ALLOCATOR) {
138 return (T) getRecvByteBufAllocator();
139 }
140 if (option == AUTO_READ) {
141 return (T) Boolean.valueOf(isAutoRead());
142 }
143 if (option == AUTO_CLOSE) {
144 return (T) Boolean.valueOf(isAutoClose());
145 }
146 if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
147 return (T) Integer.valueOf(getWriteBufferHighWaterMark());
148 }
149 if (option == WRITE_BUFFER_LOW_WATER_MARK) {
150 return (T) Integer.valueOf(getWriteBufferLowWaterMark());
151 }
152 if (option == WRITE_BUFFER_WATER_MARK) {
153 return (T) getWriteBufferWaterMark();
154 }
155 if (option == MESSAGE_SIZE_ESTIMATOR) {
156 return (T) getMessageSizeEstimator();
157 }
158 if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
159 return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
160 }
161 if (option == MAX_MESSAGES_PER_WRITE) {
162 return (T) Integer.valueOf(getMaxMessagesPerWrite());
163 }
164 return null;
165 }
166
167 @Override
168 @SuppressWarnings("deprecation")
169 public <T> boolean setOption(ChannelOption<T> option, T value) {
170 validate(option, value);
171
172 if (option == CONNECT_TIMEOUT_MILLIS) {
173 setConnectTimeoutMillis((Integer) value);
174 } else if (option == MAX_MESSAGES_PER_READ) {
175 setMaxMessagesPerRead((Integer) value);
176 } else if (option == WRITE_SPIN_COUNT) {
177 setWriteSpinCount((Integer) value);
178 } else if (option == ALLOCATOR) {
179 setAllocator((ByteBufAllocator) value);
180 } else if (option == RCVBUF_ALLOCATOR) {
181 setRecvByteBufAllocator((RecvByteBufAllocator) value);
182 } else if (option == AUTO_READ) {
183 setAutoRead((Boolean) value);
184 } else if (option == AUTO_CLOSE) {
185 setAutoClose((Boolean) value);
186 } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
187 setWriteBufferHighWaterMark((Integer) value);
188 } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
189 setWriteBufferLowWaterMark((Integer) value);
190 } else if (option == WRITE_BUFFER_WATER_MARK) {
191 setWriteBufferWaterMark((WriteBufferWaterMark) value);
192 } else if (option == MESSAGE_SIZE_ESTIMATOR) {
193 setMessageSizeEstimator((MessageSizeEstimator) value);
194 } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
195 setPinEventExecutorPerGroup((Boolean) value);
196 } else if (option == MAX_MESSAGES_PER_WRITE) {
197 setMaxMessagesPerWrite((Integer) value);
198 } else {
199 return false;
200 }
201
202 return true;
203 }
204
205 protected <T> void validate(ChannelOption<T> option, T value) {
206 ObjectUtil.checkNotNull(option, "option").validate(value);
207 }
208
209 @Override
210 public int getConnectTimeoutMillis() {
211 return connectTimeoutMillis;
212 }
213
214 @Override
215 public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
216 checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
217 this.connectTimeoutMillis = connectTimeoutMillis;
218 return this;
219 }
220
221
222
223
224
225
226
227 @Override
228 @Deprecated
229 public int getMaxMessagesPerRead() {
230 try {
231 MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
232 return allocator.maxMessagesPerRead();
233 } catch (ClassCastException e) {
234 throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
235 "MaxMessagesRecvByteBufAllocator", e);
236 }
237 }
238
239
240
241
242
243
244
245 @Override
246 @Deprecated
247 public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
248 try {
249 MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
250 allocator.maxMessagesPerRead(maxMessagesPerRead);
251 return this;
252 } catch (ClassCastException e) {
253 throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
254 "MaxMessagesRecvByteBufAllocator", e);
255 }
256 }
257
258
259
260
261
262 public int getMaxMessagesPerWrite() {
263 return maxMessagesPerWrite;
264 }
265
266
267
268
269
270 public ChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
271 this.maxMessagesPerWrite = ObjectUtil.checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
272 return this;
273 }
274
275 @Override
276 public int getWriteSpinCount() {
277 return writeSpinCount;
278 }
279
280 @Override
281 public ChannelConfig setWriteSpinCount(int writeSpinCount) {
282 checkPositive(writeSpinCount, "writeSpinCount");
283
284
285
286
287 if (writeSpinCount == Integer.MAX_VALUE) {
288 --writeSpinCount;
289 }
290 this.writeSpinCount = writeSpinCount;
291 return this;
292 }
293
294 @Override
295 public ByteBufAllocator getAllocator() {
296 return allocator;
297 }
298
299 @Override
300 public ChannelConfig setAllocator(ByteBufAllocator allocator) {
301 this.allocator = ObjectUtil.checkNotNull(allocator, "allocator");
302 return this;
303 }
304
305 @SuppressWarnings("unchecked")
306 @Override
307 public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
308 return (T) rcvBufAllocator;
309 }
310
311 @Override
312 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
313 rcvBufAllocator = checkNotNull(allocator, "allocator");
314 return this;
315 }
316
317
318
319
320
321
322
323 private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
324 checkNotNull(allocator, "allocator");
325 checkNotNull(metadata, "metadata");
326 if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
327 ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
328 }
329 setRecvByteBufAllocator(allocator);
330 }
331
332 @Override
333 public boolean isAutoRead() {
334 return autoRead == 1;
335 }
336
337 @Override
338 public ChannelConfig setAutoRead(boolean autoRead) {
339 boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
340 if (autoRead && !oldAutoRead) {
341 channel.read();
342 } else if (!autoRead && oldAutoRead) {
343 autoReadCleared();
344 }
345 return this;
346 }
347
348
349
350
351
352 protected void autoReadCleared() { }
353
354 @Override
355 public boolean isAutoClose() {
356 return autoClose;
357 }
358
359 @Override
360 public ChannelConfig setAutoClose(boolean autoClose) {
361 this.autoClose = autoClose;
362 return this;
363 }
364
365 @Override
366 public int getWriteBufferHighWaterMark() {
367 return writeBufferWaterMark.high();
368 }
369
370 @Override
371 public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
372 checkPositiveOrZero(writeBufferHighWaterMark, "writeBufferHighWaterMark");
373 for (;;) {
374 WriteBufferWaterMark waterMark = writeBufferWaterMark;
375 if (writeBufferHighWaterMark < waterMark.low()) {
376 throw new IllegalArgumentException(
377 "writeBufferHighWaterMark cannot be less than " +
378 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
379 writeBufferHighWaterMark);
380 }
381 if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
382 new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) {
383 return this;
384 }
385 }
386 }
387
388 @Override
389 public int getWriteBufferLowWaterMark() {
390 return writeBufferWaterMark.low();
391 }
392
393 @Override
394 public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
395 checkPositiveOrZero(writeBufferLowWaterMark, "writeBufferLowWaterMark");
396 for (;;) {
397 WriteBufferWaterMark waterMark = writeBufferWaterMark;
398 if (writeBufferLowWaterMark > waterMark.high()) {
399 throw new IllegalArgumentException(
400 "writeBufferLowWaterMark cannot be greater than " +
401 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
402 writeBufferLowWaterMark);
403 }
404 if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
405 new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) {
406 return this;
407 }
408 }
409 }
410
411 @Override
412 public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
413 this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
414 return this;
415 }
416
417 @Override
418 public WriteBufferWaterMark getWriteBufferWaterMark() {
419 return writeBufferWaterMark;
420 }
421
422 @Override
423 public MessageSizeEstimator getMessageSizeEstimator() {
424 return msgSizeEstimator;
425 }
426
427 @Override
428 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
429 this.msgSizeEstimator = ObjectUtil.checkNotNull(estimator, "estimator");
430 return this;
431 }
432
433 private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
434 this.pinEventExecutor = pinEventExecutor;
435 return this;
436 }
437
438 private boolean getPinEventExecutorPerGroup() {
439 return pinEventExecutor;
440 }
441
442 }