查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.unix.Buffer;
19  import io.netty.util.internal.PlatformDependent;
20  
21  import java.nio.ByteBuffer;
22  
23  /**
24   * Represents an array of kevent structures, backed by offheap memory.
25   *
26   * struct kevent {
27   *  uintptr_t ident;
28   *  short     keventFilter;
29   *  u_short   flags;
30   *  u_int     fflags;
31   *  intptr_t  data;
32   *  void      *udata;
33   * };
34   */
35  final class KQueueEventArray {
36      private static final int KQUEUE_EVENT_SIZE = Native.sizeofKEvent();
37      private static final int KQUEUE_IDENT_OFFSET = Native.offsetofKEventIdent();
38      private static final int KQUEUE_FILTER_OFFSET = Native.offsetofKEventFilter();
39      private static final int KQUEUE_FFLAGS_OFFSET = Native.offsetofKEventFFlags();
40      private static final int KQUEUE_FLAGS_OFFSET = Native.offsetofKEventFlags();
41      private static final int KQUEUE_DATA_OFFSET = Native.offsetofKeventData();
42  
43      private ByteBuffer memory;
44      private long memoryAddress;
45      private int size;
46      private int capacity;
47  
48      KQueueEventArray(int capacity) {
49          if (capacity < 1) {
50              throw new IllegalArgumentException("capacity must be >= 1 but was " + capacity);
51          }
52          memory = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(capacity));
53          memoryAddress = Buffer.memoryAddress(memory);
54          this.capacity = capacity;
55      }
56  
57      /**
58       * Return the {@code memoryAddress} which points to the start of this {@link KQueueEventArray}.
59       */
60      long memoryAddress() {
61          return memoryAddress;
62      }
63  
64      /**
65       * Return the capacity of the {@link KQueueEventArray} which represent the maximum number of {@code kevent}s
66       * that can be stored in it.
67       */
68      int capacity() {
69          return capacity;
70      }
71  
72      int size() {
73          return size;
74      }
75  
76      void clear() {
77          size = 0;
78      }
79  
80      void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
81          reallocIfNeeded();
82          evSet(getKEventOffset(size++) + memoryAddress, ch.socket.intValue(), filter, flags, fflags);
83      }
84  
85      private void reallocIfNeeded() {
86          if (size == capacity) {
87              realloc(true);
88          }
89      }
90  
91      /**
92       * Increase the storage of this {@link KQueueEventArray}.
93       */
94      void realloc(boolean throwIfFail) {
95          // Double the capacity while it is "sufficiently small", and otherwise increase by 50%.
96          int newLength = capacity <= 65536 ? capacity << 1 : capacity + capacity >> 1;
97  
98          try {
99              ByteBuffer buffer = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(newLength));
100             // Copy over the old content of the memory and reset the position as we always act on the buffer as if
101             // the position was never increased.
102             memory.position(0).limit(size);
103             buffer.put(memory);
104             buffer.position(0);
105 
106             Buffer.free(memory);
107             memory = buffer;
108             memoryAddress = Buffer.memoryAddress(buffer);
109         } catch (OutOfMemoryError e) {
110             if (throwIfFail) {
111                 OutOfMemoryError error = new OutOfMemoryError(
112                         "unable to allocate " + newLength + " new bytes! Existing capacity is: " + capacity);
113                 error.initCause(e);
114                 throw error;
115             }
116         }
117     }
118 
119     /**
120      * Free this {@link KQueueEventArray}. Any usage after calling this method may segfault the JVM!
121      */
122     void free() {
123         Buffer.free(memory);
124         memoryAddress = size = capacity = 0;
125     }
126 
127     private static int getKEventOffset(int index) {
128         return index * KQUEUE_EVENT_SIZE;
129     }
130 
131     private long getKEventOffsetAddress(int index) {
132         return getKEventOffset(index) + memoryAddress;
133     }
134 
135     private short getShort(int index, int offset) {
136         if (PlatformDependent.hasUnsafe()) {
137             return PlatformDependent.getShort(getKEventOffsetAddress(index) + offset);
138         }
139         return memory.getShort(getKEventOffset(index) + offset);
140     }
141 
142     short flags(int index) {
143         return getShort(index, KQUEUE_FLAGS_OFFSET);
144     }
145 
146     short filter(int index) {
147         return getShort(index, KQUEUE_FILTER_OFFSET);
148     }
149 
150     short fflags(int index) {
151         return getShort(index, KQUEUE_FFLAGS_OFFSET);
152     }
153 
154     int fd(int index) {
155         if (PlatformDependent.hasUnsafe()) {
156             return PlatformDependent.getInt(getKEventOffsetAddress(index) + KQUEUE_IDENT_OFFSET);
157         }
158         return memory.getInt(getKEventOffset(index) + KQUEUE_IDENT_OFFSET);
159     }
160 
161     long data(int index) {
162         if (PlatformDependent.hasUnsafe()) {
163             return PlatformDependent.getLong(getKEventOffsetAddress(index) + KQUEUE_DATA_OFFSET);
164         }
165         return memory.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET);
166     }
167 
168     private static int calculateBufferCapacity(int capacity) {
169         return capacity * KQUEUE_EVENT_SIZE;
170     }
171 
172     private static native void evSet(long keventAddress, int ident, short filter, short flags, int fflags);
173 }