1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
/**
 * An abstract implementation of {@link IoProcessor} which helps transport
 * developers to write an {@link IoProcessor} easily. This class is in charge of
 * actively polling a set of {@link IoSession}s and of triggering events when
 * some I/O operation is possible.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 *
 * @param <S>
 *            the type of the {@link IoSession} this processor can handle
 */
68 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
/** A logger for this class. Note that the logger category is {@link IoProcessor}. */
private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

/**
 * A timeout, in milliseconds, used for the select, as we need to get out to
 * deal with idle sessions. It is also the minimal delay between two idle
 * checks.
 */
private static final long SELECT_TIMEOUT = 1000L;

/** A map containing the last Thread ID assigned for each processor class */
private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();

/** This IoProcessor instance name, given to the task submitted to the executor */
private final String threadName;

/** The executor to use when we need to start the inner Processor */
private final Executor executor;

/** A Session queue containing the newly created sessions */
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

/** A queue used to store the sessions to be removed */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

/** A queue used to store the sessions to be flushed */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

/**
 * A queue used to store the sessions which have a trafficControl to be
 * updated
 */
private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

/**
 * The processor thread : it handles the incoming messages. The reference
 * is <code>null</code> when no Processor is currently running.
 */
private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();

/** The time (from System.currentTimeMillis()) of the last idle check */
private long lastIdleCheckTime;

/**
 * The lock protecting the switch to the disposing state in dispose(), and
 * the call to doDispose() when the Processor terminates
 */
private final Object disposalLock = new Object();

/** Set to true as soon as dispose() has been requested */
private volatile boolean disposing;

/** Set to true when dispose() has finished waiting for the disposal future */
private volatile boolean disposed;

/** The future dispose() waits on. It is completed when a Processor terminates */
private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

/**
 * A flag read and reset by the Processor after each select : when it is
 * set, a select returning immediately with nothing selected is not
 * considered as a spinning selector.
 * NOTE(review): presumably set by the wakeup() implementation of the
 * subclasses, nothing in this class sets it to true - to be confirmed.
 */
protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116
/**
 * Create an {@link AbstractPollingIoProcessor} with the given
 * {@link Executor} for handling I/Os events. The name given to the
 * processor task is computed once, when the instance is created.
 *
 * @param executor
 *            the {@link Executor} for handling I/O events
 * @throws IllegalArgumentException
 *             if the given executor is <code>null</code>
 */
protected AbstractPollingIoProcessor(Executor executor) {
    // Check the parameter first, so that no thread ID is consumed for an
    // instance which cannot be created
    if (executor == null) {
        throw new IllegalArgumentException("executor");
    }

    this.threadName = nextThreadName();
    this.executor = executor;
}
132
133 /**
134 * Compute the thread ID for this class instance. As we may have different
135 * classes, we store the last ID number into a Map associating the class
136 * name to the last assigned ID.
137 *
138 * @return a name for the current thread, based on the class name and an
139 * incremental value, starting at 1.
140 */
141 private String nextThreadName() {
142 Class<?> cls = getClass();
143 int newThreadId;
144
145 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146
147 if (threadId == null) {
148 newThreadId = 1;
149 } else {
150 // Just increment the last ID, and get it.
151 newThreadId = threadId.incrementAndGet();
152 }
153
154 // Now we can compute the name for this thread
155 return cls.getSimpleName() + '-' + newThreadId;
156 }
157
/**
 * {@inheritDoc}
 *
 * The returned flag is set as soon as {@link #dispose()} has been
 * requested, and is never reset.
 */
public final boolean isDisposing() {
    return disposing;
}
164
/**
 * {@inheritDoc}
 *
 * The returned flag is set at the very end of {@link #dispose()}, once
 * the disposal future has been completed.
 */
public final boolean isDisposed() {
    return disposed;
}
171
/**
 * {@inheritDoc}
 *
 * The first call blocks until the disposal future has been completed by
 * the Processor. A call made while a disposal has already been requested,
 * or is done, returns immediately without waiting.
 */
public final void dispose() {
    // Nothing to do if the disposal has already been requested
    if (disposed || disposing) {
        return;
    }

    synchronized (disposalLock) {
        // Switch to the disposing state, and make sure a Processor is
        // running : it is the one which will call doDispose() when it
        // terminates
        disposing = true;
        startupProcessor();
    }

    // Wait for the Processor to complete the future
    disposalFuture.awaitUninterruptibly();
    disposed = true;
}
188
/**
 * Dispose the resources used by this {@link IoProcessor} for polling the
 * client connections. It is called by the Processor when it terminates
 * while a disposal has been requested, with the disposal lock held.
 *
 * @throws Exception
 *             if some low level IO error occurs
 */
protected abstract void doDispose() throws Exception;
198
/**
 * Poll the sessions, waiting at most for the given timeout
 *
 * @param timeout
 *            the delay, in milliseconds, after which the call returns if no
 *            event occurred
 * @return The number of sessions ready for read or for write
 * @throws Exception
 *             if some low level IO error occurs
 */
protected abstract int select(long timeout) throws Exception;
209
/**
 * Poll the sessions, with no time limit
 *
 * @return The number of sessions ready for read or for write
 * @throws Exception
 *             if some low level IO error occurs
 */
protected abstract int select() throws Exception;
218
/**
 * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
 * is empty
 *
 * @return {@code true} if no session is polled by this
 *         {@link IoProcessor}, {@code false} if at least one is
 */
protected abstract boolean isSelectorEmpty();
226
/**
 * Interrupt the {@link #select(long)} call, so that the Processor can
 * handle the pending requests without waiting for the timeout.
 */
protected abstract void wakeup();
231
/**
 * Get an {@link Iterator} over all the {@link IoSession}s polled by this
 * {@link IoProcessor}
 *
 * @return {@link Iterator} of {@link IoSession}
 */
protected abstract Iterator<S> allSessions();
239
/**
 * Get an {@link Iterator} over the {@link IoSession}s found selected by
 * the last call of {@link #select(long)}. The Processor removes each
 * session from this iterator once it has been processed.
 *
 * @return {@link Iterator} of {@link IoSession} ready for I/O operations
 */
protected abstract Iterator<S> selectedSessions();
247
/**
 * Get the state of a session (One of OPENING, OPENED, CLOSING)
 *
 * @param session the {@link IoSession} to inspect
 * @return the state of the session
 */
protected abstract SessionState getState(S session);
255
/**
 * Tells if the session is ready for writing
 *
 * @param session the queried session
 * @return {@code true} if it is ready, {@code false} if it is not
 */
protected abstract boolean isWritable(S session);
263
/**
 * Tells if the session is ready for reading
 *
 * @param session the queried session
 * @return {@code true} if it is ready, {@code false} if it is not
 */
protected abstract boolean isReadable(S session);
271
/**
 * Register or unregister the interest of a session in write events
 *
 * @param session the session for which the interest in write events is changed
 * @param isInterested {@code true} for registering, {@code false} for removing
 * @throws Exception If there was a problem while registering the session
 */
protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
280
/**
 * Register or unregister the interest of a session in read events
 *
 * @param session the session for which the interest in read events is changed
 * @param isInterested {@code true} for registering, {@code false} for removing
 * @throws Exception If there was a problem while registering the session
 */
protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
289
/**
 * Tells if this session is registered for reading
 *
 * @param session the queried session
 * @return {@code true} if it is registered for reading
 */
protected abstract boolean isInterestedInRead(S session);
297
/**
 * Tells if this session is registered for writing
 *
 * @param session the queried session
 * @return {@code true} if it is registered for writing
 */
protected abstract boolean isInterestedInWrite(S session);
305
/**
 * Initialize the polling of a session. Add it to the polling process. It
 * is called by the Processor for each session found in the new sessions
 * queue.
 *
 * @param session the {@link IoSession} to add to the polling
 * @throws Exception any exception thrown by the underlying system calls
 */
protected abstract void init(S session) throws Exception;
313
/**
 * Destroy the underlying client socket handle
 *
 * @param session the {@link IoSession} to destroy
 * @throws Exception any exception thrown by the underlying system calls
 */
protected abstract void destroy(S session) throws Exception;
321
/**
 * Reads a sequence of bytes from a {@link IoSession} into the given
 * {@link IoBuffer}. It is called when the session was found ready for
 * reading. The caller considers a negative result as the end of the input.
 *
 * @param session the session to read
 * @param buf the buffer to fill
 * @return the number of bytes read
 * @throws Exception any exception thrown by the underlying system calls
 */
protected abstract int read(S session, IoBuffer buf) throws Exception;
332
/**
 * Write a sequence of bytes to a {@link IoSession}. It is meant to be
 * called when a session was found ready for writing.
 *
 * @param session the session to write
 * @param buf the buffer to write
 * @param length the number of bytes to write, which may be greater than the
 *            number of bytes remaining in the buffer
 * @return the number of bytes written
 * @throws Exception any exception thrown by the underlying system calls
 */
protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
345
/**
 * Write a part of a file to a {@link IoSession}. If the underlying API
 * does not support system calls like sendfile(), you can throw a
 * {@link UnsupportedOperationException} so the file will be sent using
 * the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
 *
 * @param session the session to write
 * @param region the file region to write
 * @param length the length of the portion to send
 * @return the number of written bytes
 * @throws Exception any exception thrown by the underlying system calls
 */
protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
359
360 /**
361 * {@inheritDoc}
362 */
363 public final void add(S session) {
364 if (disposed || disposing) {
365 throw new IllegalStateException("Already disposed.");
366 }
367
368 // Adds the session to the newSession queue and starts the worker
369 newSessions.add(session);
370 startupProcessor();
371 }
372
/**
 * {@inheritDoc}
 *
 * The session is only queued for removal here : the removal itself is
 * done by the Processor, which is started if it is not already running.
 */
public final void remove(S session) {
    scheduleRemove(session);
    startupProcessor();
}
380
381 private void scheduleRemove(S session) {
382 if (!removingSessions.contains(session)) {
383 removingSessions.add(session);
384 }
385 }
386
387 /**
388 * {@inheritDoc}
389 */
390 public void write(S session, WriteRequest writeRequest) {
391 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
392
393 writeRequestQueue.offer(session, writeRequest);
394
395 if (!session.isWriteSuspended()) {
396 this.flush(session);
397 }
398 }
399
400 /**
401 * {@inheritDoc}
402 */
403 public final void flush(S session) {
404 // add the session to the queue if it's not already
405 // in the queue, then wake up the select()
406 if (session.setScheduledForFlush(true)) {
407 flushingSessions.add(session);
408 wakeup();
409 }
410 }
411
412 private void scheduleFlush(S session) {
413 // add the session to the queue if it's not already
414 // in the queue
415 if (session.setScheduledForFlush(true)) {
416 flushingSessions.add(session);
417 }
418 }
419
/**
 * Requests an update of the traffic mask for a given session. The session
 * is only queued here, and the select() is interrupted : the update itself
 * is done by the Processor.
 *
 * @param session the session to update
 */
public final void updateTrafficMask(S session) {
    trafficControllingSessions.add(session);
    wakeup();
}
429
430 /**
431 * Starts the inner Processor, asking the executor to pick a thread in its
432 * pool. The Runnable will be renamed
433 */
434 private void startupProcessor() {
435 Processor processor = processorRef.get();
436
437 if (processor == null) {
438 processor = new Processor();
439
440 if (processorRef.compareAndSet(null, processor)) {
441 executor.execute(new NamePreservingRunnable(processor, threadName));
442 }
443 }
444
445 // Just stop the select() and start it again, so that the processor
446 // can be activated immediately.
447 wakeup();
448 }
449
/**
 * In the case we are using the java select() method, this method is used to
 * trash the buggy selector and create a new one, registering all the
 * sockets on it. It is called by the Processor when the select() returned
 * immediately with nothing selected, for no known reason.
 *
 * @throws IOException If we got an exception
 */
abstract protected void registerNewSelector() throws IOException;
458
/**
 * Check that the select() has not exited immediately just because of a
 * broken connection. If so, this is a standard case, and we just have to
 * loop.
 *
 * @return {@code true} if a connection has been brutally closed.
 * @throws IOException If we got an exception
 */
abstract protected boolean isBrokenConnection() throws IOException;
468
469 /**
470 * Loops over the new sessions blocking queue and returns the number of
471 * sessions which are effectively created
472 *
473 * @return The number of new sessions
474 */
475 private int handleNewSessions() {
476 int addedSessions = 0;
477
478 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
479 if (addNow(session)) {
480 // A new session has been created
481 addedSessions++;
482 }
483 }
484
485 return addedSessions;
486 }
487
488 /**
489 * Process a new session : - initialize it - create its chain - fire the
490 * CREATED listeners if any
491 *
492 * @param session The session to create
493 * @return <tt>true</tt> if the session has been registered
494 */
495 private boolean addNow(S session) {
496 boolean registered = false;
497
498 try {
499 init(session);
500 registered = true;
501
502 // Build the filter chain of this session.
503 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
504 chainBuilder.buildFilterChain(session.getFilterChain());
505
506 // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
507 // in AbstractIoFilterChain.fireSessionOpened().
508 // Propagate the SESSION_CREATED event up to the chain
509 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
510 listeners.fireSessionCreated(session);
511 } catch (Exception e) {
512 ExceptionMonitor.getInstance().exceptionCaught(e);
513
514 try {
515 destroy(session);
516 } catch (Exception e1) {
517 ExceptionMonitor.getInstance().exceptionCaught(e1);
518 } finally {
519 registered = false;
520 }
521 }
522
523 return registered;
524 }
525
526 private int removeSessions() {
527 int removedSessions = 0;
528
529 for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
530 SessionState state = getState(session);
531
532 // Now deal with the removal accordingly to the session's state
533 switch (state) {
534 case OPENED:
535 // Try to remove this session
536 if (removeNow(session)) {
537 removedSessions++;
538 }
539
540 break;
541
542 case CLOSING:
543 // Skip if channel is already closed
544 // In any case, remove the session from the queue
545 removedSessions++;
546 break;
547
548 case OPENING:
549 // Remove session from the newSessions queue and
550 // remove it
551 newSessions.remove(session);
552
553 if (removeNow(session)) {
554 removedSessions++;
555 }
556
557 break;
558
559 default:
560 throw new IllegalStateException(String.valueOf(state));
561 }
562 }
563
564 return removedSessions;
565 }
566
567 private boolean removeNow(S session) {
568 clearWriteRequestQueue(session);
569
570 try {
571 destroy(session);
572 return true;
573 } catch (Exception e) {
574 IoFilterChain filterChain = session.getFilterChain();
575 filterChain.fireExceptionCaught(e);
576 } finally {
577 try {
578 clearWriteRequestQueue(session);
579 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
580 } catch (Exception e) {
581 // The session was either destroyed or not at this point.
582 // We do not want any exception thrown from this "cleanup" code to change
583 // the return value by bubbling up.
584 IoFilterChain filterChain = session.getFilterChain();
585 filterChain.fireExceptionCaught(e);
586 }
587 }
588
589 return false;
590 }
591
592 private void clearWriteRequestQueue(S session) {
593 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
594 WriteRequest req;
595
596 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
597
598 if ((req = writeRequestQueue.poll(session)) != null) {
599 Object message = req.getMessage();
600
601 if (message instanceof IoBuffer) {
602 IoBuffer buf = (IoBuffer) message;
603
604 // The first unwritten empty buffer must be
605 // forwarded to the filter chain.
606 if (buf.hasRemaining()) {
607 buf.reset();
608 failedRequests.add(req);
609 } else {
610 IoFilterChain filterChain = session.getFilterChain();
611 filterChain.fireMessageSent(req);
612 }
613 } else {
614 failedRequests.add(req);
615 }
616
617 // Discard others.
618 while ((req = writeRequestQueue.poll(session)) != null) {
619 failedRequests.add(req);
620 }
621 }
622
623 // Create an exception and notify.
624 if (!failedRequests.isEmpty()) {
625 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
626
627 for (WriteRequest r : failedRequests) {
628 session.decreaseScheduledBytesAndMessages(r);
629 r.getFuture().setException(cause);
630 }
631
632 IoFilterChain filterChain = session.getFilterChain();
633 filterChain.fireExceptionCaught(cause);
634 }
635 }
636
637 private void process() throws Exception {
638 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
639 S session = i.next();
640 process(session);
641 i.remove();
642 }
643 }
644
645 /**
646 * Deal with session ready for the read or write operations, or both.
647 */
648 private void process(S session) {
649 // Process Reads
650 if (isReadable(session) && !session.isReadSuspended()) {
651 read(session);
652 }
653
654 // Process writes
655 if (isWritable(session) && !session.isWriteSuspended()) {
656 // add the session to the queue, if it's not already there
657 if (session.setScheduledForFlush(true)) {
658 flushingSessions.add(session);
659 }
660 }
661 }
662
663 private void read(S session) {
664 IoSessionConfig config = session.getConfig();
665 int bufferSize = config.getReadBufferSize();
666 IoBuffer buf = IoBuffer.allocate(bufferSize);
667
668 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
669
670 try {
671 int readBytes = 0;
672 int ret;
673
674 try {
675 if (hasFragmentation) {
676
677 while ((ret = read(session, buf)) > 0) {
678 readBytes += ret;
679
680 if (!buf.hasRemaining()) {
681 break;
682 }
683 }
684 } else {
685 ret = read(session, buf);
686
687 if (ret > 0) {
688 readBytes = ret;
689 }
690 }
691 } finally {
692 buf.flip();
693 }
694
695 if (readBytes > 0) {
696 IoFilterChain filterChain = session.getFilterChain();
697 filterChain.fireMessageReceived(buf);
698 buf = null;
699
700 if (hasFragmentation) {
701 if (readBytes << 1 < config.getReadBufferSize()) {
702 session.decreaseReadBufferSize();
703 } else if (readBytes == config.getReadBufferSize()) {
704 session.increaseReadBufferSize();
705 }
706 }
707 }
708
709 if (ret < 0) {
710 // scheduleRemove(session);
711 IoFilterChain filterChain = session.getFilterChain();
712 filterChain.fireInputClosed();
713 }
714 } catch (Exception e) {
715 if (e instanceof IOException) {
716 if (!(e instanceof PortUnreachableException)
717 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
718 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
719 scheduleRemove(session);
720 }
721 }
722
723 IoFilterChain filterChain = session.getFilterChain();
724 filterChain.fireExceptionCaught(e);
725 }
726 }
727
728 private void notifyIdleSessions(long currentTime) throws Exception {
729 // process idle sessions
730 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
731 lastIdleCheckTime = currentTime;
732 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
733 }
734 }
735
736 /**
737 * Write all the pending messages
738 */
739 private void flush(long currentTime) {
740 if (flushingSessions.isEmpty()) {
741 return;
742 }
743
744 do {
745 S session = flushingSessions.poll(); // the same one with
746 // firstSession
747
748 if (session == null) {
749 // Just in case ... It should not happen.
750 break;
751 }
752
753 // Reset the Schedule for flush flag for this session,
754 // as we are flushing it now
755 session.unscheduledForFlush();
756
757 SessionState state = getState(session);
758
759 switch (state) {
760 case OPENED:
761 try {
762 boolean flushedAll = flushNow(session, currentTime);
763
764 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
765 && !session.isScheduledForFlush()) {
766 scheduleFlush(session);
767 }
768 } catch (Exception e) {
769 scheduleRemove(session);
770 session.close(true);
771 IoFilterChain filterChain = session.getFilterChain();
772 filterChain.fireExceptionCaught(e);
773 }
774
775 break;
776
777 case CLOSING:
778 // Skip if the channel is already closed.
779 break;
780
781 case OPENING:
782 // Retry later if session is not yet fully initialized.
783 // (In case that Session.write() is called before addSession()
784 // is processed)
785 scheduleFlush(session);
786 return;
787
788 default:
789 throw new IllegalStateException(String.valueOf(state));
790 }
791
792 } while (!flushingSessions.isEmpty());
793 }
794
795 private boolean flushNow(S session, long currentTime) {
796 if (!session.isConnected()) {
797 scheduleRemove(session);
798 return false;
799 }
800
801 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
802
803 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
804
805 // Set limitation for the number of written bytes for read-write
806 // fairness. I used maxReadBufferSize * 3 / 2, which yields best
807 // performance in my experience while not breaking fairness much.
808 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
809 + (session.getConfig().getMaxReadBufferSize() >>> 1);
810 int writtenBytes = 0;
811 WriteRequest req = null;
812
813 try {
814 // Clear OP_WRITE
815 setInterestedInWrite(session, false);
816
817 do {
818 // Check for pending writes.
819 req = session.getCurrentWriteRequest();
820
821 if (req == null) {
822 req = writeRequestQueue.poll(session);
823
824 if (req == null) {
825 break;
826 }
827
828 session.setCurrentWriteRequest(req);
829 }
830
831 int localWrittenBytes = 0;
832 Object message = req.getMessage();
833
834 if (message instanceof IoBuffer) {
835 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
836 currentTime);
837
838 if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
839 // the buffer isn't empty, we re-interest it in writing
840 writtenBytes += localWrittenBytes;
841 setInterestedInWrite(session, true);
842 return false;
843 }
844 } else if (message instanceof FileRegion) {
845 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
846 currentTime);
847
848 // Fix for Java bug on Linux
849 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
850 // If there's still data to be written in the FileRegion,
851 // return 0 indicating that we need
852 // to pause until writing may resume.
853 if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
854 writtenBytes += localWrittenBytes;
855 setInterestedInWrite(session, true);
856 return false;
857 }
858 } else {
859 throw new IllegalStateException("Don't know how to handle message of type '"
860 + message.getClass().getName() + "'. Are you missing a protocol encoder?");
861 }
862
863 if (localWrittenBytes == 0) {
864 // Kernel buffer is full.
865 setInterestedInWrite(session, true);
866 return false;
867 }
868
869 writtenBytes += localWrittenBytes;
870
871 if (writtenBytes >= maxWrittenBytes) {
872 // Wrote too much
873 scheduleFlush(session);
874 return false;
875 }
876
877 if (message instanceof IoBuffer) {
878 ((IoBuffer) message).free();
879 }
880 } while (writtenBytes < maxWrittenBytes);
881 } catch (Exception e) {
882 if (req != null) {
883 req.getFuture().setException(e);
884 }
885
886 IoFilterChain filterChain = session.getFilterChain();
887 filterChain.fireExceptionCaught(e);
888 return false;
889 }
890
891 return true;
892 }
893
894 private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
895 throws Exception {
896 IoBuffer buf = (IoBuffer) req.getMessage();
897 int localWrittenBytes = 0;
898
899 if (buf.hasRemaining()) {
900 int length;
901
902 if (hasFragmentation) {
903 length = Math.min(buf.remaining(), maxLength);
904 } else {
905 length = buf.remaining();
906 }
907
908 try {
909 localWrittenBytes = write(session, buf, length);
910 } catch (IOException ioe) {
911 // We have had an issue while trying to send data to the
912 // peer : let's close the session.
913 buf.free();
914 session.close(true);
915 destroy(session);
916
917 return 0;
918 }
919
920 }
921
922 session.increaseWrittenBytes(localWrittenBytes, currentTime);
923
924 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
925 // Buffer has been sent, clear the current request.
926 int pos = buf.position();
927 buf.reset();
928
929 fireMessageSent(session, req);
930
931 // And set it back to its position
932 buf.position(pos);
933 }
934
935 return localWrittenBytes;
936 }
937
938 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
939 throws Exception {
940 int localWrittenBytes;
941 FileRegion region = (FileRegion) req.getMessage();
942
943 if (region.getRemainingBytes() > 0) {
944 int length;
945
946 if (hasFragmentation) {
947 length = (int) Math.min(region.getRemainingBytes(), maxLength);
948 } else {
949 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
950 }
951
952 localWrittenBytes = transferFile(session, region, length);
953 region.update(localWrittenBytes);
954 } else {
955 localWrittenBytes = 0;
956 }
957
958 session.increaseWrittenBytes(localWrittenBytes, currentTime);
959
960 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
961 fireMessageSent(session, req);
962 }
963
964 return localWrittenBytes;
965 }
966
967 private void fireMessageSent(S session, WriteRequest req) {
968 session.setCurrentWriteRequest(null);
969 IoFilterChain filterChain = session.getFilterChain();
970 filterChain.fireMessageSent(req);
971 }
972
973 /**
974 * Update the trafficControl for all the session.
975 */
976 private void updateTrafficMask() {
977 int queueSize = trafficControllingSessions.size();
978
979 while (queueSize > 0) {
980 S session = trafficControllingSessions.poll();
981
982 if (session == null) {
983 // We are done with this queue.
984 return;
985 }
986
987 SessionState state = getState(session);
988
989 switch (state) {
990 case OPENED:
991 updateTrafficControl(session);
992
993 break;
994
995 case CLOSING:
996 break;
997
998 case OPENING:
999 // Retry later if session is not yet fully initialized.
1000 // (In case that Session.suspend??() or session.resume??() is
1001 // called before addSession() is processed)
1002 // We just put back the session at the end of the queue.
1003 trafficControllingSessions.add(session);
1004 break;
1005
1006 default:
1007 throw new IllegalStateException(String.valueOf(state));
1008 }
1009
1010 // As we have handled one session, decrement the number of
1011 // remaining sessions. The OPENING session will be processed
1012 // with the next select(), as the queue size has been decreased,
1013 // even
1014 // if the session has been pushed at the end of the queue
1015 queueSize--;
1016 }
1017 }
1018
1019 /**
1020 * {@inheritDoc}
1021 */
1022 public void updateTrafficControl(S session) {
1023 //
1024 try {
1025 setInterestedInRead(session, !session.isReadSuspended());
1026 } catch (Exception e) {
1027 IoFilterChain filterChain = session.getFilterChain();
1028 filterChain.fireExceptionCaught(e);
1029 }
1030
1031 try {
1032 setInterestedInWrite(session,
1033 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1034 } catch (Exception e) {
1035 IoFilterChain filterChain = session.getFilterChain();
1036 filterChain.fireExceptionCaught(e);
1037 }
1038 }
1039
1040 /**
1041 * The main loop. This is the place in charge to poll the Selector, and to
1042 * process the active sessions. It's done in - handle the newly created
1043 * sessions -
1044 */
1045 private class Processor implements Runnable {
1046 public void run() {
1047 assert (processorRef.get() == this);
1048
1049 int nSessions = 0;
1050 lastIdleCheckTime = System.currentTimeMillis();
1051
1052 for (;;) {
1053 try {
1054 // This select has a timeout so that we can manage
1055 // idle session when we get out of the select every
1056 // second. (note : this is a hack to avoid creating
1057 // a dedicated thread).
1058 long t0 = System.currentTimeMillis();
1059 int selected = select(SELECT_TIMEOUT);
1060 long t1 = System.currentTimeMillis();
1061 long delta = (t1 - t0);
1062
1063 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1064 // Last chance : the select() may have been
1065 // interrupted because we have had an closed channel.
1066 if (isBrokenConnection()) {
1067 LOG.warn("Broken connection");
1068 } else {
1069 LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1070 // Ok, we are hit by the nasty epoll
1071 // spinning.
1072 // Basically, there is a race condition
1073 // which causes a closing file descriptor not to be
1074 // considered as available as a selected channel,
1075 // but
1076 // it stopped the select. The next time we will
1077 // call select(), it will exit immediately for the
1078 // same
1079 // reason, and do so forever, consuming 100%
1080 // CPU.
1081 // We have to destroy the selector, and
1082 // register all the socket on a new one.
1083 registerNewSelector();
1084 }
1085 }
1086
1087 // Manage newly created session first
1088 nSessions += handleNewSessions();
1089
1090 updateTrafficMask();
1091
1092 // Now, if we have had some incoming or outgoing events,
1093 // deal with them
1094 if (selected > 0) {
1095 // LOG.debug("Processing ..."); // This log hurts one of
1096 // the MDCFilter test...
1097 process();
1098 }
1099
1100 // Write the pending requests
1101 long currentTime = System.currentTimeMillis();
1102 flush(currentTime);
1103
1104 // And manage removed sessions
1105 nSessions -= removeSessions();
1106
1107 // Last, not least, send Idle events to the idle sessions
1108 notifyIdleSessions(currentTime);
1109
1110 // Get a chance to exit the infinite loop if there are no
1111 // more sessions on this Processor
1112 if (nSessions == 0) {
1113 processorRef.set(null);
1114
1115 if (newSessions.isEmpty() && isSelectorEmpty()) {
1116 // newSessions.add() precedes startupProcessor
1117 assert (processorRef.get() != this);
1118 break;
1119 }
1120
1121 assert (processorRef.get() != this);
1122
1123 if (!processorRef.compareAndSet(null, this)) {
1124 // startupProcessor won race, so must exit processor
1125 assert (processorRef.get() != this);
1126 break;
1127 }
1128
1129 assert (processorRef.get() == this);
1130 }
1131
1132 // Disconnect all sessions immediately if disposal has been
1133 // requested so that we exit this loop eventually.
1134 if (isDisposing()) {
1135 boolean hasKeys = false;
1136
1137 for (Iterator<S> i = allSessions(); i.hasNext();) {
1138 IoSession session = i.next();
1139
1140 if (session.isActive()) {
1141 scheduleRemove((S)session);
1142 hasKeys = true;
1143 }
1144 }
1145
1146 if (hasKeys) {
1147 wakeup();
1148 }
1149 }
1150 } catch (ClosedSelectorException cse) {
1151 // If the selector has been closed, we can exit the loop
1152 // But first, dump a stack trace
1153 ExceptionMonitor.getInstance().exceptionCaught(cse);
1154 break;
1155 } catch (Exception e) {
1156 ExceptionMonitor.getInstance().exceptionCaught(e);
1157
1158 try {
1159 Thread.sleep(1000);
1160 } catch (InterruptedException e1) {
1161 ExceptionMonitor.getInstance().exceptionCaught(e1);
1162 }
1163 }
1164 }
1165
1166 try {
1167 synchronized (disposalLock) {
1168 if (disposing) {
1169 doDispose();
1170 }
1171 }
1172 } catch (Exception e) {
1173 ExceptionMonitor.getInstance().exceptionCaught(e);
1174 } finally {
1175 disposalFuture.setValue(true);
1176 }
1177 }
1178 }
1179 }