mirror of
https://github.com/rclone/rclone.git
synced 2024-12-25 08:28:57 +01:00
1127 lines
34 KiB
Go
1127 lines
34 KiB
Go
/*
|
|
Copyright 2017 Google Inc. All Rights Reserved.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package spanner
|
|
|
|
import (
|
|
"container/heap"
|
|
"container/list"
|
|
"fmt"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/golang/glog"
|
|
"golang.org/x/net/context"
|
|
|
|
sppb "google.golang.org/genproto/googleapis/spanner/v1"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
// sessionHandle is an interface for transactions to access Cloud Spanner sessions safely. It is generated by sessionPool.take().
|
|
type sessionHandle struct {
|
|
// mu guarantees that inner session object is returned / destroyed only once.
|
|
mu sync.Mutex
|
|
// session is a pointer to a session object. Transactions never need to access it directly.
|
|
session *session
|
|
}
|
|
|
|
// recycle gives the inner session object back to its home session pool. It is safe to call recycle multiple times but only the first one would take effect.
|
|
func (sh *sessionHandle) recycle() {
|
|
sh.mu.Lock()
|
|
defer sh.mu.Unlock()
|
|
if sh.session == nil {
|
|
// sessionHandle has already been recycled.
|
|
return
|
|
}
|
|
sh.session.recycle()
|
|
sh.session = nil
|
|
}
|
|
|
|
// getID gets the Cloud Spanner session ID from the internal session object. getID returns empty string if the sessionHandle is nil or the inner session
|
|
// object has been released by recycle / destroy.
|
|
func (sh *sessionHandle) getID() string {
|
|
sh.mu.Lock()
|
|
defer sh.mu.Unlock()
|
|
if sh.session == nil {
|
|
// sessionHandle has already been recycled/destroyed.
|
|
return ""
|
|
}
|
|
return sh.session.getID()
|
|
}
|
|
|
|
// getClient gets the Cloud Spanner RPC client associated with the session ID in sessionHandle.
|
|
func (sh *sessionHandle) getClient() sppb.SpannerClient {
|
|
sh.mu.Lock()
|
|
defer sh.mu.Unlock()
|
|
if sh.session == nil {
|
|
return nil
|
|
}
|
|
return sh.session.client
|
|
}
|
|
|
|
// getMetadata returns the metadata associated with the session in sessionHandle.
|
|
func (sh *sessionHandle) getMetadata() metadata.MD {
|
|
sh.mu.Lock()
|
|
defer sh.mu.Unlock()
|
|
if sh.session == nil {
|
|
return nil
|
|
}
|
|
return sh.session.md
|
|
}
|
|
|
|
// getTransactionID returns the transaction id in the session if available.
|
|
func (sh *sessionHandle) getTransactionID() transactionID {
|
|
sh.mu.Lock()
|
|
defer sh.mu.Unlock()
|
|
if sh.session == nil {
|
|
return nil
|
|
}
|
|
return sh.session.tx
|
|
}
|
|
|
|
// destroy destroys the inner session object. It is safe to call destroy multiple times and only the first call would attempt to
|
|
// destroy the inner session object.
|
|
func (sh *sessionHandle) destroy() {
|
|
sh.mu.Lock()
|
|
s := sh.session
|
|
sh.session = nil
|
|
sh.mu.Unlock()
|
|
if s == nil {
|
|
// sessionHandle has already been destroyed.
|
|
return
|
|
}
|
|
s.destroy(false)
|
|
}
|
|
|
|
// session wraps a Cloud Spanner session ID through which transactions are created and executed.
|
|
type session struct {
|
|
// client is the RPC channel to Cloud Spanner. It is set only once during session's creation.
|
|
client sppb.SpannerClient
|
|
// id is the unique id of the session in Cloud Spanner. It is set only once during session's creation.
|
|
id string
|
|
// pool is the session's home session pool where it was created. It is set only once during session's creation.
|
|
pool *sessionPool
|
|
// createTime is the timestamp of the session's creation. It is set only once during session's creation.
|
|
createTime time.Time
|
|
|
|
// mu protects the following fields from concurrent access: both healthcheck workers and transactions can modify them.
|
|
mu sync.Mutex
|
|
// valid marks the validity of a session.
|
|
valid bool
|
|
// hcIndex is the index of the session inside the global healthcheck queue. If hcIndex < 0, session has been unregistered from the queue.
|
|
hcIndex int
|
|
// idleList is the linkedlist node which links the session to its home session pool's idle list. If idleList == nil, the
|
|
// session is not in idle list.
|
|
idleList *list.Element
|
|
// nextCheck is the timestamp of next scheduled healthcheck of the session. It is maintained by the global health checker.
|
|
nextCheck time.Time
|
|
// checkingHelath is true if currently this session is being processed by health checker. Must be modified under health checker lock.
|
|
checkingHealth bool
|
|
// md is the Metadata to be sent with each request.
|
|
md metadata.MD
|
|
// tx contains the transaction id if the session has been prepared for write.
|
|
tx transactionID
|
|
}
|
|
|
|
// isValid returns true if the session is still valid for use.
|
|
func (s *session) isValid() bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.valid
|
|
}
|
|
|
|
// isWritePrepared returns true if the session is prepared for write.
|
|
func (s *session) isWritePrepared() bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.tx != nil
|
|
}
|
|
|
|
// String implements fmt.Stringer for session.
|
|
func (s *session) String() string {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>",
|
|
s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
|
|
}
|
|
|
|
// ping verifies if the session is still alive in Cloud Spanner.
|
|
func (s *session) ping() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
defer cancel()
|
|
return runRetryable(ctx, func(ctx context.Context) error {
|
|
_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid.
|
|
return err
|
|
})
|
|
}
|
|
|
|
// refreshIdle refreshes the session's session ID if it is in its home session pool's idle list
|
|
// and returns true if successful.
|
|
func (s *session) refreshIdle() bool {
|
|
s.mu.Lock()
|
|
validAndIdle := s.valid && s.idleList != nil
|
|
s.mu.Unlock()
|
|
if !validAndIdle {
|
|
// Optimization: return early if s is not valid or if s is not in idle list.
|
|
return false
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
var sid string
|
|
err := runRetryable(ctx, func(ctx context.Context) error {
|
|
session, e := s.client.CreateSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db})
|
|
if e != nil {
|
|
return e
|
|
}
|
|
sid = session.Name
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
s.pool.mu.Lock()
|
|
s.mu.Lock()
|
|
var recycle bool
|
|
if s.valid && s.idleList != nil {
|
|
// session is in idle list, refresh its session id.
|
|
sid, s.id = s.id, sid
|
|
s.createTime = time.Now()
|
|
if s.tx != nil {
|
|
s.tx = nil
|
|
s.pool.idleWriteList.Remove(s.idleList)
|
|
// We need to put this session back into the pool.
|
|
recycle = true
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
s.pool.mu.Unlock()
|
|
if recycle {
|
|
s.pool.recycle(s)
|
|
}
|
|
// If we fail to explicitly destroy the session, it will be eventually garbage collected by
|
|
// Cloud Spanner.
|
|
if err = runRetryable(ctx, func(ctx context.Context) error {
|
|
_, e := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid})
|
|
return e
|
|
}); err != nil && log.V(2) {
|
|
log.Warningf("Failed to delete session %v. Error: %v", sid, err)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// setHcIndex atomically sets the session's index in the healthcheck queue and returns the old index.
|
|
func (s *session) setHcIndex(i int) int {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
oi := s.hcIndex
|
|
s.hcIndex = i
|
|
return oi
|
|
}
|
|
|
|
// setIdleList atomically sets the session's idle list link and returns the old link.
|
|
func (s *session) setIdleList(le *list.Element) *list.Element {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
old := s.idleList
|
|
s.idleList = le
|
|
return old
|
|
}
|
|
|
|
// invalidate marks a session as invalid and returns the old validity.
|
|
func (s *session) invalidate() bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
ov := s.valid
|
|
s.valid = false
|
|
return ov
|
|
}
|
|
|
|
// setNextCheck sets the timestamp for next healthcheck on the session.
|
|
func (s *session) setNextCheck(t time.Time) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.nextCheck = t
|
|
}
|
|
|
|
// setTransactionID sets the transaction id in the session
|
|
func (s *session) setTransactionID(tx transactionID) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.tx = tx
|
|
}
|
|
|
|
// getID returns the session ID which uniquely identifies the session in Cloud Spanner.
|
|
func (s *session) getID() string {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.id
|
|
}
|
|
|
|
// getHcIndex returns the session's index into the global healthcheck priority queue.
|
|
func (s *session) getHcIndex() int {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.hcIndex
|
|
}
|
|
|
|
// getIdleList returns the session's link in its home session pool's idle list.
|
|
func (s *session) getIdleList() *list.Element {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.idleList
|
|
}
|
|
|
|
// getNextCheck returns the timestamp for next healthcheck on the session.
|
|
func (s *session) getNextCheck() time.Time {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.nextCheck
|
|
}
|
|
|
|
// recycle turns the session back to its home session pool.
|
|
func (s *session) recycle() {
|
|
s.setTransactionID(nil)
|
|
if !s.pool.recycle(s) {
|
|
// s is rejected by its home session pool because it expired and the session pool is currently having enough number of open sessions.
|
|
s.destroy(false)
|
|
}
|
|
}
|
|
|
|
// destroy removes the session from its home session pool, healthcheck queue and Cloud Spanner service.
|
|
func (s *session) destroy(isExpire bool) bool {
|
|
// Remove s from session pool.
|
|
if !s.pool.remove(s, isExpire) {
|
|
return false
|
|
}
|
|
// Unregister s from healthcheck queue.
|
|
s.pool.hc.unregister(s)
|
|
// Remove s from Cloud Spanner service.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
// Ignore the error returned by runRetryable because even if we fail to explicitly destroy the session,
|
|
// it will be eventually garbage collected by Cloud Spanner.
|
|
err := runRetryable(ctx, func(ctx context.Context) error {
|
|
_, e := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
|
|
return e
|
|
})
|
|
if err != nil && log.V(2) {
|
|
log.Warningf("Failed to delete session %v. Error: %v", s.getID(), err)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// prepareForWrite prepares the session for write if it is not already in that state.
|
|
func (s *session) prepareForWrite(ctx context.Context) error {
|
|
if s.isWritePrepared() {
|
|
return nil
|
|
}
|
|
tx, err := beginTransaction(ctx, s.getID(), s.client)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.setTransactionID(tx)
|
|
return nil
|
|
}
|
|
|
|
// SessionPoolConfig stores configurations of a session pool.
|
|
type SessionPoolConfig struct {
|
|
// getRPCClient is the caller supplied method for getting a gRPC client to Cloud Spanner, this makes session pool able to use client pooling.
|
|
getRPCClient func() (sppb.SpannerClient, error)
|
|
// MaxOpened is the maximum number of opened sessions that is allowed by the
|
|
// session pool. Default to NumChannels * 100.
|
|
MaxOpened uint64
|
|
// MinOpened is the minimum number of opened sessions that the session pool
|
|
// tries to maintain. Session pool won't continue to expire sessions if number
|
|
// of opened connections drops below MinOpened. However, if session is found
|
|
// to be broken, it will still be evicted from session pool, therefore it is
|
|
// posssible that the number of opened sessions drops below MinOpened.
|
|
MinOpened uint64
|
|
// maxSessionAge is the maximum duration that a session can be reused, zero
|
|
// means session pool will never expire sessions.
|
|
maxSessionAge time.Duration
|
|
// MaxIdle is the maximum number of idle sessions, pool is allowed to keep. Defaults to 0.
|
|
MaxIdle uint64
|
|
// MaxBurst is the maximum number of concurrent session creation requests. Defaults to 10.
|
|
MaxBurst uint64
|
|
// WriteSessions is the fraction of sessions we try to keep prepared for write.
|
|
WriteSessions float64
|
|
// HealthCheckWorkers is number of workers used by health checker for this pool.
|
|
HealthCheckWorkers int
|
|
// HealthCheckInterval is how often the health checker pings a session. Defaults to 5 min.
|
|
HealthCheckInterval time.Duration
|
|
// healthCheckMaintainerEnabled enables the session pool maintainer.
|
|
healthCheckMaintainerEnabled bool
|
|
// healthCheckSampleInterval is how often the health checker samples live session (for use in maintaining session pool size). Defaults to 1 min.
|
|
healthCheckSampleInterval time.Duration
|
|
}
|
|
|
|
// errNoRPCGetter returns error for SessionPoolConfig missing getRPCClient method.
|
|
func errNoRPCGetter() error {
|
|
return spannerErrorf(codes.InvalidArgument, "require SessionPoolConfig.getRPCClient != nil, got nil")
|
|
}
|
|
|
|
// errMinOpenedGTMapOpened returns error for SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set.
|
|
func errMinOpenedGTMaxOpened(spc *SessionPoolConfig) error {
|
|
return spannerErrorf(codes.InvalidArgument,
|
|
"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %v and %v", spc.MaxOpened, spc.MinOpened)
|
|
}
|
|
|
|
// validate verifies that the SessionPoolConfig is good for use.
|
|
func (spc *SessionPoolConfig) validate() error {
|
|
if spc.getRPCClient == nil {
|
|
return errNoRPCGetter()
|
|
}
|
|
if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 {
|
|
return errMinOpenedGTMaxOpened(spc)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// sessionPool creates and caches Cloud Spanner sessions.
|
|
type sessionPool struct {
|
|
// mu protects sessionPool from concurrent access.
|
|
mu sync.Mutex
|
|
// valid marks the validity of the session pool.
|
|
valid bool
|
|
// db is the database name that all sessions in the pool are associated with.
|
|
db string
|
|
// idleList caches idle session IDs. Session IDs in this list can be allocated for use.
|
|
idleList list.List
|
|
// idleWriteList caches idle sessions which have been prepared for write.
|
|
idleWriteList list.List
|
|
// mayGetSession is for broadcasting that session retrival/creation may proceed.
|
|
mayGetSession chan struct{}
|
|
// numOpened is the total number of open sessions from the session pool.
|
|
numOpened uint64
|
|
// createReqs is the number of ongoing session creation requests.
|
|
createReqs uint64
|
|
// prepareReqs is the number of ongoing session preparation request.
|
|
prepareReqs uint64
|
|
// configuration of the session pool.
|
|
SessionPoolConfig
|
|
// Metadata to be sent with each request
|
|
md metadata.MD
|
|
// hc is the health checker
|
|
hc *healthChecker
|
|
}
|
|
|
|
// newSessionPool creates a new session pool.
|
|
func newSessionPool(db string, config SessionPoolConfig, md metadata.MD) (*sessionPool, error) {
|
|
if err := config.validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
pool := &sessionPool{
|
|
db: db,
|
|
valid: true,
|
|
mayGetSession: make(chan struct{}),
|
|
SessionPoolConfig: config,
|
|
md: md,
|
|
}
|
|
if config.HealthCheckWorkers == 0 {
|
|
// With 10 workers and assuming average latency of 5 ms for BeginTransaction, we will be able to
|
|
// prepare 2000 tx/sec in advance. If the rate of takeWriteSession is more than that, it will
|
|
// degrade to doing BeginTransaction inline.
|
|
// TODO: consider resizing the worker pool dynamically according to the load.
|
|
config.HealthCheckWorkers = 10
|
|
}
|
|
if config.HealthCheckInterval == 0 {
|
|
config.HealthCheckInterval = 5 * time.Minute
|
|
}
|
|
if config.healthCheckSampleInterval == 0 {
|
|
config.healthCheckSampleInterval = time.Minute
|
|
}
|
|
// On GCE VM, within the same region an healthcheck ping takes on average 10ms to finish, given a 5 minutes interval and
|
|
// 10 healthcheck workers, a healthChecker can effectively mantain 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
|
|
pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)
|
|
close(pool.hc.ready)
|
|
return pool, nil
|
|
}
|
|
|
|
// isValid checks if the session pool is still valid.
|
|
func (p *sessionPool) isValid() bool {
|
|
if p == nil {
|
|
return false
|
|
}
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.valid
|
|
}
|
|
|
|
// close marks the session pool as closed.
|
|
func (p *sessionPool) close() {
|
|
if p == nil {
|
|
return
|
|
}
|
|
p.mu.Lock()
|
|
if !p.valid {
|
|
p.mu.Unlock()
|
|
return
|
|
}
|
|
p.valid = false
|
|
p.mu.Unlock()
|
|
p.hc.close()
|
|
// destroy all the sessions
|
|
p.hc.mu.Lock()
|
|
allSessions := make([]*session, len(p.hc.queue.sessions))
|
|
copy(allSessions, p.hc.queue.sessions)
|
|
p.hc.mu.Unlock()
|
|
for _, s := range allSessions {
|
|
s.destroy(false)
|
|
}
|
|
}
|
|
|
|
// errInvalidSessionPool returns error for using an invalid session pool.
|
|
func errInvalidSessionPool() error {
|
|
return spannerErrorf(codes.InvalidArgument, "invalid session pool")
|
|
}
|
|
|
|
// errGetSessionTimeout returns error for context timeout during sessionPool.take().
|
|
func errGetSessionTimeout() error {
|
|
return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")
|
|
}
|
|
|
|
// shouldPrepareWrite returns true if we should prepare more sessions for write.
|
|
func (p *sessionPool) shouldPrepareWrite() bool {
|
|
return float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs))
|
|
}
|
|
|
|
func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
|
|
doneCreate := func(done bool) {
|
|
p.mu.Lock()
|
|
if !done {
|
|
// Session creation failed, give budget back.
|
|
p.numOpened--
|
|
}
|
|
p.createReqs--
|
|
// Notify other waiters blocking on session creation.
|
|
close(p.mayGetSession)
|
|
p.mayGetSession = make(chan struct{})
|
|
p.mu.Unlock()
|
|
}
|
|
sc, err := p.getRPCClient()
|
|
if err != nil {
|
|
doneCreate(false)
|
|
return nil, err
|
|
}
|
|
var s *session
|
|
err = runRetryable(ctx, func(ctx context.Context) error {
|
|
sid, e := sc.CreateSession(ctx, &sppb.CreateSessionRequest{Database: p.db})
|
|
if e != nil {
|
|
return e
|
|
}
|
|
// If no error, construct the new session.
|
|
s = &session{valid: true, client: sc, id: sid.Name, pool: p, createTime: time.Now(), md: p.md}
|
|
p.hc.register(s)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
doneCreate(false)
|
|
// Should return error directly because of the previous retries on CreateSession RPC.
|
|
return nil, err
|
|
}
|
|
doneCreate(true)
|
|
return s, nil
|
|
}
|
|
|
|
func (p *sessionPool) isHealthy(s *session) bool {
|
|
if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
|
|
// TODO: figure out if we need to schedule a new healthcheck worker here.
|
|
if err := s.ping(); shouldDropSession(err) {
|
|
// The session is already bad, continue to fetch/create a new one.
|
|
s.destroy(false)
|
|
return false
|
|
}
|
|
p.hc.scheduledHC(s)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one.
|
|
// Session returned by take should be used for read operations.
|
|
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
|
|
ctx = contextWithOutgoingMetadata(ctx, p.md)
|
|
for {
|
|
var (
|
|
s *session
|
|
err error
|
|
)
|
|
|
|
p.mu.Lock()
|
|
if !p.valid {
|
|
p.mu.Unlock()
|
|
return nil, errInvalidSessionPool()
|
|
}
|
|
if p.idleList.Len() > 0 {
|
|
// Idle sessions are available, get one from the top of the idle list.
|
|
s = p.idleList.Remove(p.idleList.Front()).(*session)
|
|
} else if p.idleWriteList.Len() > 0 {
|
|
s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
|
|
}
|
|
if s != nil {
|
|
s.setIdleList(nil)
|
|
p.mu.Unlock()
|
|
// From here, session is no longer in idle list, so healthcheck workers won't destroy it.
|
|
// If healthcheck workers failed to schedule healthcheck for the session timely, do the check here.
|
|
// Because session check is still much cheaper than session creation, they should be reused as much as possible.
|
|
if !p.isHealthy(s) {
|
|
continue
|
|
}
|
|
return &sessionHandle{session: s}, nil
|
|
}
|
|
// Idle list is empty, block if session pool has reached max session creation concurrency or max number of open sessions.
|
|
if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
|
|
mayGetSession := p.mayGetSession
|
|
p.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, errGetSessionTimeout()
|
|
case <-mayGetSession:
|
|
}
|
|
continue
|
|
}
|
|
// Take budget before the actual session creation.
|
|
p.numOpened++
|
|
p.createReqs++
|
|
p.mu.Unlock()
|
|
if s, err = p.createSession(ctx); err != nil {
|
|
return nil, toSpannerError(err)
|
|
}
|
|
return &sessionHandle{session: s}, nil
|
|
}
|
|
}
|
|
|
|
// takeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one.
|
|
// Session returned should be used for read write transactions.
|
|
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
|
|
ctx = contextWithOutgoingMetadata(ctx, p.md)
|
|
for {
|
|
var (
|
|
s *session
|
|
err error
|
|
)
|
|
|
|
p.mu.Lock()
|
|
if !p.valid {
|
|
p.mu.Unlock()
|
|
return nil, errInvalidSessionPool()
|
|
}
|
|
if p.idleWriteList.Len() > 0 {
|
|
// Idle sessions are available, get one from the top of the idle list.
|
|
s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session)
|
|
} else if p.idleList.Len() > 0 {
|
|
s = p.idleList.Remove(p.idleList.Front()).(*session)
|
|
}
|
|
if s != nil {
|
|
s.setIdleList(nil)
|
|
p.mu.Unlock()
|
|
// From here, session is no longer in idle list, so healthcheck workers won't destroy it.
|
|
// If healthcheck workers failed to schedule healthcheck for the session timely, do the check here.
|
|
// Because session check is still much cheaper than session creation, they should be reused as much as possible.
|
|
if !p.isHealthy(s) {
|
|
continue
|
|
}
|
|
if !s.isWritePrepared() {
|
|
if err = s.prepareForWrite(ctx); err != nil {
|
|
return nil, toSpannerError(err)
|
|
}
|
|
}
|
|
return &sessionHandle{session: s}, nil
|
|
}
|
|
// Idle list is empty, block if session pool has reached max session creation concurrency or max number of open sessions.
|
|
if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) {
|
|
mayGetSession := p.mayGetSession
|
|
p.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, errGetSessionTimeout()
|
|
case <-mayGetSession:
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Take budget before the actual session creation.
|
|
p.numOpened++
|
|
p.createReqs++
|
|
p.mu.Unlock()
|
|
if s, err = p.createSession(ctx); err != nil {
|
|
return nil, toSpannerError(err)
|
|
}
|
|
if err = s.prepareForWrite(ctx); err != nil {
|
|
return nil, toSpannerError(err)
|
|
}
|
|
return &sessionHandle{session: s}, nil
|
|
}
|
|
}
|
|
|
|
// recycle puts session s back to the session pool's idle list, it returns true if the session pool successfully recycles session s.
|
|
func (p *sessionPool) recycle(s *session) bool {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
if !s.isValid() || !p.valid {
|
|
// Reject the session if session is invalid or pool itself is invalid.
|
|
return false
|
|
}
|
|
if p.maxSessionAge != 0 && s.createTime.Add(p.maxSessionAge).Before(time.Now()) && p.numOpened > p.MinOpened {
|
|
// session expires and number of opened sessions exceeds MinOpened, let the session itself.
|
|
return false
|
|
}
|
|
// Put session at the back of the list to round robin for load balancing across channels.
|
|
if s.isWritePrepared() {
|
|
s.setIdleList(p.idleWriteList.PushBack(s))
|
|
} else {
|
|
s.setIdleList(p.idleList.PushBack(s))
|
|
}
|
|
// Broadcast that a session has been returned to idle list.
|
|
close(p.mayGetSession)
|
|
p.mayGetSession = make(chan struct{})
|
|
return true
|
|
}
|
|
|
|
// remove atomically removes session s from the session pool and invalidates s.
|
|
// If isExpire == true, the removal is triggered by session expiration and in such cases, only idle sessions can be removed.
|
|
func (p *sessionPool) remove(s *session, isExpire bool) bool {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
|
|
// Don't expire session if the session is not in idle list (in use), or if number of open sessions is going below p.MinOpened.
|
|
return false
|
|
}
|
|
ol := s.setIdleList(nil)
|
|
// If the session is in the idlelist, remove it.
|
|
if ol != nil {
|
|
// Remove from whichever list it is in.
|
|
p.idleList.Remove(ol)
|
|
p.idleWriteList.Remove(ol)
|
|
}
|
|
if s.invalidate() {
|
|
// Decrease the number of opened sessions.
|
|
p.numOpened--
|
|
// Broadcast that a session has been destroyed.
|
|
close(p.mayGetSession)
|
|
p.mayGetSession = make(chan struct{})
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// hcHeap implements heap.Interface. It is used to create the priority queue for session healthchecks.
|
|
type hcHeap struct {
|
|
sessions []*session
|
|
}
|
|
|
|
// Len impelemnts heap.Interface.Len.
|
|
func (h hcHeap) Len() int {
|
|
return len(h.sessions)
|
|
}
|
|
|
|
// Less implements heap.Interface.Less.
|
|
func (h hcHeap) Less(i, j int) bool {
|
|
return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck())
|
|
}
|
|
|
|
// Swap implements heap.Interface.Swap.
|
|
func (h hcHeap) Swap(i, j int) {
|
|
h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i]
|
|
h.sessions[i].setHcIndex(i)
|
|
h.sessions[j].setHcIndex(j)
|
|
}
|
|
|
|
// Push implements heap.Interface.Push.
|
|
func (h *hcHeap) Push(s interface{}) {
|
|
ns := s.(*session)
|
|
ns.setHcIndex(len(h.sessions))
|
|
h.sessions = append(h.sessions, ns)
|
|
}
|
|
|
|
// Pop implements heap.Interface.Pop.
|
|
func (h *hcHeap) Pop() interface{} {
|
|
old := h.sessions
|
|
n := len(old)
|
|
s := old[n-1]
|
|
h.sessions = old[:n-1]
|
|
s.setHcIndex(-1)
|
|
return s
|
|
}
|
|
|
|
// healthChecker performs periodical healthchecks on registered sessions.
|
|
type healthChecker struct {
|
|
// mu protects concurrent access to hcQueue.
|
|
mu sync.Mutex
|
|
// queue is the priority queue for session healthchecks. Sessions with lower nextCheck rank higher in the queue.
|
|
queue hcHeap
|
|
// interval is the average interval between two healthchecks on a session.
|
|
interval time.Duration
|
|
// workers is the number of concurrent healthcheck workers.
|
|
workers int
|
|
// waitWorkers waits for all healthcheck workers to exit
|
|
waitWorkers sync.WaitGroup
|
|
// pool is the underlying session pool.
|
|
pool *sessionPool
|
|
// sampleInterval is the interval of sampling by the maintainer.
|
|
sampleInterval time.Duration
|
|
// ready is used to signal that maintainer can start running.
|
|
ready chan struct{}
|
|
// done is used to signal that health checker should be closed.
|
|
done chan struct{}
|
|
// once is used for closing channel done only once.
|
|
once sync.Once
|
|
}
|
|
|
|
// newHealthChecker initializes new instance of healthChecker.
|
|
func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
|
|
if workers <= 0 {
|
|
workers = 1
|
|
}
|
|
hc := &healthChecker{
|
|
interval: interval,
|
|
workers: workers,
|
|
pool: pool,
|
|
sampleInterval: sampleInterval,
|
|
ready: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
if hc.pool.healthCheckMaintainerEnabled {
|
|
hc.waitWorkers.Add(1)
|
|
go hc.maintainer()
|
|
}
|
|
for i := 1; i <= hc.workers; i++ {
|
|
hc.waitWorkers.Add(1)
|
|
go hc.worker(i)
|
|
}
|
|
return hc
|
|
}
|
|
|
|
// close closes the healthChecker and waits for all healthcheck workers to exit.
|
|
func (hc *healthChecker) close() {
|
|
hc.once.Do(func() { close(hc.done) })
|
|
hc.waitWorkers.Wait()
|
|
}
|
|
|
|
// isClosing checks if a healthChecker is already closing.
|
|
func (hc *healthChecker) isClosing() bool {
|
|
select {
|
|
case <-hc.done:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// getInterval gets the healthcheck interval.
|
|
func (hc *healthChecker) getInterval() time.Duration {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
return hc.interval
|
|
}
|
|
|
|
// scheduledHCLocked schedules next healthcheck on session s with the assumption that hc.mu is being held.
|
|
func (hc *healthChecker) scheduledHCLocked(s *session) {
|
|
// The next healthcheck will be scheduled after [interval*0.5, interval*1.5) nanoseconds.
|
|
nsFromNow := rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2
|
|
s.setNextCheck(time.Now().Add(time.Duration(nsFromNow)))
|
|
if hi := s.getHcIndex(); hi != -1 {
|
|
// Session is still being tracked by healthcheck workers.
|
|
heap.Fix(&hc.queue, hi)
|
|
}
|
|
}
|
|
|
|
// scheduledHC schedules next healthcheck on session s. It is safe to be called concurrently.
|
|
func (hc *healthChecker) scheduledHC(s *session) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
hc.scheduledHCLocked(s)
|
|
}
|
|
|
|
// register registers a session with healthChecker for periodical healthcheck.
|
|
func (hc *healthChecker) register(s *session) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
hc.scheduledHCLocked(s)
|
|
heap.Push(&hc.queue, s)
|
|
}
|
|
|
|
// unregister unregisters a session from healthcheck queue.
|
|
func (hc *healthChecker) unregister(s *session) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
oi := s.setHcIndex(-1)
|
|
if oi >= 0 {
|
|
heap.Remove(&hc.queue, oi)
|
|
}
|
|
}
|
|
|
|
// markDone marks that health check for session has been performed.
|
|
func (hc *healthChecker) markDone(s *session) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
s.checkingHealth = false
|
|
}
|
|
|
|
// healthCheck checks the health of the session and pings it if needed.
|
|
func (hc *healthChecker) healthCheck(s *session) {
|
|
defer hc.markDone(s)
|
|
if !s.pool.isValid() {
|
|
// Session pool is closed, perform a garbage collection.
|
|
s.destroy(false)
|
|
return
|
|
}
|
|
if s.pool.maxSessionAge != 0 && s.createTime.Add(s.pool.maxSessionAge).Before(time.Now()) {
|
|
// Session reaches its maximum age, retire it. Failing that try to refresh it.
|
|
if s.destroy(true) || !s.refreshIdle() {
|
|
return
|
|
}
|
|
}
|
|
if err := s.ping(); shouldDropSession(err) {
|
|
// Ping failed, destroy the session.
|
|
s.destroy(false)
|
|
}
|
|
}
|
|
|
|
// worker performs the healthcheck on sessions in healthChecker's priority queue.
|
|
func (hc *healthChecker) worker(i int) {
|
|
// Returns a session which we should ping to keep it alive.
|
|
getNextForPing := func() *session {
|
|
hc.pool.mu.Lock()
|
|
defer hc.pool.mu.Unlock()
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
if hc.queue.Len() <= 0 {
|
|
// Queue is empty.
|
|
return nil
|
|
}
|
|
s := hc.queue.sessions[0]
|
|
if s.getNextCheck().After(time.Now()) && hc.pool.valid {
|
|
// All sessions have been checked recently.
|
|
return nil
|
|
}
|
|
hc.scheduledHCLocked(s)
|
|
if !s.checkingHealth {
|
|
s.checkingHealth = true
|
|
return s
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Returns a session which we should prepare for write.
|
|
getNextForTx := func() *session {
|
|
hc.pool.mu.Lock()
|
|
defer hc.pool.mu.Unlock()
|
|
if hc.pool.shouldPrepareWrite() {
|
|
if hc.pool.idleList.Len() > 0 && hc.pool.valid {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
if hc.pool.idleList.Front().Value.(*session).checkingHealth {
|
|
return nil
|
|
}
|
|
session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
|
|
session.checkingHealth = true
|
|
hc.pool.prepareReqs++
|
|
return session
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
for {
|
|
if hc.isClosing() {
|
|
// Exit when the pool has been closed and all sessions have been destroyed
|
|
// or when health checker has been closed.
|
|
hc.waitWorkers.Done()
|
|
return
|
|
}
|
|
ws := getNextForTx()
|
|
if ws != nil {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
err := ws.prepareForWrite(contextWithOutgoingMetadata(ctx, hc.pool.md))
|
|
cancel()
|
|
if err != nil {
|
|
// TODO(dixiao): handle error properly
|
|
log.Errorf("prepareForWrite failed: %v", err)
|
|
}
|
|
hc.pool.recycle(ws)
|
|
hc.pool.mu.Lock()
|
|
hc.pool.prepareReqs--
|
|
hc.pool.mu.Unlock()
|
|
hc.markDone(ws)
|
|
}
|
|
rs := getNextForPing()
|
|
if rs == nil {
|
|
if ws == nil {
|
|
// No work to be done so sleep to avoid burning cpu
|
|
pause := int64(100 * time.Millisecond)
|
|
if pause > int64(hc.interval) {
|
|
pause = int64(hc.interval)
|
|
}
|
|
select {
|
|
case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
|
|
break
|
|
case <-hc.done:
|
|
break
|
|
}
|
|
|
|
}
|
|
continue
|
|
}
|
|
hc.healthCheck(rs)
|
|
}
|
|
}
|
|
|
|
// maintainer maintains the maxSessionsInUse by a window of kWindowSize * sampleInterval.
|
|
// Based on this information, health checker will try to maintain the number of sessions by hc..
|
|
func (hc *healthChecker) maintainer() {
|
|
// Wait so that pool is ready.
|
|
<-hc.ready
|
|
|
|
var (
|
|
windowSize uint64 = 10
|
|
iteration uint64
|
|
timeout <-chan time.Time
|
|
)
|
|
|
|
// replenishPool is run if numOpened is less than sessionsToKeep, timeouts on sampleInterval.
|
|
replenishPool := func(sessionsToKeep uint64) {
|
|
ctx, _ := context.WithTimeout(context.Background(), hc.sampleInterval)
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
return
|
|
default:
|
|
break
|
|
}
|
|
|
|
p := hc.pool
|
|
p.mu.Lock()
|
|
// Take budget before the actual session creation.
|
|
if sessionsToKeep <= p.numOpened {
|
|
p.mu.Unlock()
|
|
break
|
|
}
|
|
p.numOpened++
|
|
p.createReqs++
|
|
shouldPrepareWrite := p.shouldPrepareWrite()
|
|
p.mu.Unlock()
|
|
var (
|
|
s *session
|
|
err error
|
|
)
|
|
if s, err = p.createSession(ctx); err != nil {
|
|
log.Warningf("Failed to create session, error: %v", toSpannerError(err))
|
|
continue
|
|
}
|
|
if shouldPrepareWrite {
|
|
if err = s.prepareForWrite(ctx); err != nil {
|
|
log.Warningf("Failed to prepare session, error: %v", toSpannerError(err))
|
|
continue
|
|
}
|
|
}
|
|
p.recycle(s)
|
|
}
|
|
}
|
|
|
|
// shrinkPool, scales down the session pool.
|
|
shrinkPool := func(sessionsToKeep uint64) {
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
return
|
|
default:
|
|
break
|
|
}
|
|
|
|
p := hc.pool
|
|
p.mu.Lock()
|
|
|
|
if sessionsToKeep >= p.numOpened {
|
|
p.mu.Unlock()
|
|
break
|
|
}
|
|
|
|
var s *session
|
|
if p.idleList.Len() > 0 {
|
|
s = p.idleList.Front().Value.(*session)
|
|
} else if p.idleWriteList.Len() > 0 {
|
|
s = p.idleWriteList.Front().Value.(*session)
|
|
}
|
|
p.mu.Unlock()
|
|
if s != nil {
|
|
// destroy session as expire.
|
|
s.destroy(true)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
for {
|
|
if hc.isClosing() {
|
|
hc.waitWorkers.Done()
|
|
return
|
|
}
|
|
|
|
// maxSessionsInUse is the maximum number of sessions in use concurrently over a period of time.
|
|
var maxSessionsInUse uint64
|
|
|
|
// Updates metrics.
|
|
hc.pool.mu.Lock()
|
|
currSessionsInUse := hc.pool.numOpened - uint64(hc.pool.idleList.Len()) - uint64(hc.pool.idleWriteList.Len())
|
|
currSessionsOpened := hc.pool.numOpened
|
|
hc.pool.mu.Unlock()
|
|
|
|
hc.mu.Lock()
|
|
if iteration%windowSize == 0 || maxSessionsInUse < currSessionsInUse {
|
|
maxSessionsInUse = currSessionsInUse
|
|
}
|
|
sessionsToKeep := maxUint64(hc.pool.MinOpened,
|
|
minUint64(currSessionsOpened, hc.pool.MaxIdle+maxSessionsInUse))
|
|
hc.mu.Unlock()
|
|
|
|
timeout = time.After(hc.sampleInterval)
|
|
// Replenish or Shrink pool if needed.
|
|
// Note: we don't need to worry about pending create session requests, we only need to sample the current sessions in use.
|
|
// the routines will not try to create extra / delete creating sessions.
|
|
if sessionsToKeep > currSessionsOpened {
|
|
replenishPool(sessionsToKeep)
|
|
} else {
|
|
shrinkPool(sessionsToKeep)
|
|
}
|
|
|
|
select {
|
|
case <-timeout:
|
|
break
|
|
case <-hc.done:
|
|
break
|
|
}
|
|
iteration++
|
|
}
|
|
}
|
|
|
|
// shouldDropSession returns true if a particular error leads to the removal of a session
|
|
func shouldDropSession(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
// If a Cloud Spanner can no longer locate the session (for example, if session is garbage collected), then caller
|
|
// should not try to return the session back into the session pool.
|
|
// TODO: once gRPC can return auxilary error information, stop parsing the error message.
|
|
if ErrCode(err) == codes.NotFound && strings.Contains(ErrDesc(err), "Session not found:") {
|
|
return true
|
|
}
|
|
return false
|
|
}
|