|
package main |
|
|
|
import ( |
|
"fmt" |
|
"time" |
|
|
|
"github.com/gocql/gocql" |
|
"github.com/golang-migrate/migrate/v4" |
|
"github.com/golang-migrate/migrate/v4/database/cassandra" |
|
_ "github.com/golang-migrate/migrate/v4/source/file" |
|
"github.com/kelseyhightower/envconfig" |
|
"github.com/relistan/rubberneck" |
|
log "github.com/sirupsen/logrus" |
|
) |
|
|
|
// Config represents the environment variables for configuration |
|
type Config struct { |
|
Environment string `envconfig:"ENVIRONMENT" default:"dev"` |
|
|
|
Endpoints []string `envconfig:"CASSANDRA_ENDPOINTS" required:"true"` |
|
Keyspace string `envconfig:"CASSANDRA_KEYSPACE" required:"true"` |
|
SSLEnable bool `envconfig:"CASSANDRA_SSL_ENABLE" default:"false"` |
|
CACertFile string `envconfig:"CASSANDRA_CACERT_FILE" default:"sf-class2-root.crt"` |
|
Port int `envconfig:"CASSANDRA_PORT" default:"9042"` |
|
Username string `envconfig:"CASSANDRA_USERNAME" default:""` |
|
Password string `envconfig:"CASSANDRA_PASSWORD" default:""` |
|
|
|
MigrationDir string `envconfig:"CASSANDRA_MIGRATION_DIR" default:"db/cassandra/migrations"` |
|
ReplicationFactor int `envconfig:"CASSANDRA_REPL_FACTOR" default:"1"` |
|
ReplicationStrategy string `envconfig:"CASSANDRA_REPL_STRATEGY" default:"SimpleStrategy"` |
|
} |
|
|
|
// createKeyspace creates the keyspace in Cassandra if it doesn't exist |
|
func createKeyspace(session *gocql.Session, keyspace string, replFactor int, strategy string) error { |
|
query := fmt.Sprintf( |
|
"CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class': '%s', 'replication_factor': %d}", |
|
keyspace, strategy, replFactor) |
|
if err := session.Query(query).Exec(); err != nil { |
|
return err |
|
} |
|
return nil |
|
} |
|
|
|
// runMigrations runs schema migrations using Golang-migrate |
|
func runMigrations(sess *gocql.Session, migrationDir string, cassandraHosts []string, keyspace string) error { |
|
cassandraDriver, err := cassandra.WithInstance(sess, &cassandra.Config{ |
|
KeyspaceName: keyspace, |
|
}) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
m, err := migrate.NewWithDatabaseInstance( |
|
"file://"+migrationDir, |
|
"cassandra", cassandraDriver) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
defer m.Close() |
|
|
|
err = m.Up() |
|
if err != nil && err != migrate.ErrNoChange { |
|
return err |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func awaitSession(cluster *gocql.ClusterConfig) (session *gocql.Session, err error) { |
|
for i := 0; i < 240; i++ { |
|
session, err = cluster.CreateSession() |
|
if err == nil { |
|
break |
|
} |
|
time.Sleep(500 * time.Millisecond) |
|
log.Warnf("Cassandra not available. Waiting...") |
|
} |
|
|
|
return session, err |
|
} |
|
|
|
func configureCassandra(config *Config) *gocql.ClusterConfig { |
|
cluster := gocql.NewCluster(config.Endpoints...) |
|
cluster.Keyspace = "system" |
|
cluster.Port = config.Port |
|
|
|
// These two are only used when deployed against Amazon Keyspaces |
|
if config.Username != "" { |
|
cluster.Authenticator = gocql.PasswordAuthenticator{ |
|
Username: config.Username, |
|
Password: config.Password, |
|
} |
|
} |
|
if config.SSLEnable { |
|
cluster.SslOpts = &gocql.SslOptions{ |
|
CaPath: config.CACertFile, |
|
EnableHostVerification: false, |
|
} |
|
} |
|
|
|
return cluster |
|
} |
|
|
|
func main() { |
|
var config Config |
|
log.Info("Beginning Cassandra migration") |
|
err := envconfig.Process("", &config) |
|
if err != nil { |
|
log.Fatalf("error loading config: %v", err) |
|
} |
|
rubberneck.Print(config) |
|
|
|
// Connect to Cassandra cluster |
|
cluster := configureCassandra(&config) |
|
session, err := awaitSession(cluster) |
|
if err != nil { |
|
// We land here after the loop expires |
|
log.Fatalf("error connecting to Cassandra: %v", err) |
|
} |
|
defer session.Close() |
|
|
|
keyspace := fmt.Sprintf("%s_%s", config.Keyspace, config.Environment) |
|
|
|
// Create the keyspace if it doesn't exist |
|
if err := createKeyspace(session, keyspace, config.ReplicationFactor, config.ReplicationStrategy); err != nil { |
|
log.Fatalf("error creating keyspace %s: %v", keyspace, err) |
|
} |
|
|
|
cluster.Keyspace = keyspace |
|
migrateSession, err := cluster.CreateSession() |
|
if err != nil { |
|
log.Fatalf("error connecting to Cassandra: %v", err) |
|
} |
|
defer migrateSession.Close() |
|
|
|
// Apply schema migrations |
|
if err := runMigrations(migrateSession, config.MigrationDir, config.Endpoints, keyspace); err != nil { |
|
log.Fatalf("error applying migrations: %v", err) |
|
} |
|
|
|
log.Info("Migration completed successfully") |
|
} |