当前位置: 移动技术网 > IT编程>脚本编程>Go语言 > [Golang] 剑走偏锋 -- IoComplete ports

[Golang] 剑走偏锋 -- IoComplete ports

2020年01月18日  | 移动技术网IT编程  | 我要评论
前言 Golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在widows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通 ...

前言

golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在widows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通信),對tcp數據處理要足夠快,否則會造成tcp 服務端的 receive buffer 溢出造成 packet loss,影響實時性和數據的完整性。

作者閲讀了golang 在windows 環境下 tcp 部分syscall 的實現,最終確認它的底層模型是用了完成端口(異步io模型)的。
但是由於作者本人比較喜歡折騰,所以用golang 底層的syscall 實現了一下tcp 完成端口服務。

iocompletion port

以下為windows環境下用golang實現的 iocompletion port server

iocompletionrootcontext

管理指定 port 上所有 accepted socket:

type iocompletionrootcontext struct {
    socket     windows.handle
    socketaddr windows.sockaddrinet4
    ioset      []*iocompletioncontext

    sync.mutex
}


func (root *iocompletionrootcontext) newiocontext() *iocompletioncontext {
    root.lock()
    defer root.unlock()
    res := &iocompletioncontext{
        data: make([]byte, 65535),
        overlapped: windows.overlapped{
            internal:     0,
            internalhigh: 0,
            offset:       0,
            offsethigh:   0,
            hevent:       0,
        },
    }

    res.wsabuf.buf = &res.data[0]
    res.wsabuf.len = uint32(65535)

    root.ioset = append(root.ioset, res)
    return res
}

func newrootcontext() *iocompletionrootcontext {
    return &iocompletionrootcontext{
        ioset: make([]*iocompletioncontext, 0),
    }
}

iocompletioncontext

accepted socket 的上下文:

    type iocompletioncontext struct {
        socket     windows.handle
        socketaddr windows.sockaddrinet4
        wsabuf     windows.wsabuf
        data       []byte
        optype     op_type
        overlapped windows.overlapped
    }

iocompletionserver

完成端口服務:

type iocompletionserver struct {
    addr     string
    port     int
    recvfunc func(data []byte) error
    rootctx  *iocompletionrootcontext
    // 爲了防止内存移動,采用此種方式
    accepts           sync.map
    hiocompletionport windows.handle
}


func (ss *iocompletionserver) saveiorootctx(id uint32, ctx *iocompletionrootcontext) {
    ss.accepts.store(id, ctx)
}

func (ss *iocompletionserver) loadiorootctx(id uint32) *iocompletionrootcontext {
    if id == uint32(ss.rootctx.socket) {
        return ss.rootctx
    }

    if v, isok := ss.accepts.load(id); isok {
        if res, isok := v.(*iocompletionrootcontext); isok {
            return res
        }
    }

    return nil
}

func (ss *iocompletionserver) remove(id uint32) {
    ss.accepts.delete(id)
}

func (ss *iocompletionserver) registerreceivefunc(rfunc func([]byte) error) {
    ss.recvfunc = rfunc
}

func (ss *iocompletionserver) listen() {
    dwbytestransfered := uint32(0)
    var ctxid uint32
    var overlapped *windows.overlapped
    for {
        err := windows.getqueuedcompletionstatus(ss.hiocompletionport, &dwbytestransfered,
            &ctxid, &overlapped, windows.infinite)
        if err != nil {
            fmt.printf("syscall.getqueuedcompletionstatus: %v\n", err)
        }

        if overlapped == nil {
            continue
        }

        // 通过位移取得ioctx
        ioctx := (*iocompletioncontext)(unsafe.pointer(uintptr(unsafe.pointer(overlapped)) - unsafe.offsetof(iocompletioncontext{}.overlapped)))
        switch ioctx.optype {
        case accept_posted:
            {
                ss.doacceptex(ss.loadiorootctx(ctxid), ioctx)
            }
        case recv_posted:
            {
                ss.doreceive(ss.loadiorootctx(ctxid), ioctx)
            }
        case send_posted:
        case null_posted:
        default:
        }
    }
}

func (ss *iocompletionserver) doacceptex(rootctx *iocompletionrootcontext, ioctx *iocompletioncontext) (err error) {
    nfdctx := newrootcontext()
    nfdctx.socket = ioctx.socket
    addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{}))

    var localaddr, remoteaddr *windows.rawsockaddrany
    lrsalen := int32(addrsize)
    rrsalen := int32(addrsize)

    // 與windows c++ 不同,此處函數無需去函數指針即可使用
    windows.getacceptexsockaddrs(ioctx.wsabuf.buf, ioctx.wsabuf.len-(addrsize+16)*2,
        addrsize+16, addrsize+16, &localaddr, &lrsalen, &remoteaddr, &rrsalen)

    if ss.recvfunc != nil {
        ss.recvfunc(ioctx.data[:ioctx.overlapped.internalhigh])
    }

    // 继承listen socket的属性
    err = windows.setsockopt(nfdctx.socket, windows.sol_socket, windows.so_update_accept_context,
        (*byte)(unsafe.pointer(&ss.rootctx.socket)), int32(unsafe.sizeof(ss.rootctx.socket)))
    if err != nil {
        return errors.wrap(err, "syscall.acceptex")
    }

    err = windows.setsockoptint(nfdctx.socket, windows.sol_socket, windows.so_rcvbuf, 65535)
    if err != nil {
        return errors.wrap(err, "windows.setsockoptint")
    }

    // 綁定到完成端口, 此步驟很關鍵
    handle, err := windows.createiocompletionport(nfdctx.socket,
        ss.hiocompletionport, uint32(nfdctx.socket), 0)
    if err != nil {
        return errors.wrap(err, "syscall.createiocompletionport")
    } else {
        fmt.println(handle, rootctx.socket)
    }

    // 投遞接收請求, 此處可以自行修改
    for i := 0; i < 16; i++ {
        nfdioctx := nfdctx.newiocontext()
        nfdioctx.socket = nfdctx.socket
        if err = ss.receive(nfdioctx); err != nil {
            return err
        }
    }

    //投遞接收連接請求
    if err = ss.acceptex(ioctx); err != nil {
        return err
    }

    // 保存到context中
    ss.saveiorootctx(uint32(nfdctx.socket), nfdctx)
    return nil
}

