当前位置: 移动技术网 > IT编程>脚本编程>Go语言 > go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器

go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器

2019年12月02日  | 移动技术网IT编程  | 我要评论
一、使用场景 大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。 所以需要对多个协程进行控制。 二、使用知识 1. 从一个未初始化的管道读会阻塞 2.从一个关闭的管道读不会阻塞 利用两个管道和select 进行控 ...

一、使用场景

大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。

所以需要对多个协程进行控制。

二、使用知识

1. 从一个未初始化的管道读会阻塞

2.从一个关闭的管道读不会阻塞

利用两个管道和select 进行控制

三、上代码

控制器代码

package util

import (
	"errors"
	"sync"
)

const (
	//stop 停止
	stop = iota
	//start 开始
	start
	//pause 暂停
	pause
)

//control 控制器
type control struct {
	ch1  chan struct{}
	ch2  chan struct{}
	stat int64
	lock sync.rwmutex
}

var (
	//errstat 错误状态
	errstat = errors.new("stat error")
)

//newcontrol 获得一个新control
func newcontrol() *control {
	return &control{
		ch1:  make(chan struct{}),
		ch2:  nil,
		stat: start,
		lock: sync.rwmutex{},
	}
}

//stop 停止
func (c *control) stop() error {
	c.lock.lock()
	defer c.lock.unlock()
	if c.stat == start {
		c.ch2 = nil
		close(c.ch1)
		c.stat = stop
	} else if c.stat == pause {
		ch2 := c.ch2
		c.ch2 = nil
		close(c.ch1)
		close(ch2)
		c.stat = stop
	} else {
		return errstat
	}
	return nil
}

//pause 暂停
func (c *control) pause() error {
	c.lock.lock()
	defer c.lock.unlock()
	if c.stat == start {
		c.ch2 = make(chan struct{})
		close(c.ch1)
		c.stat = pause
	} else {
		return errstat
	}
	return nil
}

//start 开始
func (c *control) start() error {
	c.lock.lock()
	defer c.lock.unlock()
	if c.stat == pause {
		c.ch1 = make(chan struct{})
		close(c.ch2)
		c.stat = start
	} else {
		return errstat
	}
	return nil
}

//c 控制管道
func (c *control) c() <-chan struct{} {
	c.lock.rlock()
	defer c.lock.runlock()
	return c.ch1
}

//wait 等待
func (c *control) wait() bool {
	c.lock.rlock()
	ch2 := c.ch2
	c.lock.runlock()
	if ch2 == nil {  //通过赋值nil 发送停止推出命令
		return false
	}
	<-ch2  //会进行阻塞
	return true
}

 

使用代码

	for {
		select {
		case part, ok := <-c.partitions():
			if !ok {
				conf.logger.error("get kafka partitions not ok", regular.name)
				return
			}
			go readfrompart(c, part, regular, respchan)
		case <-regular.c():   //regular 为control 类
			if !regular.wait() {
				conf.logger.debug("stop! ")
				return
			}
			conf.logger.debug("start! ")
		}
	}

这样就可以随时随地的控制工程中的协程

regular  := util.newcontrol()
regular.pause()
regular.start()
regular.stop()

  

如您对本文有疑问或者有任何想说的,请 点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网