1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.redis;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelInitializer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.nio.NioEventLoopGroup;
25 import io.netty.channel.socket.SocketChannel;
26 import io.netty.channel.socket.nio.NioSocketChannel;
27 import io.netty.handler.codec.redis.RedisArrayAggregator;
28 import io.netty.handler.codec.redis.RedisBulkStringAggregator;
29 import io.netty.handler.codec.redis.RedisDecoder;
30 import io.netty.handler.codec.redis.RedisEncoder;
31 import io.netty.util.concurrent.GenericFutureListener;
32
33 import java.io.BufferedReader;
34 import java.io.InputStreamReader;
35
36
37
38
39 public class RedisClient {
40 private static final String HOST = System.getProperty("host", "127.0.0.1");
41 private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));
42
43 public static void main(String[] args) throws Exception {
44 EventLoopGroup group = new NioEventLoopGroup();
45 try {
46 Bootstrap b = new Bootstrap();
47 b.group(group)
48 .channel(NioSocketChannel.class)
49 .handler(new ChannelInitializer<SocketChannel>() {
50 @Override
51 protected void initChannel(SocketChannel ch) throws Exception {
52 ChannelPipeline p = ch.pipeline();
53 p.addLast(new RedisDecoder());
54 p.addLast(new RedisBulkStringAggregator());
55 p.addLast(new RedisArrayAggregator());
56 p.addLast(new RedisEncoder());
57 p.addLast(new RedisClientHandler());
58 }
59 });
60
61
62 Channel ch = b.connect(HOST, PORT).sync().channel();
63
64
65 System.out.println("Enter Redis commands (quit to end)");
66 ChannelFuture lastWriteFuture = null;
67 BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
68 for (;;) {
69 final String input = in.readLine();
70 final String line = input != null ? input.trim() : null;
71 if (line == null || "quit".equalsIgnoreCase(line)) {
72 ch.close().sync();
73 break;
74 } else if (line.isEmpty()) {
75 continue;
76 }
77
78 lastWriteFuture = ch.writeAndFlush(line);
79 lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
80 @Override
81 public void operationComplete(ChannelFuture future) throws Exception {
82 if (!future.isSuccess()) {
83 System.err.print("write failed: ");
84 future.cause().printStackTrace(System.err);
85 }
86 }
87 });
88 }
89
90
91 if (lastWriteFuture != null) {
92 lastWriteFuture.sync();
93 }
94 } finally {
95 group.shutdownGracefully();
96 }
97 }
98 }