查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.FileRegion;
21  import io.netty.channel.RecvByteBufAllocator;
22  import io.netty.util.internal.ObjectUtil;
23  
24  import java.io.EOFException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.OutputStream;
28  import java.nio.channels.Channels;
29  import java.nio.channels.ClosedChannelException;
30  import java.nio.channels.NotYetConnectedException;
31  import java.nio.channels.WritableByteChannel;
32  
33  /**
34   * Abstract base class for OIO Channels that are based on streams.
35   *
36   * @deprecated use NIO / EPOLL / KQUEUE transport.
37   */
38  @Deprecated
39  public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
40  
41      private static final InputStream CLOSED_IN = new InputStream() {
42          @Override
43          public int read() {
44              return -1;
45          }
46      };
47  
48      private static final OutputStream CLOSED_OUT = new OutputStream() {
49          @Override
50          public void write(int b) throws IOException {
51              throw new ClosedChannelException();
52          }
53      };
54  
55      private InputStream is;
56      private OutputStream os;
57      private WritableByteChannel outChannel;
58  
59      /**
60       * Create a new instance
61       *
62       * @param parent    the parent {@link Channel} which was used to create this instance. This can be null if the
63       *                  {@link} has no parent as it was created by your self.
64       */
65      protected OioByteStreamChannel(Channel parent) {
66          super(parent);
67      }
68  
69      /**
70       * Activate this instance. After this call {@link #isActive()} will return {@code true}.
71       */
72      protected final void activate(InputStream is, OutputStream os) {
73          if (this.is != null) {
74              throw new IllegalStateException("input was set already");
75          }
76          if (this.os != null) {
77              throw new IllegalStateException("output was set already");
78          }
79          this.is = ObjectUtil.checkNotNull(is, "is");
80          this.os = ObjectUtil.checkNotNull(os, "os");
81          if (readWhenInactive) {
82              eventLoop().execute(readTask);
83              readWhenInactive = false;
84          }
85      }
86  
87      @Override
88      public boolean isActive() {
89          InputStream is = this.is;
90          if (is == null || is == CLOSED_IN) {
91              return false;
92          }
93  
94          OutputStream os = this.os;
95          return !(os == null || os == CLOSED_OUT);
96      }
97  
98      @Override
99      protected int available() {
100         try {
101             return is.available();
102         } catch (IOException ignored) {
103             return 0;
104         }
105     }
106 
107     @Override
108     protected int doReadBytes(ByteBuf buf) throws Exception {
109         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
110         allocHandle.attemptedBytesRead(Math.max(1, Math.min(available(), buf.maxWritableBytes())));
111         return buf.writeBytes(is, allocHandle.attemptedBytesRead());
112     }
113 
114     @Override
115     protected void doWriteBytes(ByteBuf buf) throws Exception {
116         OutputStream os = this.os;
117         if (os == null) {
118             throw new NotYetConnectedException();
119         }
120         buf.readBytes(os, buf.readableBytes());
121     }
122 
123     @Override
124     protected void doWriteFileRegion(FileRegion region) throws Exception {
125         OutputStream os = this.os;
126         if (os == null) {
127             throw new NotYetConnectedException();
128         }
129         if (outChannel == null) {
130             outChannel = Channels.newChannel(os);
131         }
132 
133         long written = 0;
134         for (;;) {
135             long localWritten = region.transferTo(outChannel, written);
136             if (localWritten == -1) {
137                 checkEOF(region);
138                 return;
139             }
140             written += localWritten;
141 
142             if (written >= region.count()) {
143                 return;
144             }
145         }
146     }
147 
148     private static void checkEOF(FileRegion region) throws IOException {
149         if (region.transferred() < region.count()) {
150             throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
151                                    "but only wrote " + region.transferred());
152         }
153     }
154 
155     @Override
156     protected void doClose() throws Exception {
157         InputStream is = this.is;
158         OutputStream os = this.os;
159         this.is = CLOSED_IN;
160         this.os = CLOSED_OUT;
161 
162         try {
163             if (is != null) {
164                 is.close();
165             }
166         } finally {
167             if (os != null) {
168                 os.close();
169             }
170         }
171     }
172 }