Golang - bufferd channel - pool 池
package main
import (
"fmt"
"sync"
"time"
)
type Pool struct {
queue chan int
wg *sync.WaitGroup
}
func NewPool(size int) *Pool {
if size <= 0 {
size = 1
}
return &Pool{queue: make(chan int, size), wg: &sync.WaitGroup{}}
}
func (this *Pool) Add(size int) {
for i := 0; i < size; i++ { // size > 0
this.queue <- 1
}
for i := 0; i > size; i-- { // size < 0
<-this.queue
}
this.wg.Add(size)
}
func (this *Pool) Done() {
<-this.queue
this.wg.Done()
}
func (this *Pool) Wait() {
this.wg.Wait()
}
func main() {
pool := NewPool(15)
for i := 1; i <= 500; i++ {
pool.Add(1)
go func(i int) {
defer pool.Done()
fmt.Println(i)
time.Sleep(time.Second)
}(i)
}
pool.Wait()
}package main
// ---------------Worker---------------
type Worker struct {
IsAvailable bool
}
// 创建
func InitWorker() *Worker {
return &Worker{}
}
// 执行
func (w *Worker)Process(){
}
// 销毁worker
func (w *Worker)Destory(){
// 消除之前占用的某一些资源
}
// --------------Pool---------------
type Pool struct {
availableWorker chan *Worker
}
// 创建Pool
func NewPool(maxsize int)Pool{
return Pool{
availableWorker:make(chan *Worker,maxsize), // 限制 bufferd chan 最大缓存数
}
}
// 获取worker
func (p *Pool) get()(w *Worker){
select{
case w = <-p.availableWorker: // 有可用的worker
default:
// 无可用的worker,创建(实在没船就创造船)
w = InitWorker()
}
return
}
// 回收worker
func (p *Pool) returnBack(w *Worker){
// worker 是否还可用(飞船是否还可用)
if !w.IsAvailable {
// 不可用,销毁丢弃
w.Destory()
return
}
select {
case p.availableWorker<-w: // 如果有位置,则缓存(停飞船喽)
default:
// 没位置,销毁丢弃
w.Destory()
}
}
// 销毁所有worker(不要重复昭君的惨剧)
func (p *Pool) destroyAllWorker(){
for {
select{
case w:= <-p.availableWorker:
w.Destory()
default:
return
}
}
}
// -------------- Let's go -----------------
func main() {
pool := NewPool(50) // 最大缓存50个worker(50个飞船的停飞场)
defer pool.destroyAllWorker() // 如果不想被追到街上打的话,最后记得销毁所有的工作者
for {
worker := pool.get() // 获取worker
worker.Process() // 使用worker
pool.returnBack(worker) // 归还worker
}
// 友情提示:该代码接近伪码,不知直接运行
}最后更新于