func (ss *iocompletionserver) acceptex(ctx *iocompletioncontext) (err error) {
    ctx.socket = windows.handle(c.mwsasocket())

    dwbytes := uint32(0)
    addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{}))
    ctx.optype = accept_posted
    //err = syscall.acceptex(ss.rootctx.socket, ctx.socket, ctx.wsabuf.buf,
    //  ctx.wsabuf.len-2*(addrsize+16), addrsize+16,
    //  addrsize+16, &dwbytes, &ctx.overlapped)

    //windows.wsaioctl(ss.rootctx.socket, windows.sio_get_extension_function_pointer)
    err = windows.acceptex(ss.rootctx.socket, ctx.socket, ctx.wsabuf.buf,
        ctx.wsabuf.len-2*(addrsize+16), addrsize+16,
        addrsize+16, &dwbytes, &ctx.overlapped)
    if err != nil {
        if err == windows.errno(997) { // error_io_pending 表示尚未接收到鏈接
            err = nil
        } else {
            err = errors.wrap(err, "syscall.acceptex")
        }
    }

    return err
}

func (ss *iocompletionserver) doreceive(rootctx *iocompletionrootcontext, ctx *iocompletioncontext) {
    if ctx.overlapped.internalhigh == 0 {
        if rootctx != nil {
            ss.remove(uint32(rootctx.socket))
            c.mclose(c.int(rootctx.socket))
        }
        return
    }

    if ss.recvfunc != nil {
        ss.recvfunc(ctx.data[:ctx.overlapped.internalhigh])
    }

    ss.receive(ctx)
}

func (ss *iocompletionserver) receive(ioctx *iocompletioncontext) error {
    recv := uint32(0)
    flags := uint32(0)
    ioctx.optype = recv_posted

    err := windows.wsarecv(ioctx.socket, &ioctx.wsabuf,
        1, &recv, &flags, &ioctx.overlapped, nil)
    if err != nil {
        if err == windows.errno(997) { // error_io_pending 表示尚未接收到數據
            err = nil
        } else {
            err = errors.wrap(err, "syscall.acceptex")
        }
    }

    return err
}

func setdefaultsockopt(handle windows.handle) error {
    err := windows.setsockoptint(handle, windows.sol_socket, windows.so_reuseaddr, 1)
    if err != nil {
        return errors.wrap(err, "syscall.setsockoptint")
    }

    //err = windows.setsockoptint(handle, windows.sol_socket, windows.so, 1)
    //if err != nil {
    //  return errors.wrap(err, "syscall.setsockoptint")
    //}

    return nil
}

func (ss *iocompletionserver) start() error {
    fmt.println(windows.wsastartup(2, &windows.wsadata{}))

    // 初始創建一個用於綁定的 listen socket 的 iocompletion 句柄
    hiocompletionport, err := windows.createiocompletionport(windows.invalidhandle, 0, 0, 0)
    if err != nil {
        return errors.wrap(err, "syscall.createiocompletionport")
    }

    ss.hiocompletionport = hiocompletionport

    rootctx := newrootcontext()
    rootctx.socket = windows.handle(c.mwsasocket())
    setdefaultsockopt(rootctx.socket)
    ss.rootctx = rootctx

    handle, err := windows.createiocompletionport(rootctx.socket,
        hiocompletionport, uint32(ss.rootctx.socket), 0)
    if err != nil {
        return errors.wrap(err, "syscall.createiocompletionport")
    } else {
        fmt.println(handle, rootctx.socket)
    }

    sockaddr := windows.sockaddrinet4{}
    sockaddr.port = ss.port

    if err := windows.bind(rootctx.socket, &sockaddr); err != nil {
        return errors.wrap(err, "syscall.bind")
    }

    if err := windows.listen(rootctx.socket, max_post_accept); err != nil {
        return errors.wrap(err, "windows.listen")
    }

    ss.rootctx = rootctx

    if err := ss.acceptex(rootctx.newiocontext()); err != nil {
        return err
    }
    return nil
}

example

完成端口服務使用示例:

ss = &streamserver{
    addr: "127.0.0.1:10050",
    port: 10050,
    accepts: sync.map{},
}

ss.registerreceivefunc(func(data []byte) error {
    fmt.println("receive data len:", string(data))
    return nil
})

// 可以啓動多個携程來接收請求,但是需要特別注意的是
// 多携程可能會導致接受數據包時序發生亂序
ss.listen()

結尾

以上代碼經過實際測試檢驗,可以正常使用,尚未與標準庫進行 效率\性能 對比,沒有實現 send 功能,此處需要提醒的是,使用 iocompletion port 發送數據要注意時序的把握。

iocompletion port 是windows 系統中十分優秀的io模型, 深入瞭解其工作機制及原理, 也有助於我們對操作系統 io 數據處理的機制有更清晰的認知。

參考

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网