database/sql 中接口的层级关系:https://draveness.me/golang/docs/part4-advanced/ch09-stdlib/golang-database-sql/
database/sql 源码地址:
https://github.com/golang/go/tree/release-branch.go1.17/src/database/sql
go-zero 数据库连接池源码地址:
https://github.com/zeromicro/go-zero/blob/master/core/stores/sqlx/sqlmanager.go
滑动验证页面
一.go zero 中数据库连接池
go-zero 默认:最大空闲连接数 64 个,最大打开连接数 64 个,连接最大存活时间(通过 SetConnMaxLifetime 设置)为一分钟;
// Default pool settings applied to every connection pool created by
// newDBConnection.
const (
	// maxIdleConns is the value passed to SetMaxIdleConns.
	maxIdleConns = 64
	// maxOpenConns is the value passed to SetMaxOpenConns.
	maxOpenConns = 64
	// maxLifetime is the value passed to SetConnMaxLifetime.
	maxLifetime = time.Minute
)
新建连接
func newDBConnection(driverName, datasource string) (*sql.DB, error) {conn, err := sql.Open(driverName, datasource)if err != nil {return nil, err}// we need to do this until the issue https://github.com/golang/go/issues/9851 get fixed// discussed here https://github.com/go-sql-driver/mysql/issues/257// if the discussed SetMaxIdleTimeout methods added, we'll change this behavior// 8 means we can't have more than 8 goroutines to concurrently access the same database.conn.SetMaxIdleConns(maxIdleConns)conn.SetMaxOpenConns(maxOpenConns)conn.SetConnMaxLifetime(maxLifetime)if err := conn.Ping(); err != nil {_ = conn.Close()return nil, err}return conn, nil
}
二 数据库句柄DB结构
- sql 包自动创建和释放连接,并且是线程安全的;
- 它还维护空闲连接的空闲池,能自动创建和释放连接。
- 调用DB.Begin返回的Tx绑定到单个连接。一旦提交或对事务调用回滚,该事务的连接返回到DB的空闲连接池。
- 空闲连接池的大小可以使用 SetMaxIdleConns 进行控制。
type DB struct {// Total time waited for new connections.waitDuration atomic.Int64connector driver.Connector// numClosed is an atomic counter which represents a total number of// closed connections. Stmt.openStmt checks it before cleaning closed// connections in Stmt.css.numClosed atomic.Uint64mu sync.Mutex // protects following fieldsfreeConn []*driverConn // free connections ordered by returnedAt oldest to newestconnRequests map[uint64]chan connRequestnextRequest uint64 // Next key to use in connRequests.numOpen int // number of opened and pending open connections// Used to signal the need for new connections// a goroutine running connectionOpener() reads on this chan and// maybeOpenNewConnections sends on the chan (one send per needed connection)// It is closed during db.Close(). The close tells the connectionOpener// goroutine to exit.openerCh chan struct{}closed booldep map[finalCloser]depSetlastPut map[*driverConn]string // stacktrace of last conn's put; debug onlymaxIdleCount int // zero means defaultMaxIdleConns; negative means 0maxOpen int // <= 0 means unlimitedmaxLifetime time.Duration // maximum amount of time a connection may be reusedmaxIdleTime time.Duration // maximum amount of time a connection may be idle before being closedcleanerCh chan struct{}waitCount int64 // Total number of connections waited for.maxIdleClosed int64 // Total number of connections closed due to idle count.maxIdleTimeClosed int64 // Total number of connections closed due to idle time.maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.stop func() // stop cancels the connection opener.
}
三.获取数据库句柄
sql.Open 接收 driverName 和 dataSourceName 作为入参,前者用于在全局 drivers map 中查找对应的驱动实现,后者是数据源名称,会原样交给驱动(driverCtx.OpenConnector 或 dsnConnector)用于后续建立连接。
func Open(driverName, dataSourceName string) (*DB, error) {driversMu.RLock()driveri, ok := drivers[driverName]driversMu.RUnlock()if !ok {return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)}if driverCtx, ok := driveri.(driver.DriverContext); ok {connector, err := driverCtx.OpenConnector(dataSourceName)if err != nil {return nil, err}return OpenDB(connector), nil}return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
2.1 driver map
sql.Open 所查找的全局 drivers map 中存储各数据库对应的驱动实现(查到的值即 driveri),这些驱动是通过 Register 函数写入的。
// driversMu guards drivers, the registry of database drivers keyed by the
// name they were registered under.
var (
	driversMu sync.RWMutex
	drivers   = make(map[string]driver.Driver)
)

// nowFunc returns the current time; it's overridden in tests.
var nowFunc = time.Now

// Register makes a database driver available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func Register(name string, driver driver.Driver) {
	driversMu.Lock()
	defer driversMu.Unlock()
	if driver == nil {
		panic("sql: Register driver is nil")
	}
	if _, dup := drivers[name]; dup {
		panic("sql: Register called twice for driver " + name)
	}
	drivers[name] = driver
}
2.2 sql.OpenDB
sql.OpenDB 做的事情很简单,只验证参数,不会创建数据库连接。要验证数据源名称是否有效,请调用ping。
再另起一个 goroutine 调用 sql.DB 的 connectionOpener 方法后就结束。
方法结束,返回DB,返回的DB对于多个goroutine并发使用是安全的,并且维护其自己的空闲连接池。
func OpenDB(c driver.Connector) *DB {ctx, cancel := context.WithCancel(context.Background())db := &DB{connector: c,openerCh: make(chan struct{}, connectionRequestQueueSize),lastPut: make(map[*driverConn]string),connRequests: make(map[uint64]chan connRequest),stop: cancel,}go db.connectionOpener(ctx)return db
}
四.获取连接
sql.DB 的连接是延迟建立的,需要用到连接时才会去创建一条连接。通常是通过 sql.DB 与数据库交互的时候才会创建连接,这里的交互指的是 PingContext、QueryContext、ExecContext 等方法。
// Ping is PingContext called with a background context; it returns whatever
// PingContext returns.
func (db *DB) Ping() error {
	ctx := context.Background()
	return db.PingContext(ctx)
}
3.1 PingContext
func (db *DB) PingContext(ctx context.Context) error {var dc *driverConnvar err errorerr = db.retry(func(strategy connReuseStrategy) error {dc, err = db.conn(ctx, strategy)return err})if err != nil {return err}return db.pingDC(ctx, dc, dc.releaseConn)
}
先最多尝试 maxBadConnRetries 次以 cachedOrNewConn 这个策略调用一个非导出函数,如果均失败且失败原因是 driver.ErrBadConn,那么尝试以 alwaysNewConn 这个策略调用同样的函数。
const maxBadConnRetries = 2func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {for i := int64(0); i < maxBadConnRetries; i++ {err := fn(cachedOrNewConn)// retry if err is driver.ErrBadConnif err == nil || !errors.Is(err, driver.ErrBadConn) {return err}}return fn(alwaysNewConn)
}
连接成功后,会设置连接的空闲时间,并把连接放入空闲连接数组(DB.freeConn)。
五.将连接放回连接池
当 DB.pingDC 结束时,releaseConn 就会被调用,而这个方法的逻辑很简单,仅仅只是调用 DB.putConn 方法。
// releaseConn hands dc back to its owning DB through putConn, forwarding err
// and asking for a session reset.
func (dc *driverConn) releaseConn(err error) {
	owner := dc.db
	owner.putConn(dc, err, true)
}
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {if !errors.Is(err, driver.ErrBadConn) {if !dc.validateConnection(resetSession) {err = driver.ErrBadConn}}db.mu.Lock()if !dc.inUse {db.mu.Unlock()if debugGetPut {fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])}panic("sql: connection returned that was never out")}if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {db.maxLifetimeClosed++err = driver.ErrBadConn}if debugGetPut {db.lastPut[dc] = stack()}dc.inUse = falsedc.returnedAt = nowFunc()for _, fn := range dc.onPut {fn()}dc.onPut = nilif errors.Is(err, driver.ErrBadConn) {// Don't reuse bad connections.// Since the conn is considered bad and is being discarded, treat it// as closed. Don't decrement the open count here, finalClose will// take care of that.db.maybeOpenNewConnections()db.mu.Unlock()dc.Close()return}if putConnHook != nil {putConnHook(db, dc)}added := db.putConnDBLocked(dc, nil)db.mu.Unlock()if !added {dc.Close()return}
}
六.新建连接 DB.conn
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {db.mu.Lock()if db.closed {db.mu.Unlock()return nil, errDBClosed}// Check if the context is expired.select {default:case <-ctx.Done():db.mu.Unlock()return nil, ctx.Err()}lifetime := db.maxLifetime// Prefer a free connection, if possible.last := len(db.freeConn) - 1if strategy == cachedOrNewConn && last >= 0 {// Reuse the lowest idle time connection so we can close// connections which remain idle as soon as possible.conn := db.freeConn[last]db.freeConn = db.freeConn[:last]conn.inUse = trueif conn.expired(lifetime) {db.maxLifetimeClosed++db.mu.Unlock()conn.Close()return nil, driver.ErrBadConn}db.mu.Unlock()// Reset the session if required.if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {conn.Close()return nil, err}return conn, nil}// Out of free connections or we were asked not to use one. If we're not// allowed to open any more connections, make a request and wait.if db.maxOpen > 0 && db.numOpen >= db.maxOpen {// Make the connRequest channel. It's buffered so that the// connectionOpener doesn't block while waiting for the req to be read.req := make(chan connRequest, 1)reqKey := db.nextRequestKeyLocked()db.connRequests[reqKey] = reqdb.waitCount++db.mu.Unlock()waitStart := nowFunc()// Timeout the connection request with the context.select {case <-ctx.Done():// Remove the connection request and ensure no value has been sent// on it after removing.db.mu.Lock()delete(db.connRequests, reqKey)db.mu.Unlock()db.waitDuration.Add(int64(time.Since(waitStart)))select {default:case ret, ok := <-req:if ok && ret.conn != nil {db.putConn(ret.conn, ret.err, false)}}return nil, ctx.Err()case ret, ok := <-req:db.waitDuration.Add(int64(time.Since(waitStart)))if !ok {return nil, errDBClosed}// Only check if the connection is expired if the strategy is cachedOrNewConns.// If we require a new connection, just re-use the connection without looking// at the expiry time. 
If it is expired, it will be checked when it is placed// back into the connection pool.// This prioritizes giving a valid connection to a client over the exact connection// lifetime, which could expire exactly after this point anyway.if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {db.mu.Lock()db.maxLifetimeClosed++db.mu.Unlock()ret.conn.Close()return nil, driver.ErrBadConn}if ret.conn == nil {return nil, ret.err}// Reset the session if required.if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {ret.conn.Close()return nil, err}return ret.conn, ret.err}}db.numOpen++ // optimisticallydb.mu.Unlock()ci, err := db.connector.Connect(ctx)if err != nil {db.mu.Lock()db.numOpen-- // correct for earlier optimismdb.maybeOpenNewConnections()db.mu.Unlock()return nil, err}db.mu.Lock()dc := &driverConn{db: db,createdAt: nowFunc(),returnedAt: nowFunc(),ci: ci,inUse: true,}db.addDepLocked(dc, dc)db.mu.Unlock()return dc, nil
}
七.过期连接清理
当通过 DB.SetConnMaxLifetime 设置 DB.maxLifetime 或通过 DB.SetConnMaxIdleTime 设置 db.maxIdleTime 时,DB均会调用 DB.startCleanerLocked,这个函数的作用是按需初始化 DB.cleanerCh,然后新起一个协程调用 DB.connectionCleaner。
// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {db.cleanerCh = make(chan struct{}, 1)go db.connectionCleaner(db.shortestIdleTimeLocked())}
}
func (db *DB) connectionCleaner(d time.Duration) {const minInterval = time.Secondif d < minInterval {d = minInterval}t := time.NewTimer(d)for {select {case <-t.C:case <-db.cleanerCh: // maxLifetime was changed or db was closed.}db.mu.Lock()d = db.shortestIdleTimeLocked()if db.closed || db.numOpen == 0 || d <= 0 {db.cleanerCh = nildb.mu.Unlock()return}d, closing := db.connectionCleanerRunLocked(d)db.mu.Unlock()for _, c := range closing {c.Close()}if d < minInterval {d = minInterval}if !t.Stop() {select {case <-t.C:default:}}t.Reset(d)}
}