Skip to content

Instantly share code, notes, and snippets.

@Hades32
Created October 19, 2019 22:36
Show Gist options
  • Save Hades32/789a1e29a15f93ae3b3379166f32217a to your computer and use it in GitHub Desktop.
Save Hades32/789a1e29a15f93ae3b3379166f32217a to your computer and use it in GitHub Desktop.
Azure EventHub with Go experiment
package main
// MIT License
//
// Copyright (c) Martin Rauscher. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
	"bufio"
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-event-hubs-go/v2"
	"github.com/Azure/azure-event-hubs-go/v2/eph"
)
// connStr is the Event Hub connection string, read once from the
// EVENTHUB_CONNECTIONSTRING environment variable when the package initializes.
var connStr = os.Getenv("EVENTHUB_CONNECTIONSTRING")

// checkpointer is an in-memory leaser/checkpointer configured with a
// one-minute lease duration and backed by a freshly allocated sharedStore.
var checkpointer = newMemoryLeaserCheckpointer(time.Minute, new(sharedStore))
// main starts two event processor hosts in the background, sends one greeting
// event to the hub, and then forwards every line typed on stdin as its own
// event. An empty line or the end of input terminates the program.
func main() {
	go ehpTest("ehp-1")
	go ehpTest("ehp-2")

	sendOnlyHub, err := eventhub.NewHubFromConnectionString(connStr)
	if err != nil {
		fmt.Println("couldn't connect 2", err)
		return
	}
	fmt.Println("connected")

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	// send a single message into a random partition
	err = sendOnlyHub.Send(ctx, eventhub.NewEventFromString("hello, world! @"+time.Now().String()))
	if err != nil {
		fmt.Println("couldn't send", err)
		return
	}
	fmt.Println("sent first message")

	// A line scanner is used instead of fmt.Fscanln so that a line containing
	// spaces is sent as one message rather than being split word by word.
	stdin := bufio.NewScanner(os.Stdin)
	for {
		fmt.Println("waiting for your input...")
		if !stdin.Scan() {
			if scanErr := stdin.Err(); scanErr != nil {
				fmt.Println("couldn't read input", scanErr)
			}
			break
		}
		line := stdin.Text()
		if line == "" {
			break
		}

		// Each send gets its own deadline; the cancel func is called as soon
		// as the send returns so the timer is released on every iteration.
		sendCtx, sendCancel := context.WithTimeout(context.Background(), 20*time.Second)
		// send a single message into a random partition
		err = sendOnlyHub.Send(sendCtx, eventhub.NewEventFromString("and he's like: "+line))
		sendCancel()
		if err != nil {
			fmt.Println("couldn't send 2", err)
			return
		}
		fmt.Println("sent another message")
	}
	fmt.Println("bye bye")
}
func ehpTest(id string) {
eventProcessorHost, err := eph.NewFromConnectionString(context.Background(), connStr, checkpointer, checkpointer, eph.WithConsumerGroup("erste-kohorte"))
if err != nil {
fmt.Println("couldn't connect 1", err)
return
}
// listen to each partition of the Event Hub
// Start receiving messages
//
// Receive blocks while attempting to connect to hub, then runs until listenerHandle.Close() is called
// <- listenerHandle.Done() signals listener has stopped
// listenerHandle.Err() provides the last error the receiver encountered
_, err = eventProcessorHost.RegisterHandler(context.Background(),
func(c context.Context, event *eventhub.Event) error {
fmt.Println(id, "got a message: ", string(event.Data))
return nil
})
if err != nil {
fmt.Println("couldnt receive", err)
return
}
fmt.Println(id, "listening")
err = eventProcessorHost.Start(context.Background())
if err != nil {
fmt.Println("couldnt start EHP", err)
}
}
package main
// MIT License
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
"context"
"errors"
"github.com/Azure/azure-event-hubs-go/v2/eph"
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/v2/uuid"
"github.com/devigned/tab"
"github.com/Azure/azure-event-hubs-go/v2/persist"
)
// memoryLeaserCheckpointer keeps lease and checkpoint state in memory. It
// tracks the leases acquired through this instance and delegates lease
// ownership decisions to a sharedStore.
type memoryLeaserCheckpointer struct {
	store         *sharedStore            // store consulted for lease ownership
	processor     *eph.EventProcessorHost // set via SetEventHostProcessor
	leaseDuration time.Duration           // duration passed to the store when acquiring or renewing
	memMu         sync.Mutex              // held by the methods that touch leases
	leases        map[string]*memoryLease // leases acquired through this instance, keyed by partition ID
}

// memoryLease is an eph.Lease extended with the store token and the
// checkpoint associated with the partition.
type memoryLease struct {
	eph.Lease
	expirationTime time.Time                 // written by expireAfter
	Token          string                    // token under which the lease is registered in the store
	Checkpoint     *persist.Checkpoint       // nil until a checkpoint has been set
	leaser         *memoryLeaserCheckpointer // owning leaser; used by IsExpired to reach the store
}

// sharedStore is the mutex-protected lease table, keyed by partition ID.
type sharedStore struct {
	leases  map[string]*storeLease // nil until ensure has been called
	storeMu sync.Mutex             // held by every sharedStore method
}

// storeLease is one row of the sharedStore: the current token, its
// expiration time, and the most recently stored copy of the lease.
type storeLease struct {
	token      string
	expiration time.Time
	ml         *memoryLease
}
// newMemoryLease returns a pointer to an otherwise zero-valued memoryLease
// whose PartitionID is set to partitionID.
func newMemoryLease(partitionID string) *memoryLease {
	var fresh memoryLease
	fresh.PartitionID = partitionID
	return &fresh
}
// exists reports whether the lease table has been allocated.
func (s *sharedStore) exists() bool {
	s.storeMu.Lock()
	initialized := s.leases != nil
	s.storeMu.Unlock()
	return initialized
}
// ensure allocates the lease table if it has not been allocated yet. It
// always returns true.
func (s *sharedStore) ensure() bool {
	s.storeMu.Lock()
	if s.leases == nil {
		s.leases = map[string]*storeLease{}
	}
	s.storeMu.Unlock()
	return true
}
// getLease returns a copy of the lease stored for partitionID.
//
// When the store has no entry for the partition (or the entry carries no
// lease yet) it returns a blank lease with only PartitionID set, instead of
// dereferencing a nil pointer and panicking.
func (s *sharedStore) getLease(partitionID string) memoryLease {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	if entry, ok := s.leases[partitionID]; ok && entry != nil && entry.ml != nil {
		return *entry.ml
	}

	var blank memoryLease
	blank.PartitionID = partitionID
	return blank
}
// deleteLease removes the entry for partitionID from the lease table, if
// there is one.
func (s *sharedStore) deleteLease(partitionID string) {
	s.storeMu.Lock()
	delete(s.leases, partitionID)
	s.storeMu.Unlock()
}
// createOrGetLease returns a copy of the lease stored for partitionID,
// creating the table entry and an empty lease for it when either is missing.
//
// The lease table itself is allocated on demand, so calling this before
// ensure no longer panics with an assignment into a nil map.
func (s *sharedStore) createOrGetLease(partitionID string) memoryLease {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	if s.leases == nil {
		s.leases = make(map[string]*storeLease)
	}

	entry, ok := s.leases[partitionID]
	if !ok {
		entry = new(storeLease)
		s.leases[partitionID] = entry
	}
	if entry.ml == nil {
		entry.ml = newMemoryLease(partitionID)
	}
	return *entry.ml
}
// changeLease replaces oldToken with newToken on the entry for partitionID
// and sets its expiration to now plus duration. It returns false, changing
// nothing, when the entry is missing or its token is not oldToken.
func (s *sharedStore) changeLease(partitionID, newToken, oldToken string, duration time.Duration) bool {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	entry, found := s.leases[partitionID]
	if !found || entry.token != oldToken {
		return false
	}
	entry.token = newToken
	entry.expiration = time.Now().Add(duration)
	return true
}
// releaseLease clears the token on the entry for partitionID and moves its
// expiration one second into the past. It returns false, changing nothing,
// when the entry is missing or its token is not token.
func (s *sharedStore) releaseLease(partitionID, token string) bool {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	entry, found := s.leases[partitionID]
	if !found || entry.token != token {
		return false
	}
	entry.token = ""
	entry.expiration = time.Now().Add(-1 * time.Second)
	return true
}
// renewLease sets the expiration of the entry for partitionID to now plus
// duration. It returns false, changing nothing, when the entry is missing or
// its token is not token.
func (s *sharedStore) renewLease(partitionID, token string, duration time.Duration) bool {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	entry, found := s.leases[partitionID]
	if !found || entry.token != token {
		return false
	}
	entry.expiration = time.Now().Add(duration)
	return true
}
// acquireLease assigns newToken to the entry for partitionID and sets its
// expiration to now plus duration, provided the entry exists and is either
// past its expiration or has an empty token. Otherwise it returns false and
// changes nothing.
func (s *sharedStore) acquireLease(partitionID, newToken string, duration time.Duration) bool {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	entry, found := s.leases[partitionID]
	if !found {
		return false
	}
	// Still held: not yet expired and a token is set.
	if !time.Now().After(entry.expiration) && entry.token != "" {
		return false
	}
	entry.token = newToken
	entry.expiration = time.Now().Add(duration)
	return true
}
// storeLease saves ml (the by-value copy received as argument) on the entry
// for partitionID. It returns false, storing nothing, when the entry is
// missing or its token is not token.
func (s *sharedStore) storeLease(partitionID, token string, ml memoryLease) bool {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	entry, found := s.leases[partitionID]
	if !found || entry.token != token {
		return false
	}
	entry.ml = &ml
	return true
}
// isLeased reports whether partitionID has an entry that carries a non-empty
// token and has not yet passed its expiration.
func (s *sharedStore) isLeased(partitionID string) bool {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	entry, found := s.leases[partitionID]
	if !found {
		return false
	}
	unexpired := !time.Now().After(entry.expiration)
	return unexpired && entry.token != ""
}
// isNotOwnedOrExpired reports whether the lease has expired or has no owner
// (its Owner field is empty).
func (l *memoryLease) isNotOwnedOrExpired(ctx context.Context) bool {
	if l.IsExpired(ctx) {
		return true
	}
	return l.Owner == ""
}
// IsExpired reports whether the store of the lease's leaser no longer
// considers this lease's partition to be leased. The context is unused.
func (l *memoryLease) IsExpired(_ context.Context) bool {
	stillLeased := l.leaser.store.isLeased(l.PartitionID)
	return !stillLeased
}
// expireAfter sets the lease's expirationTime to the current time plus d.
func (l *memoryLease) expireAfter(d time.Duration) {
	deadline := time.Now().Add(d)
	l.expirationTime = deadline
}
// newMemoryLeaserCheckpointer builds a memoryLeaserCheckpointer that uses
// store for lease ownership and leaseDuration for acquiring and renewing
// leases. It starts with an empty map of held leases and no processor.
func newMemoryLeaserCheckpointer(leaseDuration time.Duration, store *sharedStore) *memoryLeaserCheckpointer {
	lc := new(memoryLeaserCheckpointer)
	lc.store = store
	lc.leaseDuration = leaseDuration
	lc.leases = map[string]*memoryLease{}
	return lc
}
// SetEventHostProcessor records host as the processor this leaser works for.
// The processor is later asked for its partition IDs (GetLeases) and its name
// (AcquireLease). A later call replaces the previously recorded processor.
//
// The parameter is named host rather than eph so that it does not shadow the
// imported eph package inside the method.
func (ml *memoryLeaserCheckpointer) SetEventHostProcessor(host *eph.EventProcessorHost) {
	ml.processor = host
}
// StoreExists reports whether the underlying store's lease table has been
// allocated. The returned error is always nil.
func (ml *memoryLeaserCheckpointer) StoreExists(ctx context.Context) (bool, error) {
	present := ml.store.exists()
	return present, nil
}
// EnsureStore makes sure the underlying store's lease table is allocated.
// It always returns nil.
func (ml *memoryLeaserCheckpointer) EnsureStore(ctx context.Context) error {
	_ = ml.store.ensure() // ensure unconditionally reports true
	return nil
}
// DeleteStore delegates to EnsureStore and returns its result; it does not
// remove any entries from the store.
// NOTE(review): looks intentional for the in-memory variant — confirm.
func (ml *memoryLeaserCheckpointer) DeleteStore(ctx context.Context) error {
	err := ml.EnsureStore(ctx)
	return err
}
// GetLeases returns one lease marker per partition ID reported by the
// processor. Each marker points to its own copy of the lease read from the
// store, with its leaser set to ml. The returned error is always nil.
func (ml *memoryLeaserCheckpointer) GetLeases(ctx context.Context) ([]eph.LeaseMarker, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	ids := ml.processor.GetPartitionIDs()
	markers := make([]eph.LeaseMarker, 0, len(ids))
	for i := range ids {
		current := ml.store.getLease(ids[i])
		current.leaser = ml
		markers = append(markers, &current)
	}
	return markers, nil
}
// EnsureLease makes sure the store has a lease for partitionID and returns a
// marker pointing to a copy of it, with its leaser set to ml. The returned
// error is always nil.
func (ml *memoryLeaserCheckpointer) EnsureLease(ctx context.Context, partitionID string) (eph.LeaseMarker, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	ensured := ml.store.createOrGetLease(partitionID)
	ensured.leaser = ml

	var marker eph.LeaseMarker = &ensured
	return marker, nil
}
// DeleteLease removes the store entry for partitionID. The leaser's own map
// of held leases is not modified. It always returns nil.
func (ml *memoryLeaserCheckpointer) DeleteLease(ctx context.Context, partitionID string) error {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	store := ml.store
	store.deleteLease(partitionID)
	return nil
}
// AcquireLease tries to take the lease for partitionID under a freshly
// generated token.
//
// If the store currently reports the partition as leased, the token recorded
// on the stored lease copy is swapped for the new one; otherwise the lease is
// acquired directly. On success the lease copy gets the new token, the
// processor's name as Owner and an incremented epoch, is written back to the
// store, and is remembered in ml.leases. It returns the lease marker and true,
// or nil, false and an error when token generation or any store step fails.
func (ml *memoryLeaserCheckpointer) AcquireLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	acquired := ml.store.getLease(partitionID)
	acquired.leaser = ml

	id, err := uuid.NewV4()
	if err != nil {
		tab.For(ctx).Error(err)
		return nil, false, err
	}
	token := id.String()

	if ml.store.isLeased(partitionID) {
		// is leased by someone else due to a race to acquire
		if !ml.store.changeLease(partitionID, token, acquired.Token, ml.leaseDuration) {
			return nil, false, errors.New("failed to change lease")
		}
	} else if !ml.store.acquireLease(partitionID, token, ml.leaseDuration) {
		return nil, false, errors.New("failed to acquire lease")
	}

	acquired.Token = token
	acquired.Owner = ml.processor.GetName()
	acquired.IncrementEpoch()

	if stored := ml.store.storeLease(partitionID, token, acquired); !stored {
		return nil, false, errors.New("failed to store lease after acquiring or changing")
	}
	ml.leases[partitionID] = &acquired
	return &acquired, true, nil
}
// RenewLease extends, by the leaser's lease duration, the store expiration of
// a lease held by this leaser. It returns the held lease and true, or nil,
// false and an error when the lease is not held here or the store refuses
// the renewal.
func (ml *memoryLeaserCheckpointer) RenewLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	held, found := ml.leases[partitionID]
	if !found {
		return nil, false, errors.New("lease was not found")
	}

	renewed := ml.store.renewLease(partitionID, held.Token, ml.leaseDuration)
	if !renewed {
		return nil, false, errors.New("unable to renew lease")
	}
	return held, true, nil
}
// ReleaseLease gives up a lease held by this leaser: the store entry is
// released under the lease's token and the lease is dropped from ml.leases.
// It returns false and an error when the lease is not held here or the store
// refuses the release.
func (ml *memoryLeaserCheckpointer) ReleaseLease(ctx context.Context, partitionID string) (bool, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	held, found := ml.leases[partitionID]
	if !found {
		return false, errors.New("lease was not found")
	}

	released := ml.store.releaseLease(partitionID, held.Token)
	if !released {
		return false, errors.New("could not release the lease")
	}

	delete(ml.leases, partitionID)
	return true, nil
}
// UpdateLease renews a lease held by this leaser and writes the current lease
// state back to the store. It returns the held lease and true, or nil, false
// and an error when the lease is not held here, the renewal is refused, or
// the store rejects the write.
//
// memMu is held for the whole call, like in every other method that reads
// ml.leases, so the map is never read while another goroutine modifies it.
func (ml *memoryLeaserCheckpointer) UpdateLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	lease, ok := ml.leases[partitionID]
	if !ok {
		return nil, false, errors.New("lease was not found")
	}
	if !ml.store.renewLease(partitionID, lease.Token, ml.leaseDuration) {
		return nil, false, errors.New("unable to renew lease")
	}
	if !ml.store.storeLease(partitionID, lease.Token, *lease) {
		return nil, false, errors.New("unable to store lease after renewal")
	}
	return lease, true, nil
}
// GetCheckpoint returns the checkpoint recorded on the lease this leaser
// holds for partitionID, together with true.
//
// When the lease is not held here, or it is held but no checkpoint has been
// recorded on it yet, a start-of-stream checkpoint is returned together with
// false. The nil check prevents a nil-pointer dereference on a lease that was
// acquired but never checkpointed.
func (ml *memoryLeaserCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	if lease, ok := ml.leases[partitionID]; ok && lease.Checkpoint != nil {
		return *lease.Checkpoint, true
	}
	return persist.NewCheckpointFromStartOfStream(), false
}
// EnsureCheckpoint returns the checkpoint of the lease this leaser holds for
// partitionID, first setting it to a start-of-stream checkpoint if the lease
// has none. When the lease is not held here it returns a start-of-stream
// checkpoint without recording anything. The returned error is always nil.
func (ml *memoryLeaserCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error) {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	held, found := ml.leases[partitionID]
	if !found {
		return persist.NewCheckpointFromStartOfStream(), nil
	}
	if held.Checkpoint == nil {
		initial := persist.NewCheckpointFromStartOfStream()
		held.Checkpoint = &initial
	}
	return *held.Checkpoint, nil
}
// UpdateCheckpoint records checkpoint on the lease this leaser holds for
// partitionID and writes the lease back to the store. It returns an error
// when the lease is not held here or the store rejects the write; in the
// latter case the held lease already carries the new checkpoint.
func (ml *memoryLeaserCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	held, found := ml.leases[partitionID]
	if !found {
		return errors.New("lease for partition isn't owned by this eph.EventProcessorHost")
	}

	held.Checkpoint = &checkpoint
	if stored := ml.store.storeLease(partitionID, held.Token, *held); !stored {
		return errors.New("could not store lease on update of checkpoint")
	}
	return nil
}
// DeleteCheckpoint resets the checkpoint of the lease this leaser holds for
// partitionID to start-of-stream and writes the lease back to the store. It
// returns an error when the lease is not held here or the store rejects the
// write.
func (ml *memoryLeaserCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error {
	ml.memMu.Lock()
	defer ml.memMu.Unlock()

	held, found := ml.leases[partitionID]
	if !found {
		return errors.New("lease for partition isn't owned by this eph.EventProcessorHost")
	}

	reset := persist.NewCheckpointFromStartOfStream()
	held.Checkpoint = &reset
	if stored := ml.store.storeLease(partitionID, held.Token, *held); !stored {
		return errors.New("failed to store deleted checkpoint")
	}

	// Puts the same pointer back under the same key.
	ml.leases[partitionID] = held
	return nil
}
// Close does nothing and always returns nil.
func (ml *memoryLeaserCheckpointer) Close() error {
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment