Skip to content

Instantly share code, notes, and snippets.

@bojand
Last active August 8, 2017 18:05
Show Gist options
  • Save bojand/8f2441a93806cfe37bf82a35c350b82b to your computer and use it in GitHub Desktop.
Save bojand/8f2441a93806cfe37bf82a35c350b82b to your computer and use it in GitHub Desktop.
Hot R.O.D. driver service fixed
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package driver
import (
"context"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
"github.com/uber/jaeger/examples/hotrod/pkg/log"
"github.com/uber/jaeger/examples/hotrod/pkg/pool"
"github.com/uber/jaeger/examples/hotrod/services/driver/thrift-gen/driver"
)
// Server implements jaeger-demo-frontend service
type Server struct {
hostPort string
tracer opentracing.Tracer
logger log.Factory
ch *tchannel.Channel
server *thrift.Server
redis *Redis
pool *pool.Pool
}
// NewServer creates a new driver.Server
func NewServer(hostPort string, tracer opentracing.Tracer, metricsFactory metrics.Factory, logger log.Factory) *Server {
channelOpts := &tchannel.ChannelOptions{
Tracer: tracer,
}
ch, err := tchannel.NewChannel("driver", channelOpts)
if err != nil {
logger.Bg().Fatal("Cannot create TChannel", zap.Error(err))
}
server := thrift.NewServer(ch)
return &Server{
hostPort: hostPort,
tracer: tracer,
logger: logger,
ch: ch,
server: server,
redis: newRedis(metricsFactory, logger),
pool: pool.New(100),
}
}
// Run starts the Driver server
func (s *Server) Run() error {
s.server.Register(driver.NewTChanDriverServer(s))
if err := s.ch.ListenAndServe(s.hostPort); err != nil {
s.logger.Bg().Fatal("Unable to start tchannel server", zap.Error(err))
}
peerInfo := s.ch.PeerInfo()
s.logger.Bg().Info("TChannel listening", zap.String("hostPort", peerInfo.HostPort))
// Run must block, but TChannel's ListenAndServe runs in the background, so block indefinitely
select {}
}
type driverResult struct {
Driver *Driver
Err error
}
// getDrivers gets drivers from redis in parallel
func (s *Server) getDrivers(ctx context.Context, driverIDs []string) []driverResult {
results := make([]driverResult, 0, len(driverIDs))
wg := sync.WaitGroup{}
routesLock := sync.Mutex{}
for _, driverID := range driverIDs {
wg.Add(1)
s.pool.Execute(func() {
var drv Driver
var err error
var resErr error
for i := 0; i < 3; i++ {
drv, err = s.redis.GetDriver(ctx, driverID)
if err == nil {
break
}
s.logger.For(ctx).Error("Retrying GetDriver after error", zap.Int("retry_no", i+1), zap.Error(err))
}
if err != nil {
s.logger.For(ctx).Error("Failed to get driver after 3 attempts", zap.Error(err))
resErr = err
}
routesLock.Lock()
results = append(results, driverResult{
Driver: &drv,
Err: resErr,
})
routesLock.Unlock()
wg.Done()
})
}
wg.Wait()
return results
}
// FindNearestFixed implements parallel version Thrift interface TChanDriver
func (s *Server) FindNearestFixed(ctx thrift.Context, location string) ([]*driver.DriverLocation, error) {
s.logger.For(ctx).Info("Searching for nearby drivers", zap.String("location", location))
driverIDs := s.redis.FindDriverIDs(ctx, location)
drivers := s.getDrivers(ctx, driverIDs)
retMe := make([]*driver.DriverLocation, len(drivers))
for i, drvRes := range drivers {
if drvRes.Err != nil {
return nil, drvRes.Err
}
retMe[i] = &driver.DriverLocation{
DriverID: drvRes.Driver.DriverID,
Location: drvRes.Driver.Location,
}
}
return retMe, nil
}
// FindNearest implements Thrift interface TChanDriver
func (s *Server) FindNearest(ctx thrift.Context, location string) ([]*driver.DriverLocation, error) {
s.logger.For(ctx).Info("Searching for nearby drivers", zap.String("location", location))
driverIDs := s.redis.FindDriverIDs(ctx, location)
retMe := make([]*driver.DriverLocation, len(driverIDs))
for i, driverID := range driverIDs {
var drv Driver
var err error
for i := 0; i < 3; i++ {
drv, err = s.redis.GetDriver(ctx, driverID)
if err == nil {
break
}
s.logger.For(ctx).Error("Retrying GetDriver after error", zap.Int("retry_no", i+1), zap.Error(err))
}
if err != nil {
s.logger.For(ctx).Error("Failed to get driver after 3 attempts", zap.Error(err))
return nil, err
}
retMe[i] = &driver.DriverLocation{
DriverID: drv.DriverID,
Location: drv.Location,
}
}
s.logger.For(ctx).Info("Search successful", zap.Int("num_drivers", len(retMe)))
return retMe, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment