1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.DefaultFileRegion;
21 import org.jboss.netty.channel.FileRegion;
22
23 import java.io.IOException;
24 import java.io.OutputStream;
25 import java.io.PushbackInputStream;
26 import java.net.SocketException;
27 import java.nio.channels.Channels;
28 import java.nio.channels.ClosedChannelException;
29 import java.nio.channels.WritableByteChannel;
30 import java.util.regex.Pattern;
31
32 import static org.jboss.netty.channel.Channels.*;
33
34 class OioWorker extends AbstractOioWorker<OioSocketChannel> {
35
36 private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
37 "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
38
39 OioWorker(OioSocketChannel channel) {
40 super(channel);
41 }
42
43 @Override
44 public void run() {
45 boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
46 if (fireConnected && channel.isOpen()) {
47
48
49 fireChannelConnected(channel, channel.getRemoteAddress());
50 }
51 super.run();
52 }
53
54 @Override
55 boolean process() throws IOException {
56 byte[] buf;
57 int readBytes;
58 PushbackInputStream in = channel.getInputStream();
59 int bytesToRead = in.available();
60 if (bytesToRead > 0) {
61 buf = new byte[bytesToRead];
62 readBytes = in.read(buf);
63 } else {
64 int b = in.read();
65 if (b < 0) {
66 return false;
67 }
68 in.unread(b);
69 return true;
70 }
71 fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
72
73 return true;
74 }
75
76 static void write(
77 OioSocketChannel channel, ChannelFuture future,
78 Object message) {
79
80 boolean iothread = isIoThread(channel);
81 OutputStream out = channel.getOutputStream();
82 if (out == null) {
83 Exception e = new ClosedChannelException();
84 future.setFailure(e);
85 if (iothread) {
86 fireExceptionCaught(channel, e);
87 } else {
88 fireExceptionCaughtLater(channel, e);
89 }
90 return;
91 }
92
93 try {
94 int length = 0;
95
96
97
98 if (message instanceof FileRegion) {
99 FileRegion fr = (FileRegion) message;
100 try {
101 synchronized (out) {
102 WritableByteChannel bchannel = Channels.newChannel(out);
103
104 long i;
105 while ((i = fr.transferTo(bchannel, length)) > 0) {
106 length += i;
107 if (length >= fr.getCount()) {
108 break;
109 }
110 }
111 }
112 } finally {
113 if (fr instanceof DefaultFileRegion) {
114 DefaultFileRegion dfr = (DefaultFileRegion) fr;
115 if (dfr.releaseAfterTransfer()) {
116 fr.releaseExternalResources();
117 }
118 }
119 }
120 } else {
121 ChannelBuffer a = (ChannelBuffer) message;
122 length = a.readableBytes();
123 synchronized (out) {
124 a.getBytes(a.readerIndex(), out, length);
125 }
126 }
127
128 future.setSuccess();
129 if (iothread) {
130 fireWriteComplete(channel, length);
131 } else {
132 fireWriteCompleteLater(channel, length);
133 }
134
135 } catch (Throwable t) {
136
137
138 if (t instanceof SocketException &&
139 SOCKET_CLOSED_MESSAGE.matcher(
140 String.valueOf(t.getMessage())).matches()) {
141 t = new ClosedChannelException();
142 }
143 future.setFailure(t);
144 if (iothread) {
145 fireExceptionCaught(channel, t);
146 } else {
147 fireExceptionCaughtLater(channel, t);
148 }
149 }
150 }
151 }