当前位置: 移动技术网 > IT编程>脚本编程>Go语言 > 使用Golang简单实现七牛图片处理API

使用Golang简单实现七牛图片处理API

2017年12月08日  | 移动技术网IT编程  | 我要评论

之前一直在用qiniu的存储服务,生成图片的缩略图,模糊图,视频的webp,现在需要把存储移到s3上,那么这些图片,视频处理就要自己动手写了,本文梳理一下大致的思路。

分析需求

先看一下qiniu的接口是如何处理图片的,例如先截取视频第一秒的图片,再把图片缩略,最后存储到一个新的key,命令可以这么写 vframe/jpg/offset/1|imagemogr2/thumbnail/400x|saveas/xxx, 可以看到三个操作之间用 | 符号分割,类似unix 的 pipe 操作。

上面的操作算作一个cmd, 一次api请求可以同时处理多个cmd,cmd之间用分号分割, 处理完毕后,在回调中把处理结果返回,例如

复制代码 代码如下:

{
    "id": "xxxxx",
    "pipeline": "xxx",
    "code": 0,
    "desc": "the fop was completed successfully",
    "reqid": "xtsaafnxubr5j10u",
    "inputbucket": "xxx",
    "inputkey": "xxxxx",
    "items": [
        {
            "cmd": "vframe/jpg/offset/1|imagemogr2/thumbnail/400x|saveas/zmftzs1wcml2yxrlom1vbwvudc9jb3zlci9zbmfwl3zpzgvvl2m5yzdjzjq5ltu3ngqtngzjms1izdfkltrkyjzkmzlkzwy1ni8wlza=",
            "code": 0,
            "desc": "the fop was completed successfully",
            "hash": "fhdn6v8ei4vw4xjgalsfxutvmeiv",
            "key": "xx",
            "returnold": 0
        },
        {
            "cmd": "vframe/jpg/offset/1|imagemogr2/thumbnail/400x|imagemogr2/blur/45x8|saveas/zmftzs1wcml2yxrlom1vbwvudc9jb3zlci9zbmfwl3zpzgvvl2m5yzdjzjq5ltu3ngqtngzjms1izdfkltrkyjzkmzlkzwy1ni8wlzbfymx1cg==",
            "code": 0,
            "desc": "the fop was completed successfully",
            "hash": "fgnirzrcsa7tzx1xvsb_4d5tiak3",
            "key": "xxx",
            "returnold": 0
        }
    ]
}

分解需求

这个程序大致需要这么几个部分:

一个http接口,接受任务,接受后把任务扔到队列,返回一个job id。 worker异步处理任务,worker的个数 和 每个worker 并行的处理的个数 能够配置,worker有重试机制。
从 job payload 中解析出需要做的任务,解析出每个cmd, 最好能并行执行每一个 cmd, 记录每一个cmd的结果

每个cmd中有多个 operation, 并且用 pipe 连接,前一个operaion的输出是后一个operation的输入

可以把 1 和 2,3 分开来看,1 比较独立,之前写过一个worker的模型,参考的是这篇文章 handling 1 million requests per minute with go,比较详细,是用 go channel 作为queue的,我加了一个 beanstalk 作为 queue的 providor。还有一点改进是,文章中只提供了worker数量的设置,我再加了一个参数,设定每个worker可以并行执行的协程数。所以下面主要讲讲3, 2的解决办法

pipe

可以参考这个库 pipe, 用法如下:

复制代码 代码如下:

p := pipe.line(
    pipe.readfile("test.png"),
    resize(300, 300),
    blur(0.5),
)

output, err := pipe.combinedoutput(p)
if err != nil {
    fmt.printf("%v\n", err)
}

buf := bytes.newbuffer(output)
img, _ := imaging.decode(buf)

imaging.save(img, "test_a.png")

还是比较方便的,建一个 cmd struct, 利用正则匹配一下每个 operation 的参数,放入一个 []op slice, 最后执行,struct和方法如下:

复制代码 代码如下:

type cmd struct {
    cmd    string
    saveas string
    ops    []op
    err    error
}

type op interface {
    getpipe() pipe.pipe
}

type resizeop struct {
    width, height int
}

func (c resizeop) getpipe() pipe.pipe {
    return resize(c.width, c.height)
}

//使用方法
cmdstr := `file/test.png|thumbnail/x300|blur/20x8`
cmd := cmd{cmdstr, "test_b.png", nil, nil}

cmd.parse()
cmd.doops()
sync.waitgroup

单个cmd处理解决后,就是多个cmd的并行问题,没啥好想的,直接用 sync.waitgroup 就可以完美解决。一步一步来,我们先看看这个struct的使用方法:

复制代码 代码如下:

func main() {
    cmds := []string{}
    for i := 0; i < 10000; i++ {
        cmds = append(cmds, fmt.sprintf("cmd-%d", i))
    }

    results := handlecmds(cmds)

    fmt.println(len(results)) // 10000
}

func docmd(cmd string) string {
    return fmt.sprintf("cmd=%s", cmd)
}

func handlecmds(cmds []string) (results []string) {
    fmt.println(len(cmds)) //10000
    var count uint64

    group := sync.waitgroup{}
    lock := sync.mutex{}
    for _, item := range cmds {
        // 计数加一
        group.add(1)
        go func(cmd string) {
            result := docmd(cmd)
            atomic.adduint64(&count, 1)

            lock.lock()
            results = append(results, result)
            lock.unlock()
           
            // 计数减一
            group.done()
        }(item)
    }

    // 阻塞
    group.wait()

    fmt.printf("count=%d \n", count) // 10000
    return
}

group本质大概是一个计数器,计数 > 0时, group.wait() 会阻塞,直到 计数 == 0. 这里还有一点要注意,就是 results = append(results, result) 的操作是线程不安全的,清楚这里 results 是共享的,需要加锁来保证同步,否则最后 len(results) 不为 10000。

我们建一个benchcmd, 来存放 cmds. 如下:

复制代码 代码如下:

type benchcmd struct {
    cmds      []cmd
    waitgroup sync.waitgroup
    errs      []error
    lock      sync.mutex
}

func (b *benchcmd) docmds() {
    for _, item := range b.cmds {
        b.waitgroup.add(1)

        go func(cmd cmd) {
            cmd.parse()
            err := cmd.doops()

            b.lock.lock()
            b.errs = append(b.errs, err)
            b.lock.unlock()

            b.waitgroup.done()
        }(item)
    }

    b.waitgroup.wait()
}

最后的调用就像这样:

复制代码 代码如下:

var cmds []cmd
cmd_a := cmd{`file/test.png|thumbnail/x300|blur/20x8`, "test_a.png", nil, nil}
cmd_b := cmd{`file/test.png|thumbnail/500x1000|blur/20x108`, "test_b.png", nil, nil}
cmd_c := cmd{`file/test.png|thumbnail/300x300`, "test_c.png", nil, nil}

cmds = append(cmds, cmd_a)
cmds = append(cmds, cmd_b)
cmds = append(cmds, cmd_c)

bench := benchcmd{
    cmds:      cmds,
    waitgroup: sync.waitgroup{},
    lock:      sync.mutex{},
}

bench.docmds()

fmt.println(bench.errs)

这只是一个初级的实验,思考还不够全面,并且只是模仿api,qiniu应该不是这么做的,耦合更低,可能各个cmd都有各自处理的集群,那pipe这个库就暂时没法解决了,目前的局限在于 每个cmd必须都在一个进程中。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网