查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.pcap;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.socket.DatagramChannel;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.ServerSocketChannel;
29  import io.netty.channel.socket.SocketChannel;
30  import io.netty.util.NetUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.Closeable;
35  import java.io.IOException;
36  import java.io.OutputStream;
37  import java.net.Inet4Address;
38  import java.net.Inet6Address;
39  import java.net.InetAddress;
40  import java.net.InetSocketAddress;
41  import java.net.UnknownHostException;
42  import java.util.concurrent.atomic.AtomicReference;
43  
44  import static io.netty.util.internal.ObjectUtil.checkNotNull;
45  
46  /**
47   * <p> {@link PcapWriteHandler} captures {@link ByteBuf} from {@link SocketChannel} / {@link ServerChannel}
48   * or {@link DatagramPacket} and writes it into Pcap {@link OutputStream}. </p>
49   *
50   * <p>
51   * Things to keep in mind when using {@link PcapWriteHandler} with TCP:
52   *
53   *    <ul>
54   *        <li> Whenever {@link ChannelInboundHandlerAdapter#channelActive(ChannelHandlerContext)} is called,
55   *        a fake TCP 3-way handshake (SYN, SYN+ACK, ACK) is simulated as new connection in Pcap. </li>
56   *
57   *        <li> Whenever {@link ChannelInboundHandlerAdapter#handlerRemoved(ChannelHandlerContext)} is called,
58   *        a fake TCP connection teardown (FIN+ACK, FIN+ACK, ACK) is simulated as connection shutdown in Pcap. </li>
59   *
60   *        <li> Whenever {@link ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)}
61   *        is called, a fake TCP RST is sent to simulate connection Reset in Pcap. </li>
62   *
63   *        <li> ACK is sent each time data is sent / received. </li>
64   *
65   *        <li> Zero Length Data Packets can cause TCP Double ACK error in Wireshark. To tackle this,
66   *        set {@code captureZeroByte} to {@code false}. </li>
67   *    </ul>
68   * </p>
69   */
70  public final class PcapWriteHandler extends ChannelDuplexHandler implements Closeable {
71  
72      /**
73       * Logger for logging events
74       */
75      private final InternalLogger logger = InternalLoggerFactory.getInstance(PcapWriteHandler.class);
76  
77      /**
78       * {@link PcapWriter} Instance
79       */
80      private PcapWriter pCapWriter;
81  
82      /**
83       * {@link OutputStream} where we'll write Pcap data.
84       */
85      private final OutputStream outputStream;
86  
87      /**
88       * {@code true} if we want to capture packets with zero bytes else {@code false}.
89       */
90      private final boolean captureZeroByte;
91  
92      /**
93       * {@code true} if we want to write Pcap Global Header on initialization of
94       * {@link PcapWriter} else {@code false}.
95       */
96      private final boolean writePcapGlobalHeader;
97  
98      /**
99       * {@code true} if we want to synchronize on the {@link OutputStream} while writing
100      * else {@code false}.
101      */
102     private final boolean sharedOutputStream;
103 
104     /**
105      * TCP Sender Segment Number.
106      * It'll start with 1 and keep incrementing with number of bytes read/sent.
107      */
108     private int sendSegmentNumber = 1;
109 
110     /**
111      * TCP Receiver Segment Number.
112      * It'll start with 1 and keep incrementing with number of bytes read/sent.
113      */
114     private int receiveSegmentNumber = 1;
115 
116     /**
117      * Type of the channel this handler is registered on
118      */
119     private ChannelType channelType;
120 
121     /**
122      * Address of the initiator of the connection
123      */
124     private InetSocketAddress initiatorAddr;
125 
126     /**
127      * Address of the receiver of the connection
128      */
129     private InetSocketAddress handlerAddr;
130 
131     /**
132      * Set to {@code true} if this handler is registered on a server pipeline
133      */
134     private boolean isServerPipeline;
135 
136     /**
137      * Current state of this {@link PcapWriteHandler}
138      */
139     private final AtomicReference<State> state = new AtomicReference<State>(State.INIT);
140 
141     /**
142      * Create new {@link PcapWriteHandler} Instance.
143      * {@code captureZeroByte} is set to {@code false} and
144      * {@code writePcapGlobalHeader} is set to {@code true}.
145      *
146      * @param outputStream OutputStream where Pcap data will be written. Call {@link #close()} to close this
147      *                     OutputStream.
148      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
149      *                              {@link NullPointerException}
150      *
151      * @deprecated Use {@link Builder} instead.
152      */
153     @Deprecated
154     public PcapWriteHandler(OutputStream outputStream) {
155         this(outputStream, false, true);
156     }
157 
158     /**
159      * Create new {@link PcapWriteHandler} Instance
160      *
161      * @param outputStream          OutputStream where Pcap data will be written. Call {@link #close()} to close this
162      *                              OutputStream.
163      * @param captureZeroByte       Set to {@code true} to enable capturing packets with empty (0 bytes) payload.
164      *                              Otherwise, if set to {@code false}, empty packets will be filtered out.
165      * @param writePcapGlobalHeader Set to {@code true} to write Pcap Global Header on initialization.
166      *                              Otherwise, if set to {@code false}, Pcap Global Header will not be written
167      *                              on initialization. This could when writing Pcap data on a existing file where
168      *                              Pcap Global Header is already present.
169      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
170      *                              {@link NullPointerException}
171      *
172      * @deprecated Use {@link Builder} instead.
173      */
174     @Deprecated
175     public PcapWriteHandler(OutputStream outputStream, boolean captureZeroByte, boolean writePcapGlobalHeader) {
176         this.outputStream = checkNotNull(outputStream, "OutputStream");
177         this.captureZeroByte = captureZeroByte;
178         this.writePcapGlobalHeader = writePcapGlobalHeader;
179         sharedOutputStream = false;
180     }
181 
182     private PcapWriteHandler(Builder builder, OutputStream outputStream) {
183         this.outputStream = outputStream;
184         captureZeroByte = builder.captureZeroByte;
185         sharedOutputStream = builder.sharedOutputStream;
186         writePcapGlobalHeader = builder.writePcapGlobalHeader;
187         channelType = builder.channelType;
188         handlerAddr = builder.handlerAddr;
189         initiatorAddr = builder.initiatorAddr;
190         isServerPipeline = builder.isServerPipeline;
191     }
192 
193     /**
194      * Writes the Pcap Global Header to the provided {@code OutputStream}
195      *
196      * @param outputStream OutputStream where Pcap data will be written.
197      * @throws IOException if there is an error writing to the {@code OutputStream}
198      */
199     public static void writeGlobalHeader(OutputStream outputStream) throws IOException {
200         PcapHeaders.writeGlobalHeader(outputStream);
201     }
202 
203     private void initializeIfNecessary(ChannelHandlerContext ctx) throws Exception {
204         // If State is not 'INIT' then it means we're already initialized so then no need to initiaize again.
205         if (state.get() != State.INIT) {
206             return;
207         }
208 
209         pCapWriter = new PcapWriter(this);
210 
211         if (channelType == null) {
212             // infer channel type
213             if (ctx.channel() instanceof SocketChannel) {
214                 channelType = ChannelType.TCP;
215 
216                 // If Channel belongs to `SocketChannel` then we're handling TCP.
217                 // Capture correct `localAddress` and `remoteAddress`
218                 if (ctx.channel().parent() instanceof ServerSocketChannel) {
219                     isServerPipeline = true;
220                     initiatorAddr = (InetSocketAddress) ctx.channel().remoteAddress();
221                     handlerAddr = getLocalAddress(ctx.channel(), initiatorAddr);
222                 } else {
223                     isServerPipeline = false;
224                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
225                     initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
226                 }
227             } else if (ctx.channel() instanceof DatagramChannel) {
228                 channelType = ChannelType.UDP;
229 
230                 DatagramChannel datagramChannel = (DatagramChannel) ctx.channel();
231 
232                 // If `DatagramChannel` is connected then we can get
233                 // `localAddress` and `remoteAddress` from Channel.
234                 if (datagramChannel.isConnected()) {
235                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
236                     initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
237                 }
238             }
239         }
240 
241         if (channelType == ChannelType.TCP) {
242             logger.debug("Initiating Fake TCP 3-Way Handshake");
243 
244             ByteBuf tcpBuf = ctx.alloc().buffer();
245 
246             try {
247                 // Write SYN with Normal Source and Destination Address
248                 TCPPacket.writePacket(tcpBuf, null, 0, 0,
249                         initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.SYN);
250                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
251 
252                 // Write SYN+ACK with Reversed Source and Destination Address
253                 TCPPacket.writePacket(tcpBuf, null, 0, 1,
254                         handlerAddr.getPort(), initiatorAddr.getPort(), TCPPacket.TCPFlag.SYN, TCPPacket.TCPFlag.ACK);
255                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, ctx.alloc(), ctx);
256 
257                 // Write ACK with Normal Source and Destination Address
258                 TCPPacket.writePacket(tcpBuf, null, 1, 1, initiatorAddr.getPort(),
259                         handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
260                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
261             } finally {
262                 tcpBuf.release();
263             }
264 
265             logger.debug("Finished Fake TCP 3-Way Handshake");
266         }
267 
268         state.set(State.WRITING);
269     }
270 
271     @Override
272     public void channelActive(ChannelHandlerContext ctx) throws Exception {
273         initializeIfNecessary(ctx);
274         super.channelActive(ctx);
275     }
276 
277     @Override
278     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
279         // Initialize if needed
280         if (state.get() == State.INIT) {
281             initializeIfNecessary(ctx);
282         }
283 
284         // Only write if State is STARTED
285         if (state.get() == State.WRITING) {
286             if (channelType == ChannelType.TCP) {
287                 handleTCP(ctx, msg, false);
288             } else if (channelType == ChannelType.UDP) {
289                 handleUDP(ctx, msg);
290             } else {
291                 logDiscard();
292             }
293         }
294         super.channelRead(ctx, msg);
295     }
296 
297     @Override
298     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
299         // Initialize if needed
300         if (state.get() == State.INIT) {
301             initializeIfNecessary(ctx);
302         }
303 
304         // Only write if State is STARTED
305         if (state.get() == State.WRITING) {
306             if (channelType == ChannelType.TCP) {
307                 handleTCP(ctx, msg, true);
308             } else if (channelType == ChannelType.UDP) {
309                 handleUDP(ctx, msg);
310             } else {
311                 logDiscard();
312             }
313         }
314         super.write(ctx, msg, promise);
315     }
316 
317     /**
318      * Handle TCP L4
319      *
320      * @param ctx              {@link ChannelHandlerContext} for {@link ByteBuf} allocation and
321      *                         {@code fireExceptionCaught}
322      * @param msg              {@link Object} must be {@link ByteBuf} else it'll be discarded
323      * @param isWriteOperation Set {@code true} if we have to process packet when packets are being sent out
324      *                         else set {@code false}
325      */
326     private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
327         if (msg instanceof ByteBuf) {
328 
329             // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
330             if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
331                 logger.debug("Discarding Zero Byte TCP Packet. isWriteOperation {}", isWriteOperation);
332                 return;
333             }
334 
335             ByteBufAllocator byteBufAllocator = ctx.alloc();
336             ByteBuf packet = ((ByteBuf) msg).duplicate();
337             ByteBuf tcpBuf = byteBufAllocator.buffer();
338             int bytes = packet.readableBytes();
339 
340             try {
341                 if (isWriteOperation) {
342                     final InetSocketAddress srcAddr;
343                     final InetSocketAddress dstAddr;
344                     if (isServerPipeline) {
345                         srcAddr = handlerAddr;
346                         dstAddr = initiatorAddr;
347                     } else {
348                         srcAddr = initiatorAddr;
349                         dstAddr = handlerAddr;
350                     }
351 
352                     TCPPacket.writePacket(tcpBuf, packet, sendSegmentNumber, receiveSegmentNumber, srcAddr.getPort(),
353                             dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
354                     completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
355                     logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, srcAddr, dstAddr, false);
356 
357                     sendSegmentNumber += bytes;
358 
359                     TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
360                             srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
361                     completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
362                     logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
363                 } else {
364                     final InetSocketAddress srcAddr;
365                     final InetSocketAddress dstAddr;
366                     if (isServerPipeline) {
367                         srcAddr = initiatorAddr;
368                         dstAddr = handlerAddr;
369                     } else {
370                         srcAddr = handlerAddr;
371                         dstAddr = initiatorAddr;
372                     }
373 
374                     TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber, srcAddr.getPort(),
375                             dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
376                     completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
377                     logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, srcAddr, dstAddr, false);
378 
379                     receiveSegmentNumber += bytes;
380 
381                     TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, dstAddr.getPort(),
382                             srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
383                     completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
384                     logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
385                 }
386             } finally {
387                 tcpBuf.release();
388             }
389         } else {
390             logger.debug("Discarding Pcap Write for TCP Object: {}", msg);
391         }
392     }
393 
394     /**
395      * Write TCP/IP L3 and L2 here.
396      *
397      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
398      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
399      * @param tcpBuf           {@link ByteBuf} containing TCP L4 Data
400      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for TCP/IP L3 and L2 data.
401      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
402      */
403     private void completeTCPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf tcpBuf,
404                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
405 
406         ByteBuf ipBuf = byteBufAllocator.buffer();
407         ByteBuf ethernetBuf = byteBufAllocator.buffer();
408         ByteBuf pcap = byteBufAllocator.buffer();
409 
410         try {
411             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
412                 IPPacket.writeTCPv4(ipBuf, tcpBuf,
413                         NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
414                         NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
415 
416                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
417             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
418                 IPPacket.writeTCPv6(ipBuf, tcpBuf,
419                         srcAddr.getAddress().getAddress(),
420                         dstAddr.getAddress().getAddress());
421 
422                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
423             } else {
424                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
425                         "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
426                 return;
427             }
428 
429             // Write Packet into Pcap
430             pCapWriter.writePacket(pcap, ethernetBuf);
431         } catch (IOException ex) {
432             logger.error("Caught Exception While Writing Packet into Pcap", ex);
433             ctx.fireExceptionCaught(ex);
434         } finally {
435             ipBuf.release();
436             ethernetBuf.release();
437             pcap.release();
438         }
439     }
440 
441     /**
442      * Handle UDP l4
443      *
444      * @param ctx {@link ChannelHandlerContext} for {@code localAddress} / {@code remoteAddress},
445      *            {@link ByteBuf} allocation and {@code fireExceptionCaught}
446      * @param msg {@link DatagramPacket} or {@link ByteBuf}
447      */
448     private void handleUDP(ChannelHandlerContext ctx, Object msg) {
449         ByteBuf udpBuf = ctx.alloc().buffer();
450 
451         try {
452             if (msg instanceof DatagramPacket) {
453 
454                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
455                 if (((DatagramPacket) msg).content().readableBytes() == 0 && !captureZeroByte) {
456                     logger.debug("Discarding Zero Byte UDP Packet");
457                     return;
458                 }
459 
460                 DatagramPacket datagramPacket = ((DatagramPacket) msg).duplicate();
461                 InetSocketAddress srcAddr = datagramPacket.sender();
462                 InetSocketAddress dstAddr = datagramPacket.recipient();
463 
464                 // If `datagramPacket.sender()` is `null` then DatagramPacket is initialized
465                 // `sender` (local) address. In this case, we'll get source address from Channel.
466                 if (srcAddr == null) {
467                     srcAddr = getLocalAddress(ctx.channel(), dstAddr);
468                 }
469 
470                 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
471                         datagramPacket.content().readableBytes(), srcAddr, dstAddr);
472 
473                 UDPPacket.writePacket(udpBuf, datagramPacket.content(), srcAddr.getPort(), dstAddr.getPort());
474                 completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
475             } else if (msg instanceof ByteBuf &&
476                     (!(ctx.channel() instanceof DatagramChannel) || ((DatagramChannel) ctx.channel()).isConnected())) {
477 
478                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
479                 if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
480                     logger.debug("Discarding Zero Byte UDP Packet");
481                     return;
482                 }
483 
484                 ByteBuf byteBuf = ((ByteBuf) msg).duplicate();
485 
486                 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
487                         byteBuf.readableBytes(), initiatorAddr, handlerAddr);
488 
489                 UDPPacket.writePacket(udpBuf, byteBuf, initiatorAddr.getPort(), handlerAddr.getPort());
490                 completeUDPWrite(initiatorAddr, handlerAddr, udpBuf, ctx.alloc(), ctx);
491             } else {
492                 logger.debug("Discarding Pcap Write for UDP Object: {}", msg);
493             }
494         } finally {
495             udpBuf.release();
496         }
497     }
498 
499     /**
500      * Write UDP/IP L3 and L2 here.
501      *
502      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
503      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
504      * @param udpBuf           {@link ByteBuf} containing UDP L4 Data
505      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for UDP/IP L3 and L2 data.
506      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
507      */
508     private void completeUDPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf udpBuf,
509                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
510 
511         ByteBuf ipBuf = byteBufAllocator.buffer();
512         ByteBuf ethernetBuf = byteBufAllocator.buffer();
513         ByteBuf pcap = byteBufAllocator.buffer();
514 
515         try {
516             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
517                 IPPacket.writeUDPv4(ipBuf, udpBuf,
518                         NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
519                         NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
520 
521                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
522             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
523                 IPPacket.writeUDPv6(ipBuf, udpBuf,
524                         srcAddr.getAddress().getAddress(),
525                         dstAddr.getAddress().getAddress());
526 
527                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
528             } else {
529                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
530                         "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
531                 return;
532             }
533 
534             // Write Packet into Pcap
535             pCapWriter.writePacket(pcap, ethernetBuf);
536         } catch (IOException ex) {
537             logger.error("Caught Exception While Writing Packet into Pcap", ex);
538             ctx.fireExceptionCaught(ex);
539         } finally {
540             ipBuf.release();
541             ethernetBuf.release();
542             pcap.release();
543         }
544     }
545 
546     /**
547      * Get the local address of a channel. If the address is a wildcard address ({@code 0.0.0.0} or {@code ::}), and
548      * the address family does not match that of the {@code remote}, return the wildcard address of the {@code remote}'s
549      * family instead.
550      *
551      * @param ch     The channel to get the local address from
552      * @param remote The remote address
553      * @return The fixed local address
554      */
555     private static InetSocketAddress getLocalAddress(Channel ch, InetSocketAddress remote) {
556         InetSocketAddress local = (InetSocketAddress) ch.localAddress();
557         if (remote != null && local.getAddress().isAnyLocalAddress()) {
558             if (local.getAddress() instanceof Inet4Address && remote.getAddress() instanceof Inet6Address) {
559                 return new InetSocketAddress(WildcardAddressHolder.wildcard6, local.getPort());
560             }
561             if (local.getAddress() instanceof Inet6Address && remote.getAddress() instanceof Inet4Address) {
562                 return new InetSocketAddress(WildcardAddressHolder.wildcard4, local.getPort());
563             }
564         }
565         return local;
566     }
567 
568     @Override
569     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
570 
571         // If `isTCP` is true, then we'll simulate a `FIN` flow.
572         if (channelType == ChannelType.TCP) {
573             logger.debug("Starting Fake TCP FIN+ACK Flow to close connection");
574 
575             ByteBufAllocator byteBufAllocator = ctx.alloc();
576             ByteBuf tcpBuf = byteBufAllocator.buffer();
577 
578             try {
579                 // Write FIN+ACK with Normal Source and Destination Address
580                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
581                         handlerAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
582                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
583 
584                 // Write FIN+ACK with Reversed Source and Destination Address
585                 TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, handlerAddr.getPort(),
586                         initiatorAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
587                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, byteBufAllocator, ctx);
588 
589                 // Write ACK with Normal Source and Destination Address
590                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber + 1, receiveSegmentNumber + 1,
591                         initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
592                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
593             } finally {
594                 tcpBuf.release();
595             }
596 
597             logger.debug("Finished Fake TCP FIN+ACK Flow to close connection");
598         }
599 
600         close();
601         super.handlerRemoved(ctx);
602     }
603 
604     @Override
605     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
606 
607         if (channelType == ChannelType.TCP) {
608             ByteBuf tcpBuf = ctx.alloc().buffer();
609 
610             try {
611                 // Write RST with Normal Source and Destination Address
612                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
613                         handlerAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
614                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
615             } finally {
616                 tcpBuf.release();
617             }
618 
619             logger.debug("Sent Fake TCP RST to close connection");
620         }
621 
622         close();
623         ctx.fireExceptionCaught(cause);
624     }
625 
626     /**
627      * Logger for TCP
628      */
629     private void logTCP(boolean isWriteOperation, int bytes, int sendSegmentNumber, int receiveSegmentNumber,
630                         InetSocketAddress srcAddr, InetSocketAddress dstAddr, boolean ackOnly) {
631         // If `ackOnly` is `true` when we don't need to write any data so we'll not
632         // log number of bytes being written and mark the operation as "TCP ACK".
633         if (logger.isDebugEnabled()) {
634             if (ackOnly) {
635                 logger.debug("Writing TCP ACK, isWriteOperation {}, Segment Number {}, Ack Number {}, Src Addr {}, "
636                         + "Dst Addr {}", isWriteOperation, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr);
637             } else {
638                 logger.debug("Writing TCP Data of {} Bytes, isWriteOperation {}, Segment Number {}, Ack Number {}, " +
639                                 "Src Addr {}, Dst Addr {}", bytes, isWriteOperation, sendSegmentNumber,
640                         receiveSegmentNumber, srcAddr, dstAddr);
641             }
642         }
643     }
644 
645     OutputStream outputStream() {
646         return outputStream;
647     }
648 
649     boolean sharedOutputStream() {
650         return sharedOutputStream;
651     }
652 
653     /**
654      * Returns {@code true} if the {@link PcapWriteHandler} is currently
655      * writing packets to the {@link OutputStream} else returns {@code false}.
656      */
657     public boolean isWriting() {
658         return state.get() == State.WRITING;
659     }
660 
661     State state() {
662         return state.get();
663     }
664 
665     /**
666      * Pause the {@link PcapWriteHandler} from writing packets to the {@link OutputStream}.
667      */
668     public void pause() {
669         if (!state.compareAndSet(State.WRITING, State.PAUSED)) {
670             throw new IllegalStateException("State must be 'STARTED' to pause but current state is: " + state);
671         }
672     }
673 
674     /**
675      * Resume the {@link PcapWriteHandler} to writing packets to the {@link OutputStream}.
676      */
677     public void resume() {
678         if (!state.compareAndSet(State.PAUSED, State.WRITING)) {
679             throw new IllegalStateException("State must be 'PAUSED' to resume but current state is: " + state);
680         }
681     }
682 
683     void markClosed() {
684         if (state.get() != State.CLOSED) {
685             state.set(State.CLOSED);
686         }
687     }
688 
689     // Visible for testing only.
690     PcapWriter pCapWriter() {
691         return pCapWriter;
692     }
693 
694     private void logDiscard() {
695         logger.warn("Discarding pcap write because channel type is unknown. The channel this handler is registered " +
696                 "on is not a SocketChannel or DatagramChannel, so the inference does not work. Please call " +
697                 "forceTcpChannel or forceUdpChannel before registering the handler.");
698     }
699 
700     @Override
701     public String toString() {
702         return "PcapWriteHandler{" +
703                 "captureZeroByte=" + captureZeroByte +
704                 ", writePcapGlobalHeader=" + writePcapGlobalHeader +
705                 ", sharedOutputStream=" + sharedOutputStream +
706                 ", sendSegmentNumber=" + sendSegmentNumber +
707                 ", receiveSegmentNumber=" + receiveSegmentNumber +
708                 ", channelType=" + channelType +
709                 ", initiatorAddr=" + initiatorAddr +
710                 ", handlerAddr=" + handlerAddr +
711                 ", isServerPipeline=" + isServerPipeline +
712                 ", state=" + state +
713                 '}';
714     }
715 
716     /**
717      * <p> Close {@code PcapWriter} and {@link OutputStream}. </p>
718      * <p> Note: Calling this method does not close {@link PcapWriteHandler}.
719      * Only Pcap Writes are closed. </p>
720      *
721      * @throws IOException If {@link OutputStream#close()} throws an exception
722      */
723     @Override
724     public void close() throws IOException {
725         if (state.get() == State.CLOSED) {
726             logger.debug("PcapWriterHandler is already closed");
727         } else {
728             pCapWriter.close();
729             markClosed();
730             logger.debug("PcapWriterHandler is now closed");
731         }
732     }
733 
734     private enum ChannelType {
735         TCP, UDP
736     }
737 
738     public static Builder builder() {
739         return new Builder();
740     }
741 
742     /**
743      * Builder for {@link PcapWriteHandler}
744      */
745     public static final class Builder {
746         private boolean captureZeroByte;
747         private boolean sharedOutputStream;
748         private boolean writePcapGlobalHeader = true;
749 
750         private ChannelType channelType;
751         private InetSocketAddress initiatorAddr;
752         private InetSocketAddress handlerAddr;
753         private boolean isServerPipeline;
754 
755         private Builder() {
756         }
757 
758         /**
759          * Set to {@code true} to enable capturing packets with empty (0 bytes) payload. Otherwise, if set to
760          * {@code false}, empty packets will be filtered out.
761          *
762          * @param captureZeroByte Whether to filter out empty packets.
763          * @return this builder
764          */
765         public Builder captureZeroByte(boolean captureZeroByte) {
766             this.captureZeroByte = captureZeroByte;
767             return this;
768         }
769 
770         /**
771          * Set to {@code true} if multiple {@link PcapWriteHandler} instances will be
772          * writing to the same {@link OutputStream} concurrently, and write locking is
773          * required. Otherwise, if set to {@code false}, no locking will be done.
774          * Additionally, {@link #close} will not close the underlying {@code OutputStream}.
775          * Note: it is probably an error to have both {@code writePcapGlobalHeader} and
776          * {@code sharedOutputStream} set to {@code true} at the same time.
777          *
778          * @param sharedOutputStream Whether {@link OutputStream} is shared or not
779          * @return this builder
780          */
781         public Builder sharedOutputStream(boolean sharedOutputStream) {
782             this.sharedOutputStream = sharedOutputStream;
783             return this;
784         }
785 
786         /**
787          * Set to {@code true} to write Pcap Global Header on initialization. Otherwise, if set to {@code false}, Pcap
788          * Global Header will not be written on initialization. This could when writing Pcap data on a existing file
789          * where Pcap Global Header is already present.
790          *
791          * @param writePcapGlobalHeader Whether to write the pcap global header.
792          * @return this builder
793          */
794         public Builder writePcapGlobalHeader(boolean writePcapGlobalHeader) {
795             this.writePcapGlobalHeader = writePcapGlobalHeader;
796             return this;
797         }
798 
799         /**
800          * Force this handler to write data as if they were TCP packets, with the given connection metadata. If this
801          * method isn't called, we determine the metadata from the channel.
802          *
803          * @param serverAddress The address of the TCP server (handler)
804          * @param clientAddress The address of the TCP client (initiator)
805          * @param isServerPipeline Whether the handler is part of the server channel
806          * @return this builder
807          */
808         public Builder forceTcpChannel(InetSocketAddress serverAddress, InetSocketAddress clientAddress,
809                                        boolean isServerPipeline) {
810             channelType = ChannelType.TCP;
811             handlerAddr = checkNotNull(serverAddress, "serverAddress");
812             initiatorAddr = checkNotNull(clientAddress, "clientAddress");
813             this.isServerPipeline = isServerPipeline;
814             return this;
815         }
816 
817         /**
818          * Force this handler to write data as if they were UDP packets, with the given connection metadata. If this
819          * method isn't called, we determine the metadata from the channel.
820          * <br>
821          * Note that even if this method is called, the address information on {@link DatagramPacket} takes precedence
822          * if it is present.
823          *
824          * @param localAddress  The address of the UDP local
825          * @param remoteAddress The address of the UDP remote
826          * @return this builder
827          */
828         public Builder forceUdpChannel(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
829             channelType = ChannelType.UDP;
830             handlerAddr = checkNotNull(remoteAddress, "remoteAddress");
831             initiatorAddr = checkNotNull(localAddress, "localAddress");
832             return this;
833         }
834 
835         /**
836          * Build the {@link PcapWriteHandler}.
837          *
838          * @param outputStream The output stream to write the pcap data to.
839          * @return The handler.
840          */
841         public PcapWriteHandler build(OutputStream outputStream) {
842             checkNotNull(outputStream, "outputStream");
843             return new PcapWriteHandler(this, outputStream);
844         }
845     }
846 
847     private static final class WildcardAddressHolder {
848         static final InetAddress wildcard4; // 0.0.0.0
849         static final InetAddress wildcard6; // ::
850 
851         static {
852             try {
853                 wildcard4 = InetAddress.getByAddress(new byte[4]);
854                 wildcard6 = InetAddress.getByAddress(new byte[16]);
855             } catch (UnknownHostException e) {
856                 // would only happen if the byte array was of incorrect size
857                 throw new AssertionError(e);
858             }
859         }
860     }
861 }