经典幽默笑话,师德反思,谁有网站你懂的
pipelines - .net中的新io api指引 作者 marcgravell
此系列前两篇网上已有的译文
pipelines - .net中的新io api指引(一)
pipelines - .net中的新io api指引(二)
关于system.io.pipelines的一篇说明
system.io.pipelines: .net高性能io
本篇不是翻译,边看边译边记而已。
System.IO.Pipelines 是对 IO 的统一抽象(文件、COM 口、网络等等),重点在于让调用者把注意力集中在读、写缓冲区上,典型的就是 IDuplexPipe 中的 Input 和 Output。
可以理解为将io类抽象为读、写两个缓冲区。
目前官方实现还处于 preview 状态,作者使用 Socket 和 NetworkStream 实现了一个 Pipelines.Sockets.Unofficial。
作者在前两篇中提到使用 System.IO.Pipelines 改造 StackExchange.Redis;在本篇中,作者通过改造现有的 SimplSockets 库来说明 System.IO.Pipelines 的使用。
(SimplPipelines、KestrelServer)
/// <summary>
/// A disposable holder that exposes a <see cref="Memory{T}"/>.
/// </summary>
/// <typeparam name="T">Element type of the exposed memory.</typeparam>
public interface IMemoryOwner<T> : IDisposable
{
    /// <summary>Gets the memory held by this owner.</summary>
    Memory<T> Memory { get; }
}
/// <summary>
/// <see cref="IMemoryOwner{T}"/> backed by an array taken from
/// <see cref="ArrayPool{T}.Shared"/>; the array is handed back to the pool on
/// <see cref="Dispose"/>.
/// </summary>
private sealed class ArrayPoolOwner<T> : IMemoryOwner<T>
{
    // Number of elements exposed through Memory; the array itself may be longer.
    private readonly int _length;

    // Set to null once disposed, which is how later access is detected.
    private T[] _oversized;

    /// <param name="oversized">Array to wrap.</param>
    /// <param name="length">Number of leading elements to expose.</param>
    internal ArrayPoolOwner(T[] oversized, int length)
    {
        _length = length;
        _oversized = oversized;
    }

    /// <summary>Gets the first <c>_length</c> elements of the wrapped array.</summary>
    /// <exception cref="ObjectDisposedException">The owner has been disposed.</exception>
    public Memory<T> Memory => new Memory<T>(GetArray(), 0, _length);

    // CompareExchange(ref x, null, null) never changes the field; it is used
    // here purely as an interlocked read of the current value.
    private T[] GetArray() =>
        Interlocked.CompareExchange(ref _oversized, null, null)
        ?? throw new ObjectDisposedException(ToString());

    /// <summary>
    /// Returns the array to the shared pool. The atomic exchange ensures the
    /// array is returned at most once even if Dispose is called repeatedly.
    /// </summary>
    public void Dispose()
    {
        var arr = Interlocked.Exchange(ref _oversized, null);
        if (arr != null)
        {
            ArrayPool<T>.Shared.Return(arr);
        }
    }
}
// Example of a consumer: the using block disposes the owner once the work
// on its memory is finished.
void DoSomething(IMemoryOwner<byte> data)
{
    using (data)
    {
        // ... other things here ...
        DoTheThing(data.Memory);
    }
    // ... more things here ...
}
通过arraypool的借、还机制避免频繁分配。
/// <summary>
/// Copies <paramref name="source"/> into an array rented from
/// <see cref="ArrayPool{T}.Shared"/> and wraps it in an owner that returns the
/// array when disposed.
/// </summary>
/// <param name="source">Sequence to copy.</param>
/// <returns>An owner exposing exactly <c>source.Length</c> elements.</returns>
/// <exception cref="OverflowException">
/// <paramref name="source"/> is longer than <see cref="int.MaxValue"/> (checked cast).
/// </exception>
public static IMemoryOwner<T> Lease<T>(this ReadOnlySequence<T> source)
{
    if (source.IsEmpty)
    {
        return Empty<T>();
    }

    int len = checked((int)source.Length);
    var arr = ArrayPool<T>.Shared.Rent(len); // rent from the pool
    source.CopyTo(arr);
    return new ArrayPoolOwner<T>(arr, len); // returned to the pool on Dispose
}
/// <summary>
/// Base class that wraps an <see cref="IDuplexPipe"/>; disposing it closes the pipe.
/// </summary>
public abstract class SimplPipeline : IDisposable
{
    // Only assigned in the constructor.
    private readonly IDuplexPipe _pipe;

    /// <param name="pipe">The duplex pipe this instance wraps.</param>
    protected SimplPipeline(IDuplexPipe pipe) => _pipe = pipe;

    /// <summary>Equivalent to <see cref="Close"/>.</summary>
    public void Dispose() => Close();

    /// <summary>Shuts the pipe down (body elided in the article).</summary>
    public void Close()
    {
        /* burn the pipe */
    }
}
/// <summary>
/// Writes a message and disposes <paramref name="payload"/> afterwards.
/// Use this overload when the caller is finished with the payload and
/// cleanup is this method's responsibility.
/// </summary>
/// <param name="payload">Owner of the bytes to send; disposed when the write finishes or fails.</param>
/// <param name="messageId">Identifier passed on to the memory-based overload.</param>
protected async ValueTask WriteAsync(IMemoryOwner<byte> payload, int messageId)
{
    using (payload)
    {
        await WriteAsync(payload.Memory, messageId);
    }
}

// Signature only (the implementation is shown separately in the article):
// with this overload the caller keeps ownership of the payload and cleans it up itself.
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId);
messageid标识一条消息,写入消息头部, 用于之后处理响应回复信息。
// Initial count of 1: at most one caller at a time gets past WaitAsync, so the
// header and payload of one message cannot interleave with another message.
private readonly SemaphoreSlim _singleWriter = new SemaphoreSlim(1);

/// <summary>
/// Writes one frame (header followed by payload) while holding the
/// single-writer semaphore.
/// </summary>
/// <param name="payload">Bytes to send; the caller keeps ownership.</param>
/// <param name="messageId">Identifier written into the frame header.</param>
protected async ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
    await _singleWriter.WaitAsync();
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        await writer.WriteAsync(payload);
    }
    finally
    {
        // Release even if the write throws, so later writers are not blocked forever.
        _singleWriter.Release();
    }
}
/// <summary>
/// Writes one frame, avoiding the async state machine when the semaphore is
/// free and the write completes synchronously.
/// </summary>
/// <param name="payload">Bytes to send; the caller keeps ownership.</param>
/// <param name="messageId">Identifier written into the frame header.</param>
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
    // Try to get the conch without waiting; if the writer is already in use,
    // switch to the async path.
    if (!_singleWriter.Wait(0))
    {
        return WriteAsyncSlowPath(payload, messageId);
    }

    bool release = true;
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        var write = writer.WriteAsync(payload);
        if (write.IsCompletedSuccessfully)
        {
            return default;
        }

        // The write is still pending: hand responsibility for releasing the
        // semaphore to AwaitFlushAndRelease instead of the finally below.
        release = false;
        return AwaitFlushAndRelease(write);
    }
    finally
    {
        if (release)
        {
            _singleWriter.Release();
        }
    }
}

/// <summary>
/// Awaits a pending write and then releases the single-writer semaphore,
/// whether the write succeeded or threw.
/// </summary>
async ValueTask AwaitFlushAndRelease(ValueTask<FlushResult> flush)
{
    try
    {
        await flush;
    }
    finally
    {
        _singleWriter.Release();
    }
}
三个地方
/// <summary>
/// Writes the 8-byte frame header: payload length, then message id, each as a
/// little-endian 32-bit integer.
/// </summary>
/// <param name="writer">Destination; advanced by 8 bytes.</param>
/// <param name="length">Payload length stored in bytes 0-3.</param>
/// <param name="messageId">Message id stored in bytes 4-7.</param>
void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
    var span = writer.GetSpan(8);
    BinaryPrimitives.WriteInt32LittleEndian(span, length);
    BinaryPrimitives.WriteInt32LittleEndian(span.Slice(4), messageId);
    writer.Advance(8);
}
/// <summary>
/// Client that sends a message and asynchronously waits for the reply that
/// carries the same message id.
/// </summary>
public class SimplPipelineClient : SimplPipeline
{
    /// <summary>
    /// Registers a pending response under a fresh non-zero message id, writes
    /// the message, and completes when that pending response is completed.
    /// </summary>
    /// <param name="message">Bytes to send; the caller keeps ownership.</param>
    /// <returns>The reply payload.</returns>
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
    {
        // Run continuations asynchronously so that awaiting callers do not
        // resume inline on whichever thread completes the source.
        var tcs = new TaskCompletionSource<IMemoryOwner<byte>>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        int messageId;
        lock (_awaitingResponses)
        {
            messageId = ++_nextMessageId;
            if (messageId == 0)
            {
                // Id 0 is skipped on wrap-around. The counter is updated too;
                // otherwise the next call would also produce 1 and Add would
                // throw on the duplicate key.
                messageId = _nextMessageId = 1;
            }
            _awaitingResponses.Add(messageId, tcs);
        }

        try
        {
            await WriteAsync(message, messageId);
        }
        catch
        {
            // The request never went out, so no reply can arrive for it:
            // drop the pending entry instead of leaking it.
            lock (_awaitingResponses)
            {
                _awaitingResponses.Remove(messageId);
            }
            throw;
        }

        return await tcs.Task;
    }

    /// <summary>
    /// Same as the memory-based overload, but disposes <paramref name="message"/>
    /// once the exchange has completed or failed.
    /// </summary>
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
    {
        using (message)
        {
            return await SendReceiveAsync(message.Memory);
        }
    }
}
- _awaitingresponses 是个字典,保存已经发送的消息,用于将来处理对某条(messageid)消息的回复。
/// <summary>
/// Reads from the pipe until cancelled or completed, parsing every complete
/// frame in each read and passing it to <see cref="OnReceiveAsync"/>.
/// </summary>
/// <param name="cancellationToken">Stops the loop when signalled.</param>
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var readResult = await reader.ReadAsync(cancellationToken);
            if (readResult.IsCanceled)
            {
                break;
            }

            var buffer = readResult.Buffer;

            // TryParseFrame trims each parsed frame off the front of buffer,
            // so after this loop buffer holds only the unparsed remainder.
            var makingProgress = false;
            while (TryParseFrame(ref buffer, out var payload, out var messageId))
            {
                makingProgress = true;
                await OnReceiveAsync(payload, messageId);
            }

            // Consumed up to the start of the remainder; examined everything.
            reader.AdvanceTo(buffer.Start, buffer.End);

            // No frame could be parsed and no more data will arrive: stop.
            if (!makingProgress && readResult.IsCompleted)
            {
                break;
            }
        }

        // Best-effort completion; failures here are deliberately ignored.
        try { reader.Complete(); } catch { }
    }
    catch (Exception ex)
    {
        // Report the failure through the reader; ignore errors while doing so.
        try { reader.Complete(ex); } catch { }
    }
}

/// <summary>Handles one parsed frame.</summary>
/// <param name="payload">Frame payload (a slice of the read buffer).</param>
/// <param name="messageId">Message id read from the frame header.</param>
protected abstract ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId);
/// <summary>
/// Tries to take one complete frame (8-byte header plus payload) off the front
/// of <paramref name="input"/>.
/// </summary>
/// <param name="input">
/// Buffered data; on success it is advanced past the parsed frame, otherwise
/// it is left unchanged.
/// </param>
/// <param name="payload">The frame payload on success; default otherwise.</param>
/// <param name="messageId">The message id read from the header, when a header was available.</param>
/// <returns><c>true</c> if a whole frame was available.</returns>
private bool TryParseFrame(
    ref ReadOnlySequence<byte> input,
    out ReadOnlySequence<byte> payload,
    out int messageId)
{
    if (input.Length < 8)
    {
        // not enough data for the header
        payload = default;
        messageId = default;
        return false;
    }

    int length;
    if (input.First.Length >= 8)
    {
        // already 8 bytes in the first segment
        length = ParseFrameHeader(input.First.Span, out messageId);
    }
    else
    {
        // header straddles segments: copy 8 bytes into a local span
        Span<byte> local = stackalloc byte[8];
        input.Slice(0, 8).CopyTo(local);
        length = ParseFrameHeader(local, out messageId);
    }

    // do we have the "length" bytes?
    // Computed as long: "length + 8" in 32-bit arithmetic overflows for
    // lengths close to int.MaxValue and would make this check pass wrongly.
    if (input.Length < (long)length + 8)
    {
        payload = default;
        return false;
    }

    // success!
    payload = input.Slice(8, length);
    input = input.Slice(payload.End);
    return true;
}
/// <summary>
/// Reads the frame header: a little-endian 32-bit length in bytes 0-3 followed
/// by a little-endian 32-bit message id in bytes 4-7.
/// </summary>
/// <param name="input">At least 8 bytes of header data.</param>
/// <param name="messageId">Receives the message id.</param>
/// <returns>The length field.</returns>
static int ParseFrameHeader(ReadOnlySpan<byte> input, out int messageId)
{
    var length = BinaryPrimitives.ReadInt32LittleEndian(input);
    messageId = BinaryPrimitives.ReadInt32LittleEndian(input.Slice(4));
    return length;
}
/// <summary>
/// Routes a received frame: a non-zero id completes the matching pending
/// request; id zero is raised as an unsolicited message.
/// </summary>
/// <param name="payload">Frame payload; copied via <c>Lease()</c> before being handed on.</param>
/// <param name="messageId">Message id read from the frame header.</param>
protected override ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId)
{
    if (messageId != 0)
    {
        // request/response
        TaskCompletionSource<IMemoryOwner<byte>> tcs;
        lock (_awaitingResponses)
        {
            if (_awaitingResponses.TryGetValue(messageId, out tcs))
            {
                _awaitingResponses.Remove(messageId);
            }
        }

        // tcs is null when nothing is waiting on this id; the frame is then dropped.
        tcs?.TrySetResult(payload.Lease());
    }
    else
    {
        // unsolicited
        MessageReceived?.Invoke(payload.Lease());
    }

    return default;
}
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
Net Core Web Api项目与在NginX下发布的方法
asp.net core3.1 引用的元包dll版本兼容性问题解决方案
IdentityServer4实现.Net Core API接口权限认证(快速入门)
ASP.NET Core MVC通过IViewLocationExpander扩展视图搜索路径的实现
网友评论