1 final class socketreceivebufferpool { 2 3 private static final int pool_size = 8; 4 5 @suppresswarnings("unchecked") 6 private final softreference<bytebuffer>[] pool = new softreference[pool_size]; 7 8 socketreceivebufferpool() { 9 super(); 10 } 11 12 final bytebuffer acquire(int size) { 13 final softreference<bytebuffer>[] pool = this.pool; 14 for (int i = 0; i < pool_size; i ++) { 15 softreference<bytebuffer> ref = pool[i]; 16 if (ref == null) { 17 continue; 18 } 19 20 bytebuffer buf = ref.get(); 21 if (buf == null) { 22 pool[i] = null; 23 continue; 24 } 25 26 if (buf.capacity() < size) { 27 continue; 28 } 29 30 pool[i] = null; 31 32 buf.clear(); 33 return buf; 34 } 35 36 bytebuffer buf = bytebuffer.allocatedirect(normalizecapacity(size)); 37 buf.clear(); 38 return buf; 39 } 40 41 final void release(bytebuffer buffer) { 42 final softreference<bytebuffer>[] pool = this.pool; 43 for (int i = 0; i < pool_size; i ++) { 44 softreference<bytebuffer> ref = pool[i]; 45 if (ref == null || ref.get() == null) { 46 pool[i] = new softreference<bytebuffer>(buffer); 47 return; 48 } 49 } 50 51 // pool is full - replace one 52 final int capacity = buffer.capacity(); 53 for (int i = 0; i< pool_size; i ++) { 54 softreference<bytebuffer> ref = pool[i]; 55 bytebuffer pooled = ref.get(); 56 if (pooled == null) { 57 pool[i] = null; 58 continue; 59 } 60 61 if (pooled.capacity() < capacity) { 62 pool[i] = new softreference<bytebuffer>(buffer); 63 return; 64 } 65 } 66 } 67 68 private static final int normalizecapacity(int capacity) { 69 // normalize to multiple of 1024 70 int q = capacity >>> 10; 71 int r = capacity & 1023; 72 if (r != 0) { 73 q ++; 74 } 75 return q << 10; 76 } 77 }
1 private boolean read(selectionkey k) { 2 final socketchannel ch = (socketchannel) k.channel(); 3 final niosocketchannel channel = (niosocketchannel) k.attachment(); 4 5 final receivebuffersizepredictor predictor = 6 channel.getconfig().getreceivebuffersizepredictor(); 7 final int predictedrecvbufsize = predictor.nextreceivebuffersize(); 8 9 int ret = 0; 10 int readbytes = 0; 11 boolean failure = true; 12 13 bytebuffer bb = recvbufferpool.acquire(predictedrecvbufsize); 14 15 try { 16 while ((ret = ch.read(bb)) > 0) { 17 readbytes += ret; 18 if (!bb.hasremaining()) { 19 break; 20 } 21 } 22 failure = false; 23 } catch (closedchannelexception e) { 24 // can happen, and does not need a user attention. 25 } catch (throwable t) { 26 fireexceptioncaught(channel, t); 27 } 28 29 if (readbytes > 0) { 30 bb.flip(); 31 32 final channelbufferfactory bufferfactory = 33 channel.getconfig().getbufferfactory(); 34 final channelbuffer buffer = bufferfactory.getbuffer(readbytes); 35 buffer.setbytes(0, bb); 36 buffer.writerindex(readbytes); 37 //if(buffer instanceof bigendianheapchannelbuffer){ 38 // logger2.info("buffer instanceof bigendianheapchannelbuffer."); 39 //} 40 recvbufferpool.release(bb); 41 42 // update the predi||\\||||| 43 predictor.previousreceivebuffersize(readbytes); 44 45 // fire the event. 46 firemessagereceived(channel, buffer); 47 } else { 48 recvbufferpool.release(bb); 49 } 50 51 if (ret < 0 || failure) { 52 k.cancel(); // some jdk implementations run into an infinite loop without this. 53 close(channel, succeededfuture(channel)); 54 return false; 55 } 56 57 return true; 58 }
如对本文有疑问, 点击进行留言回复!!
详解SpringBoot修改启动端口server.port的四种方式
网友评论