查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.DefaultFileRegion;
21  import org.jboss.netty.channel.FileRegion;
22  
23  import java.io.IOException;
24  import java.io.OutputStream;
25  import java.io.PushbackInputStream;
26  import java.net.SocketException;
27  import java.nio.channels.Channels;
28  import java.nio.channels.ClosedChannelException;
29  import java.nio.channels.WritableByteChannel;
30  import java.util.regex.Pattern;
31  
32  import static org.jboss.netty.channel.Channels.*;
33  
34  class OioWorker extends AbstractOioWorker<OioSocketChannel> {
35  
36      private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
37              "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
38  
39      OioWorker(OioSocketChannel channel) {
40          super(channel);
41      }
42  
43      @Override
44      public void run() {
45          boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
46          if (fireConnected && channel.isOpen()) {
47              // Fire the channelConnected event for OioAcceptedSocketChannel.
48              // See #287
49              fireChannelConnected(channel, channel.getRemoteAddress());
50          }
51          super.run();
52      }
53  
54      @Override
55      boolean process() throws IOException {
56          byte[] buf;
57          int readBytes;
58          PushbackInputStream in = channel.getInputStream();
59          int bytesToRead = in.available();
60          if (bytesToRead > 0) {
61              buf = new byte[bytesToRead];
62              readBytes = in.read(buf);
63          } else {
64              int b = in.read();
65              if (b < 0) {
66                  return false;
67              }
68              in.unread(b);
69              return true;
70          }
71          fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
72  
73          return true;
74      }
75  
76      static void write(
77              OioSocketChannel channel, ChannelFuture future,
78              Object message) {
79  
80          boolean iothread = isIoThread(channel);
81          OutputStream out = channel.getOutputStream();
82          if (out == null) {
83              Exception e = new ClosedChannelException();
84              future.setFailure(e);
85              if (iothread) {
86                  fireExceptionCaught(channel, e);
87              } else {
88                  fireExceptionCaughtLater(channel, e);
89              }
90              return;
91          }
92  
93          try {
94              int length = 0;
95  
96              // Add support to write a FileRegion. This in fact will not give any performance gain
97              // but at least it not fail and we did the best to emulate it
98              if (message instanceof FileRegion) {
99                  FileRegion fr = (FileRegion) message;
100                 try {
101                     synchronized (out) {
102                         WritableByteChannel  bchannel = Channels.newChannel(out);
103 
104                         long i;
105                         while ((i = fr.transferTo(bchannel, length)) > 0) {
106                             length += i;
107                             if (length >= fr.getCount()) {
108                                 break;
109                             }
110                         }
111                     }
112                 } finally {
113                     if (fr instanceof DefaultFileRegion) {
114                         DefaultFileRegion dfr = (DefaultFileRegion) fr;
115                         if (dfr.releaseAfterTransfer()) {
116                             fr.releaseExternalResources();
117                         }
118                     }
119                 }
120             } else {
121                 ChannelBuffer a = (ChannelBuffer) message;
122                 length = a.readableBytes();
123                 synchronized (out) {
124                     a.getBytes(a.readerIndex(), out, length);
125                 }
126             }
127 
128             future.setSuccess();
129             if (iothread) {
130                 fireWriteComplete(channel, length);
131             } else {
132                 fireWriteCompleteLater(channel, length);
133             }
134 
135         } catch (Throwable t) {
136             // Convert 'SocketException: Socket closed' to
137             // ClosedChannelException.
138             if (t instanceof SocketException &&
139                     SOCKET_CLOSED_MESSAGE.matcher(
140                             String.valueOf(t.getMessage())).matches()) {
141                 t = new ClosedChannelException();
142             }
143             future.setFailure(t);
144             if (iothread) {
145                 fireExceptionCaught(channel, t);
146             } else {
147                 fireExceptionCaughtLater(channel, t);
148             }
149         }
150     }
151 }