首页 > 其他 > 详细

Go 并发示例-Pool

时间:2021-01-25 16:16:04      阅读:25      评论:0      收藏:0      [点我收藏+]

缓冲的通道实现一个资源池,这个资源池可以管理在任意多个goroutine之间共享的资源,比如网络连接、数据库连接等,我们在数据库操作的时候,比较常见的就是数据连接池,也可以基于我们实现的资源池来实现

资源池实现代码:

package demo4

import (
	"errors"
	"io"
	"log"
	"sync"
)

type Pool struct {
	m sync.Mutex
	res chan io.Closer
	factory func()(io.Closer,error)
	closed bool
}

var ErrPoolClosed = errors.New("资源池已经被关闭")


//创建一个资源池
func New(fn func()(io.Closer,error),size int) (*Pool,error) {
	if size <=0{
		return nil,errors.New("size值太小了")
	}
	return &Pool{
		factory: fn,
		res: make(chan io.Closer,size),
	},nil
}

//从资源池里获取一个资源

func (p *Pool) Acquire() (io.Closer,error)  {
	select {
	case r,ok :=<-p.res:
		log.Println("Acquire:共享资源...")
		if !ok{
			return nil,ErrPoolClosed
		}
		return r,nil
	default:
		log.Println("Acquire:新生成资源...")
		return p.factory()
	}
}

//关闭资源池,释放资源
func (p *Pool) Close()  {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}
	p.closed = true
	//关闭通道,不让写入了
	close(p.res)
	//关闭通道里的资源
	for r := range p.res{
		r.Close()
	}

}

func (p *Pool) Release(r io.Closer)  {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed{
		r.Close()
		return
	}
	select {
	case p.res <-r:
		log.Println("资源释放到池子里面了...")
	default:
		log.Println("资源池满了,释放这个资源吧")
		r.Close()
	}
}

  

看如何使用这个资源池,模拟一个数据库连接池

 

package main

import (
	"io"
	"log"
	"math/rand"
	"newDemo/channel_pool/demo4"
	"sync"
	"sync/atomic"
	"time"
)


const(
	//模拟的最大goroutine
	maxGoroutine = 5
	//资源池的大小
	poolRes = 2
)
var idCounter int32

type dbConnection struct {
	ID int32
}

func (db *dbConnection) Close() error {
	log.Println("关闭连接:",db.ID)
	return nil
}

func createConnection() (io.Closer,error) {
	//并发安全,给数据库连接生成一个唯一标志
	id := atomic.AddInt32(&idCounter,1)
	return &dbConnection{id},nil
}

func dbQuery(query int,pool *demo4.Pool)  {
	conn,err := pool.Acquire()
	if err !=nil{
		log.Println(err)
		return
	}
	defer pool.Release(conn)

	//模拟查询
	time.Sleep(time.Duration(rand.Intn(1000))*time.Microsecond)
	log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)

}

func main() {
	var wg sync.WaitGroup
	wg.Add(maxGoroutine)

	p,err := demo4.New(createConnection,poolRes)
	if err !=nil{
		log.Println(err)
		return
	}
	for query := 0;query <maxGoroutine;query ++{
		go func(q int) {
			dbQuery(q,p)
			wg.Done()
		}(query)
	}

	wg.Wait()
	log.Println("开始关闭资源池")
	p.Close()
}

  

首先定义了一个结构体dbConnection,它只有一个字段,用来做唯一标记。然后dbConnection实现了io.Closer接口,这样才可以使用我们的资源池。

createConnection函数对应的是资源池中的factory字段,用来创建数据库连接dbConnection的,同时为其赋予了一个为止的标志。

接着我们就同时开了5个goroutine,模拟并发的数据库查询dbQuery,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。

这里我们会创建5个数据库连接,但是我们设置的资源池大小只有2,所以再释放了2个连接后,后面的3个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。

最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的Close方法即可。

 

运行结果如下:

技术分享图片

 

Go 并发示例-Pool

原文:https://www.cnblogs.com/pebblecome/p/14325514.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!