Created
March 10, 2020 16:22
-
-
Save yurishkuro/d460a21c293cd02306cc8bc19d895940 to your computer and use it in GitHub Desktop.
lazy session for Cassandra in Jaeger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cassandra | |
import ( | |
"errors" | |
"sync" | |
"sync/atomic" | |
"github.com/jaegertracing/jaeger/pkg/cassandra" | |
"go.uber.org/zap" | |
) | |
// errUninitializedSession is the sentinel error returned by the disconnected
// query and iterator stubs, which stand in when no Cassandra session could be
// created.
var (
	errUninitializedSession = errors.New("uninitialized Cassandra session")
)
// LazySession is a wrapper which attempts session creation only when queries are received. This allows for | |
// application startup even when Cassandra is unavailable. | |
// The first query incurs the cost of the blocking session creation. If session creation is unsuccessful, the query | |
// is created with a disconnectedSession, which will return an error when used. | |
type LazySession struct { | |
sessionInitialized uint32 | |
sessionProvider SessionProvider | |
logger *zap.Logger | |
session cassandra.Session | |
m sync.Mutex | |
} | |
// SessionProvider is a func which provides a cassandra session | |
type SessionProvider func() (cassandra.Session, error) | |
// NewLazySession creates a new LazySession | |
func NewLazySession(sessionProvider SessionProvider, logger *zap.Logger) cassandra.Session { | |
return &LazySession{ | |
sessionProvider: sessionProvider, | |
logger: logger, | |
} | |
} | |
func (l *LazySession) getSession() cassandra.Session { | |
if atomic.LoadUint32(&l.sessionInitialized) == 1 { | |
return l.session | |
} | |
l.logger.Info("Waiting on lock to begin lazy Cassandra session initialization") | |
l.m.Lock() | |
defer l.m.Unlock() | |
if l.sessionInitialized == 0 { | |
l.logger.Debug("Starting lazy Cassandra session initialization") | |
session, err := l.sessionProvider() | |
if err != nil { | |
l.logger.Warn("Unable to initialize lazy Cassandra session", zap.Error(err)) | |
return disconnectedSession{} | |
} | |
l.logger.Info("Successfully initialized lazy Cassandra session") | |
l.session = session | |
atomic.StoreUint32(&l.sessionInitialized, 1) | |
} | |
return l.session | |
} | |
// --- | |
// Query delegates to underlying session query. It creates a new session if none exists | |
func (l *LazySession) Query(stmt string, values ...interface{}) cassandra.Query { | |
return l.getSession().Query(stmt, values...) | |
} | |
// Close delegates to underlying session query. It creates a session if none exists | |
func (l *LazySession) Close() { | |
l.getSession().Close() | |
} | |
// --- | |
// disconnectedSession returns disconnected queries which short circuit to produce errors | |
type disconnectedSession struct { | |
} | |
func (disconnectedSession) Query(stmt string, values ...interface{}) cassandra.Query { | |
return disconnectedQuery{} | |
} | |
func (disconnectedSession) Close() { | |
} | |
// --- | |
// disconnectQuery returns queries that always error | |
type disconnectedQuery struct { | |
} | |
func (disconnectedQuery) Exec() error { | |
return errUninitializedSession | |
} | |
func (disconnectedQuery) String() string { | |
return "This is a query from an uninitialized session." | |
} | |
func (disconnectedQuery) ScanCAS(dest ...interface{}) (bool, error) { | |
return false, errUninitializedSession | |
} | |
func (disconnectedQuery) Iter() cassandra.Iterator { | |
return disconnectedIterator{} | |
} | |
func (disconnectedQuery) Bind(v ...interface{}) cassandra.Query { | |
return disconnectedQuery{} | |
} | |
func (disconnectedQuery) Consistency(level cassandra.Consistency) cassandra.Query { | |
return disconnectedQuery{} | |
} | |
func (disconnectedQuery) PageSize(int) cassandra.Query { | |
return disconnectedQuery{} | |
} | |
// --- | |
// disconnectedIterator returns iterators which short circuit to produce errors | |
type disconnectedIterator struct { | |
} | |
func (disconnectedIterator) Scan(dest ...interface{}) bool { | |
return false | |
} | |
func (disconnectedIterator) Close() error { | |
return errUninitializedSession | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment