Created
February 5, 2021 11:21
-
-
Save navono/dec41669fb9e2386ea161cc8273e9111 to your computer and use it in GitHub Desktop.
watermill sqlite3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
stdSQL "database/sql" | |
"encoding/json" | |
"fmt" | |
"strings" | |
"github.com/pkg/errors" | |
"github.com/ThreeDotsLabs/watermill-sql/pkg/sql" | |
"github.com/ThreeDotsLabs/watermill/message" | |
) | |
type DefaultSQLite3Schema struct { | |
// GenerateMessagesTableName may be used to override how the messages table name is generated. | |
GenerateMessagesTableName func(topic string) string | |
} | |
func (s DefaultSQLite3Schema) SchemaInitializingQueries(topic string) []string { | |
createMessagesTable := strings.Join([]string{` | |
CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(topic) + ` ( | |
offset INTEGER PRIMARY KEY AUTOINCREMENT, | |
uuid VARCHAR(36) NOT NULL, | |
created_at DATETIME, | |
payload TEXT DEFAULT NULL, | |
metadata TEXT DEFAULT NULL | |
)`, | |
}, "\n") | |
return []string{createMessagesTable} | |
} | |
func (s DefaultSQLite3Schema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) { | |
insertQuery := fmt.Sprintf( | |
`INSERT INTO %s (uuid, payload, metadata) VALUES %s`, | |
s.MessagesTable(topic), | |
strings.TrimRight(strings.Repeat(`(?,?,?),`, len(msgs)), ","), | |
) | |
args, err := defaultInsertArgs(msgs) | |
if err != nil { | |
return "", nil, err | |
} | |
return insertQuery, args, nil | |
} | |
func (s DefaultSQLite3Schema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) (string, []interface{}) { | |
nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup) | |
selectQuery := ` | |
SELECT offset, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + ` | |
WHERE | |
offset > (` + nextOffsetQuery + `) | |
ORDER BY | |
offset ASC | |
LIMIT 1` | |
return selectQuery, nextOffsetArgs | |
} | |
func (s DefaultSQLite3Schema) UnmarshalMessage(row *stdSQL.Row) (offset int, msg *message.Message, err error) { | |
return unmarshalDefaultMessage(row) | |
} | |
func (s DefaultSQLite3Schema) MessagesTable(topic string) string { | |
if s.GenerateMessagesTableName != nil { | |
return s.GenerateMessagesTableName(topic) | |
} | |
return fmt.Sprintf("`watermill_%s`", topic) | |
} | |
type defaultSchemaRow struct { | |
Offset int64 | |
UUID []byte | |
Payload []byte | |
Metadata []byte | |
} | |
func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) { | |
var args []interface{} | |
for _, msg := range msgs { | |
metadata, err := json.Marshal(msg.Metadata) | |
if err != nil { | |
return nil, errors.Wrapf(err, "could not marshal metadata into JSON for message %s", msg.UUID) | |
} | |
args = append(args, msg.UUID, msg.Payload, metadata) | |
} | |
return args, nil | |
} | |
func unmarshalDefaultMessage(row *stdSQL.Row) (offset int, msg *message.Message, err error) { | |
r := defaultSchemaRow{} | |
err = row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata) | |
if err != nil { | |
return 0, nil, errors.Wrap(err, "could not scan message row") | |
} | |
msg = message.NewMessage(string(r.UUID), r.Payload) | |
if r.Metadata != nil { | |
err = json.Unmarshal(r.Metadata, &msg.Metadata) | |
if err != nil { | |
return 0, nil, errors.Wrap(err, "could not unmarshal metadata as JSON") | |
} | |
} | |
return int(r.Offset), msg, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment