Last active
February 13, 2020 08:26
-
-
Save sachinsu/bc1742a2a0cd060239473331b65e6f65 to your computer and use it in GitHub Desktop.
Using goroutines to evaluate Oracle Advanced Queuing support with the godror library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"database/sql" | |
"fmt" | |
"strings" | |
"time" | |
godror "github.com/godror/godror" | |
"golang.org/x/sync/errgroup" | |
) | |
func main() { | |
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) | |
// Open database connection | |
db, err := sql.Open("godror", "dummy/dummy@orclcdb") | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
defer db.Close() | |
var user string | |
if err = db.QueryRowContext(ctx, "SELECT USER FROM DUAL").Scan(&user); err != nil { | |
fmt.Println("Error running user query") | |
fmt.Println(err) | |
return | |
} | |
// setup Queue & Queue table | |
const qName = "TEST_Q" | |
const qTblName = qName + "_TBL" | |
qry := `DECLARE | |
tbl CONSTANT VARCHAR2(61) := '` + user + "." + qTblName + `'; | |
q CONSTANT VARCHAR2(61) := '` + user + "." + qName + `'; | |
BEGIN | |
BEGIN SYS.DBMS_AQADM.stop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END; | |
BEGIN SYS.DBMS_AQADM.drop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END; | |
BEGIN SYS.DBMS_AQADM.drop_queue_table(tbl); EXCEPTION WHEN OTHERS THEN NULL; END; | |
SYS.DBMS_AQADM.CREATE_QUEUE_TABLE(tbl, 'RAW'); | |
SYS.DBMS_AQADM.CREATE_QUEUE(q, tbl); | |
SYS.DBMS_AQADM.grant_queue_privilege('ENQUEUE', q, '` + user + `'); | |
SYS.DBMS_AQADM.grant_queue_privilege('DEQUEUE', q, '` + user + `'); | |
SYS.DBMS_AQADM.start_queue(q); | |
END;` | |
if _, err = db.ExecContext(ctx, qry); err != nil { | |
if !strings.Contains(err.Error(), "PLS-00201: 'SYS.DBMS_AQADM'") { | |
fmt.Println(err) | |
return | |
} | |
} | |
defer func() { | |
// Drop Queue | |
db.ExecContext( | |
context.Background(), | |
`DECLARE | |
tbl CONSTANT VARCHAR2(61) := USER||'.'||:1; | |
q CONSTANT VARCHAR2(61) := USER||'.'||:2; | |
BEGIN | |
BEGIN SYS.DBMS_AQADM.stop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END; | |
BEGIN SYS.DBMS_AQADM.drop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END; | |
BEGIN SYS.DBMS_AQADM.drop_queue_table(tbl); EXCEPTION WHEN OTHERS THEN NULL; | |
END;`, | |
qTblName, qName, | |
) | |
}() | |
// Create Error Group | |
g, gctx := errgroup.WithContext(ctx) | |
q, _ := godror.NewQueue(gctx, db, qName, "") | |
defer q.Close() | |
// Goroutine for Producer | |
g.Go(func() error { | |
ticker := time.NewTicker(50 * time.Millisecond) | |
counter := 1 | |
for { | |
select { | |
case <-ticker.C: | |
tx, _ := db.Begin() | |
fmt.Printf("%d\n", counter) | |
message := godror.Message{Raw: []byte(fmt.Sprintf("Message %d", counter))} | |
counter += 1 | |
q.Enqueue([]godror.Message{message}) | |
tx.Commit() | |
case <-gctx.Done(): | |
fmt.Printf("closing writer goroutine\n") | |
return gctx.Err() | |
} | |
} | |
}) | |
// Goroutine for Consumer | |
g.Go(func() error { | |
ticker := time.NewTicker(1 * time.Second) | |
for { | |
select { | |
case <-ticker.C: | |
tx, _ := db.Begin() | |
messages := make([]godror.Message, 30) | |
n, _ := q.Dequeue(messages) | |
tx.Commit() | |
for i := 0; i < n; i++ { | |
fmt.Printf("Received message: %s\n", string(messages[i].Raw)) | |
} | |
case <-gctx.Done(): | |
fmt.Printf("closing reader goroutine\n") | |
return gctx.Err() | |
} | |
} | |
}) | |
if err := g.Wait(); err == nil { | |
fmt.Println("finished clean") | |
} else { | |
fmt.Printf("received error: %v", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment