1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.execution;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.channel.ChannelFutureListener;
22 import org.jboss.netty.util.ObjectSizeEstimator;
23
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.RejectedExecutionException;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29
30 /**
31 * {@link Executor} which should be used for downstream {@link ChannelEvent}'s. This implementation
32 * will take care of preserve the order of the events in a {@link Channel}. If you don't need to
33 * preserve the order just use one of the {@link Executor} implementations provided by the static
34 * methods of {@link Executors}.
35 * <br>
36 * <br>
37 * For more informations about how the order is preserved see {@link OrderedMemoryAwareThreadPoolExecutor}
38 */
39 public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor {
40
41 /**
42 * Creates a new instance.
43 *
44 * @param corePoolSize the maximum number of active threads
45 */
46 public OrderedDownstreamThreadPoolExecutor(int corePoolSize) {
47 super(corePoolSize, 0L, 0L);
48 }
49
50 /**
51 * Creates a new instance.
52 *
53 * @param corePoolSize the maximum number of active threads
54 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
55 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
56 */
57 public OrderedDownstreamThreadPoolExecutor(
58 int corePoolSize, long keepAliveTime, TimeUnit unit) {
59 super(corePoolSize, 0L, 0L, keepAliveTime, unit);
60 }
61
62 /**
63 * Creates a new instance.
64 *
65 * @param corePoolSize the maximum number of active threads
66 * @param keepAliveTime the amount of time for an inactive thread to shut itself down
67 * @param unit the {@link TimeUnit} of {@code keepAliveTime}
68 * @param threadFactory the {@link ThreadFactory} of this pool
69 */
70 public OrderedDownstreamThreadPoolExecutor(
71 int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
72 super(corePoolSize, 0L, 0L,
73 keepAliveTime, unit, threadFactory);
74 }
75
76 /**
77 * Return {@code null}
78 */
79 @Override
80 public ObjectSizeEstimator getObjectSizeEstimator() {
81 return null;
82 }
83
84 /**
85 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
86 * size in this implementation
87 */
88 @Override
89 public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
90 throw new UnsupportedOperationException("Not supported by this implementation");
91 }
92
93 /**
94 * Returns {@code 0L}
95 */
96 @Override
97 public long getMaxChannelMemorySize() {
98 return 0L;
99 }
100
101 /**
102 * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
103 * size in this implementation
104 */
105 @Override
106 public void setMaxChannelMemorySize(long maxChannelMemorySize) {
107 throw new UnsupportedOperationException("Not supported by this implementation");
108 }
109
110 /**
111 * Returns {@code 0L}
112 */
113 @Override
114 public long getMaxTotalMemorySize() {
115 return 0L;
116 }
117
118 /**
119 * Return {@code false} as we not need to cound the memory in this implementation
120 */
121 @Override
122 protected boolean shouldCount(Runnable task) {
123 return false;
124 }
125
126 @Override
127 public void execute(Runnable command) {
128
129 // check if the Runnable was of an unsupported type
130 if (command instanceof ChannelUpstreamEventRunnable) {
131 throw new RejectedExecutionException("command must be enclosed with an downstream event.");
132 }
133 doExecute(command);
134 }
135
136 @Override
137 protected Executor getChildExecutor(ChannelEvent e) {
138 final Object key = getChildExecutorKey(e);
139 Executor executor = childExecutors.get(key);
140 if (executor == null) {
141 executor = new ChildExecutor();
142 Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
143 if (oldExecutor != null) {
144 executor = oldExecutor;
145 } else {
146 // register a listener so that the ChildExecutor will get removed once the channel was closed
147 e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
148
149 public void operationComplete(ChannelFuture future) throws Exception {
150 removeChildExecutor(key);
151 }
152 });
153 }
154 }
155
156 return executor;
157 }
158 }