Skip to content

Instantly share code, notes, and snippets.

@Nearhan
Created June 9, 2018 21:03
Show Gist options
  • Save Nearhan/5eea2de897c3757da89281ae225de054 to your computer and use it in GitHub Desktop.
Save Nearhan/5eea2de897c3757da89281ae225de054 to your computer and use it in GitHub Desktop.
package core
import (
"database/sql"
"encoding/json"
"time"
"github.com/lib/pq"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
)
// Compile-time assertion that *PostgresDataRequestPersister implements
// the ProjectRequestDAO interface.
var _ ProjectRequestDAO = (*PostgresDataRequestPersister)(nil)
// ProjectRequest pairs a DataRequest with the raw JSON of its project query.
type ProjectRequest struct {
// DataRequest is embedded, so its fields and methods (for example UUID)
// are promoted onto ProjectRequest.
DataRequest
// Project is the undecoded JSON document; it is read from and written to
// the data column of the projects table.
Project json.RawMessage
}
// ProjectRequestDAO manages the data request and project relationship in storage.
type ProjectRequestDAO interface {
// GetProjectIDs returns the UUIDs of the stored projects.
GetProjectIDs() ([]uuid.UUID, error)
// GetProject returns the project request identified by the given UUID.
GetProject(uuid.UUID) (ProjectRequest, error)
// GetAllProjects returns every stored project request.
// NOTE(review): the Postgres implementation in this file returns ErrNotImplemented.
GetAllProjects() ([]ProjectRequest, error)
// CreateProject stores a new project request.
CreateProject(ProjectRequest) error
// UpdateProject overwrites an existing project request.
UpdateProject(ProjectRequest) error
// DeleteProject removes the project request identified by the given UUID.
DeleteProject(uuid.UUID) error
// GetActiveProjects returns the project requests whose data requests are
// reported as active for the given time.
GetActiveProjects(time.Time) ([]ProjectRequest, error)
}
// internalID looks up the project_id of the projects row whose project_uuid
// matches this request's UUID, using the supplied transaction.
//
// The error from Row.Scan is returned unchanged, so a missing row surfaces
// as sql.ErrNoRows.
func (p ProjectRequest) internalID(tx *sql.Tx) (int, error) {
	const query = "SELECT project_id FROM projects WHERE project_uuid = $1"

	var id int
	row := tx.QueryRow(query, p.UUID.String())
	if err := row.Scan(&id); err != nil {
		return id, err
	}
	return id, nil
}
// create inserts this project request into the projects table, linking it to
// the internal ID of its embedded DataRequest.
//
// A unique-constraint violation reported by the driver is translated into an
// already-exists error for the project; every other error is returned as is.
func (p ProjectRequest) create(tx *sql.Tx) error {
	dataRequestID, err := p.DataRequest.internalID(tx)
	if err != nil {
		return err
	}

	const stmt = "INSERT INTO projects (data_request_id, project_uuid, data) VALUES ($1, $2, $3)"
	_, err = tx.Exec(stmt, dataRequestID, p.UUID.String(), p.Project)
	if err == nil {
		return nil
	}

	if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == pqErrorCodeUniqueConstraint {
		return NewAlreadyExistsError("project", p.DataRequest.UUID.String())
	}
	return err
}
// update overwrites the data column of this project request's row in the
// projects table, using the supplied transaction.
//
// It returns a not-found error for the project when no matching row exists,
// whether that is detected by the ID lookup or by the UPDATE affecting zero
// rows. Any other database error is returned unchanged.
func (p ProjectRequest) update(tx *sql.Tx) error {
	prID, err := p.internalID(tx)
	if err != nil {
		// The lookup scans a single row, so a missing project shows up as
		// sql.ErrNoRows; report it the same way as the zero-rows case below
		// instead of leaking the raw driver sentinel.
		if err == sql.ErrNoRows {
			return NewNotFoundError("project", p.DataRequest.UUID.String())
		}
		return err
	}

	result, err := tx.Exec(
		"UPDATE projects SET data = $2 WHERE project_id = $1",
		prID, p.Project,
	)
	if err != nil {
		return err
	}

	numRowsAffected, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if numRowsAffected == 0 {
		return NewNotFoundError("project", p.DataRequest.UUID.String())
	}
	return nil
}
// delete removes this project request's row from the projects table, using
// the supplied transaction.
//
// It returns a not-found error for the project when no matching row exists,
// whether that is detected by the ID lookup or by the DELETE affecting zero
// rows. Any other database error is returned unchanged.
func (p ProjectRequest) delete(tx *sql.Tx) error {
	prID, err := p.internalID(tx)
	if err != nil {
		// The lookup scans a single row, so a missing project shows up as
		// sql.ErrNoRows; report it the same way as the zero-rows case below
		// instead of leaking the raw driver sentinel.
		if err == sql.ErrNoRows {
			return NewNotFoundError("project", p.DataRequest.UUID.String())
		}
		return err
	}

	result, err := tx.Exec("DELETE FROM projects WHERE project_id = $1", prID)
	if err != nil {
		return err
	}

	numRowsAffected, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if numRowsAffected == 0 {
		return NewNotFoundError("project", p.DataRequest.UUID.String())
	}
	return nil
}
//
// ProjectRequestDAO implementation for PostgresDataRequestPersister
//
// GetProjectIDs fetches the project_uuid of every row in the projects table
// and returns them parsed as UUIDs.
//
// A stored value that does not parse as a UUID is logged and aborts the call
// with the parse error. The returned slice is nil when the table is empty.
func (p *PostgresDataRequestPersister) GetProjectIDs() ([]uuid.UUID, error) {
	rows, err := p.db.Query("SELECT project_uuid FROM projects")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var projectIDs []uuid.UUID
	for rows.Next() {
		var raw string
		if scanErr := rows.Scan(&raw); scanErr != nil {
			return nil, scanErr
		}

		parsed, parseErr := uuid.FromString(raw)
		if parseErr != nil {
			p.log.Error("invalid project ID UUID in database", zap.String("project_uuid", raw))
			return nil, parseErr
		}
		projectIDs = append(projectIDs, parsed)
	}

	// Surface any error that ended the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return projectIDs, nil
}
// GetProject loads the data request with the given id and the project JSON
// stored under the same UUID, and returns them combined.
//
// A not-found error for the project is returned when either the data request
// or the projects row is missing; other errors are passed through unchanged.
func (p *PostgresDataRequestPersister) GetProject(id uuid.UUID) (ProjectRequest, error) {
	var result ProjectRequest

	dr, err := p.Get(id)
	if err != nil {
		switch err.(type) {
		case *NotFoundError:
			// Re-label the missing data request as a missing project.
			return result, NewNotFoundError("project", id.String())
		default:
			return result, err
		}
	}

	row := p.db.QueryRow("SELECT data FROM projects WHERE project_uuid=$1", id.String())
	switch err := row.Scan(&result.Project); err {
	case nil:
		result.DataRequest = dr
		return result, nil
	case sql.ErrNoRows:
		return result, NewNotFoundError("project", id.String())
	default:
		return result, err
	}
}
// GetAllProjects is meant to return all projects but is not implemented for
// this persister: it always returns a nil slice and ErrNotImplemented.
func (p *PostgresDataRequestPersister) GetAllProjects() ([]ProjectRequest, error) {
return nil, ErrNotImplemented
}
// CreateProject stores a new project: it inserts the embedded data request
// and then the project row, both inside a single transaction.
//
// If the data request already exists, an already-exists error for the project
// is returned. Any failure before Commit leaves the transaction to be rolled
// back by the deferred Rollback.
func (p *PostgresDataRequestPersister) CreateProject(projRequest ProjectRequest) error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}
	// Rolls back on every early return; its error is intentionally ignored.
	defer tx.Rollback()

	err = p.insertDataRequest(tx, projRequest.DataRequest)
	if _, exists := err.(*AlreadyExistsError); exists {
		return NewAlreadyExistsError("project", projRequest.DataRequest.UUID.String())
	}
	if err != nil {
		return err
	}

	if err = projRequest.create(tx); err != nil {
		return err
	}
	return tx.Commit()
}
// UpdateProject updates an existing project: it updates the embedded data
// request and then the project row, both inside a single transaction.
//
// If the data request does not exist, a not-found error for the project is
// returned. Any failure before Commit leaves the transaction to be rolled
// back by the deferred Rollback.
func (p *PostgresDataRequestPersister) UpdateProject(projRequest ProjectRequest) error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}
	// Rolls back on every early return; its error is intentionally ignored.
	defer tx.Rollback()

	err = p.updateDataRequest(tx, projRequest.DataRequest)
	if _, missing := err.(*NotFoundError); missing {
		return NewNotFoundError("project", projRequest.DataRequest.UUID.String())
	}
	if err != nil {
		return err
	}

	if err = projRequest.update(tx); err != nil {
		return err
	}
	return tx.Commit()
}
// DeleteProject removes the project with the given id together with its
// corresponding data request.
//
// The project is first looked up with GetProject, so a missing project yields
// the not-found error produced there. The project row and then the data
// request are deleted inside a single transaction, which is rolled back by
// the deferred Rollback if either delete fails.
func (p *PostgresDataRequestPersister) DeleteProject(id uuid.UUID) error {
	// Look the project up BEFORE opening the transaction. GetProject reads
	// through p.db rather than the transaction, so calling it while a
	// transaction is open needs a second pooled connection and can block
	// indefinitely when the pool is capped and exhausted.
	projRequest, err := p.GetProject(id)
	if err != nil {
		return err
	}

	tx, err := p.db.Begin()
	if err != nil {
		return err
	}
	// Rolls back on every early return; its error is intentionally ignored.
	defer tx.Rollback()

	if err := projRequest.delete(tx); err != nil {
		return err
	}
	if err := p.deleteDataRequest(tx, projRequest.UUID); err != nil {
		return err
	}
	return tx.Commit()
}
// GetActiveProjects returns the project requests among the data requests that
// GetActive reports for time t.
//
// It collects the UUIDs of the active data requests, selects the projects rows
// with those UUIDs, and pairs each row with its data request. Rows whose UUID
// has no matching data request are skipped. The result is nil when nothing
// matches.
func (p *PostgresDataRequestPersister) GetActiveProjects(t time.Time) ([]ProjectRequest, error) {
	// projectRow holds one scanned row of the projects query below.
	type projectRow struct {
		uuid string
		data json.RawMessage
	}

	active, err := p.GetActive(t)
	if err != nil {
		return nil, err
	}

	// Narrow the active data requests down to projects by UUID; this relies
	// on a project and its data request sharing the same UUID.
	var wanted []string
	for _, dr := range active {
		wanted = append(wanted, dr.UUID.String())
	}

	rows, err := p.db.Query(
		"SELECT project_uuid, data FROM projects WHERE project_uuid = ANY($1)",
		pq.Array(wanted),
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Drain the result set completely before doing any matching.
	fetched := []projectRow{}
	for rows.Next() {
		var row projectRow
		if scanErr := rows.Scan(&row.uuid, &row.data); scanErr != nil {
			return nil, scanErr
		}
		fetched = append(fetched, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	// Pair every fetched project with the data request of the same UUID.
	byUUID := active.ToUUIDMap()
	var matched []ProjectRequest
	for _, row := range fetched {
		id, parseErr := uuid.FromString(row.uuid)
		if parseErr != nil {
			return nil, parseErr
		}
		dr, found := byUUID[id]
		if !found {
			continue
		}
		matched = append(matched, ProjectRequest{DataRequest: dr, Project: row.data})
	}
	return matched, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment