Created
June 9, 2018 21:03
-
-
Save Nearhan/5eea2de897c3757da89281ae225de054 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package core | |
import ( | |
"database/sql" | |
"encoding/json" | |
"time" | |
"github.com/lib/pq" | |
uuid "github.com/satori/go.uuid" | |
"go.uber.org/zap" | |
) | |
var _ ProjectRequestDAO = (*PostgresDataRequestPersister)(nil) | |
// ProjectRequest A data request plus raw json for the project query | |
type ProjectRequest struct { | |
DataRequest | |
Project json.RawMessage | |
} | |
// ProjectRequestDAO manages the data request and project relationship in storage | |
type ProjectRequestDAO interface { | |
GetProjectIDs() ([]uuid.UUID, error) | |
GetProject(uuid.UUID) (ProjectRequest, error) | |
GetAllProjects() ([]ProjectRequest, error) | |
CreateProject(ProjectRequest) error | |
UpdateProject(ProjectRequest) error | |
DeleteProject(uuid.UUID) error | |
GetActiveProjects(time.Time) ([]ProjectRequest, error) | |
} | |
// internalID queries for the project request's internal postgres ID | |
func (p ProjectRequest) internalID(tx *sql.Tx) (int, error) { | |
var prID int | |
err := tx.QueryRow( | |
"SELECT project_id FROM projects WHERE project_uuid = $1", | |
p.UUID.String(), | |
).Scan(&prID) | |
return prID, err | |
} | |
// create writes the project request to the projects table | |
func (p ProjectRequest) create(tx *sql.Tx) error { | |
drID, err := p.DataRequest.internalID(tx) | |
if err != nil { | |
return err | |
} | |
_, err = tx.Exec( | |
"INSERT INTO projects (data_request_id, project_uuid, data) VALUES ($1, $2, $3)", | |
drID, p.UUID.String(), p.Project, | |
) | |
if err != nil { | |
if pqErr, ok := err.(*pq.Error); ok { | |
if pqErr.Code == pqErrorCodeUniqueConstraint { | |
return NewAlreadyExistsError("project", p.DataRequest.UUID.String()) | |
} | |
} | |
return err | |
} | |
return nil | |
} | |
// update updates the project request within the projects table | |
func (p ProjectRequest) update(tx *sql.Tx) error { | |
prID, err := p.internalID(tx) | |
if err != nil { | |
return err | |
} | |
result, err := tx.Exec( | |
"UPDATE projects SET data = $2 WHERE project_id = $1", | |
prID, p.Project, | |
) | |
if err != nil { | |
return err | |
} | |
numRowsAffected, err := result.RowsAffected() | |
if err != nil { | |
return err | |
} else if numRowsAffected == 0 { | |
return NewNotFoundError("project", p.DataRequest.UUID.String()) | |
} | |
return nil | |
} | |
// delete deletes the project request in the projects table | |
func (p ProjectRequest) delete(tx *sql.Tx) error { | |
prID, err := p.internalID(tx) | |
if err != nil { | |
return err | |
} | |
result, err := tx.Exec("DELETE FROM projects WHERE project_id = $1", prID) | |
if err != nil { | |
return err | |
} | |
numRowsAffected, err := result.RowsAffected() | |
if err != nil { | |
return err | |
} else if numRowsAffected == 0 { | |
return NewNotFoundError("project", p.DataRequest.UUID.String()) | |
} | |
return nil | |
} | |
// | |
// ProjectRequestDAO PostgresDataRequestPersister implementation
// | |
// GetProjectIDS fetchces all project IDs | |
func (p *PostgresDataRequestPersister) GetProjectIDs() ([]uuid.UUID, error) { | |
var ids []uuid.UUID | |
rows, err := p.db.Query("SELECT project_uuid FROM projects") | |
if err != nil { | |
return nil, err | |
} | |
defer rows.Close() | |
for rows.Next() { | |
var idStr string | |
if err := rows.Scan(&idStr); err != nil { | |
return nil, err | |
} | |
id, err := uuid.FromString(idStr) | |
if err != nil { | |
p.log.Error("invalid project ID UUID in database", zap.String("project_uuid", idStr)) | |
return nil, err | |
} | |
ids = append(ids, id) | |
} | |
if err := rows.Err(); err != nil { | |
return nil, err | |
} | |
return ids, nil | |
} | |
// GetProject fetches a project and data request by its id | |
func (p *PostgresDataRequestPersister) GetProject(id uuid.UUID) (ProjectRequest, error) { | |
var projRequest ProjectRequest | |
dataRequest, err := p.Get(id) | |
if err != nil { | |
if _, ok := err.(*NotFoundError); ok { | |
return projRequest, NewNotFoundError("project", id.String()) | |
} | |
return projRequest, err | |
} | |
err = p.db.QueryRow( | |
"SELECT data FROM projects WHERE project_uuid=$1", id.String(), | |
).Scan(&projRequest.Project) | |
if err != nil { | |
if err == sql.ErrNoRows { | |
return projRequest, NewNotFoundError("project", id.String()) | |
} | |
return projRequest, err | |
} | |
projRequest.DataRequest = dataRequest | |
return projRequest, nil | |
} | |
// GetAllProjects returns all projects | |
func (p *PostgresDataRequestPersister) GetAllProjects() ([]ProjectRequest, error) { | |
return nil, ErrNotImplemented | |
} | |
// CreateProject adds a new project and its data request, scheduling rules | |
// for validation and collection | |
func (p *PostgresDataRequestPersister) CreateProject(projRequest ProjectRequest) error { | |
tx, err := p.db.Begin() | |
if err != nil { | |
return err | |
} | |
defer tx.Rollback() | |
if err := p.insertDataRequest(tx, projRequest.DataRequest); err != nil { | |
if _, ok := err.(*AlreadyExistsError); ok { | |
return NewAlreadyExistsError("project", projRequest.DataRequest.UUID.String()) | |
} | |
return err | |
} | |
if err := projRequest.create(tx); err != nil { | |
return err | |
} | |
return tx.Commit() | |
} | |
// UpdateProject updates a project and its data request, tasking new rules | |
// for validation and collection or stopping collection of removed rules | |
func (p *PostgresDataRequestPersister) UpdateProject(projRequest ProjectRequest) error { | |
tx, err := p.db.Begin() | |
if err != nil { | |
return err | |
} | |
defer tx.Rollback() | |
if err := p.updateDataRequest(tx, projRequest.DataRequest); err != nil { | |
if _, ok := err.(*NotFoundError); ok { | |
return NewNotFoundError("project", projRequest.DataRequest.UUID.String()) | |
} | |
return err | |
} | |
if err := projRequest.update(tx); err != nil { | |
return err | |
} | |
return tx.Commit() | |
} | |
// DeleteProject removes a project and its corresponding data request and rules | |
func (p *PostgresDataRequestPersister) DeleteProject(id uuid.UUID) error { | |
var projRequest ProjectRequest | |
tx, err := p.db.Begin() | |
if err != nil { | |
return err | |
} | |
defer tx.Rollback() | |
projRequest, err = p.GetProject(id) | |
if err != nil { | |
return err | |
} | |
if err := projRequest.delete(tx); err != nil { | |
return err | |
} | |
if err := p.deleteDataRequest(tx, projRequest.UUID); err != nil { | |
return err | |
} | |
return tx.Commit() | |
} | |
// GetActiveProjects queries all active DataRequests then filters out just ProjectRequests | |
func (p *PostgresDataRequestPersister) GetActiveProjects(t time.Time) ([]ProjectRequest, error) { | |
// internal query struct | |
type project struct { | |
uuid string | |
data json.RawMessage | |
} | |
// get all active data requests | |
drs, err := p.GetActive(t) | |
if err != nil { | |
return nil, err | |
} | |
// now we need to filter just ProjectRequests | |
// we can use UUID since Project.UUID and DataRequest.UUID are the same | |
var uuids []string | |
for _, dr := range drs { | |
uuids = append(uuids, dr.UUID.String()) | |
} | |
rows, err := p.db.Query("SELECT project_uuid, data FROM projects WHERE project_uuid = ANY($1)", pq.Array(uuids)) | |
if err != nil { | |
return nil, err | |
} | |
defer rows.Close() | |
ps := []project{} | |
for rows.Next() { | |
var p project | |
if err := rows.Scan(&p.uuid, &p.data); err != nil { | |
return nil, err | |
} | |
ps = append(ps, p) | |
} | |
if err := rows.Err(); err != nil { | |
return nil, err | |
} | |
// match correct DataRequest to ProjectRequest | |
var prs []ProjectRequest | |
drMap := drs.ToUUIDMap() | |
for _, pp := range ps { | |
uid, err := uuid.FromString(pp.uuid) | |
if err != nil { | |
return nil, err | |
} | |
if dr, ok := drMap[uid]; ok { | |
pr := ProjectRequest{dr, pp.data} | |
prs = append(prs, pr) | |
} | |
} | |
return prs, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment