1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.oio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.RecvByteBufAllocator;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 /**
28 * Abstract base class for OIO which reads and writes objects from/to a Socket
29 *
30 * @deprecated use NIO / EPOLL / KQUEUE transport.
31 */
32 @Deprecated
33 public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
34
35 private final List<Object> readBuf = new ArrayList<Object>();
36
37 protected AbstractOioMessageChannel(Channel parent) {
38 super(parent);
39 }
40
41 @Override
42 protected void doRead() {
43 if (!readPending) {
44 // We have to check readPending here because the Runnable to read could have been scheduled and later
45 // during the same read loop readPending was set to false.
46 return;
47 }
48 // In OIO we should set readPending to false even if the read was not successful so we can schedule
49 // another read on the event loop if no reads are done.
50 readPending = false;
51
52 final ChannelConfig config = config();
53 final ChannelPipeline pipeline = pipeline();
54 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
55 allocHandle.reset(config);
56
57 boolean closed = false;
58 Throwable exception = null;
59 try {
60 do {
61 // Perform a read.
62 int localRead = doReadMessages(readBuf);
63 if (localRead == 0) {
64 break;
65 }
66 if (localRead < 0) {
67 closed = true;
68 break;
69 }
70
71 allocHandle.incMessagesRead(localRead);
72 } while (allocHandle.continueReading());
73 } catch (Throwable t) {
74 exception = t;
75 }
76
77 boolean readData = false;
78 int size = readBuf.size();
79 if (size > 0) {
80 readData = true;
81 for (int i = 0; i < size; i++) {
82 readPending = false;
83 pipeline.fireChannelRead(readBuf.get(i));
84 }
85 readBuf.clear();
86 allocHandle.readComplete();
87 pipeline.fireChannelReadComplete();
88 }
89
90 if (exception != null) {
91 if (exception instanceof IOException) {
92 closed = true;
93 }
94
95 pipeline.fireExceptionCaught(exception);
96 }
97
98 if (closed) {
99 if (isOpen()) {
100 unsafe().close(unsafe().voidPromise());
101 }
102 } else if (readPending || config.isAutoRead() || !readData && isActive()) {
103 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
104 // should execute read() again because no data may have been read.
105 read();
106 }
107 }
108
109 /**
110 * Read messages into the given array and return the amount which was read.
111 */
112 protected abstract int doReadMessages(List<Object> msgs) throws Exception;
113 }