Skip to content

Instantly share code, notes, and snippets.

@toni-moreno
Created March 27, 2020 06:51
Show Gist options
  • Save toni-moreno/bc4c1a05973923c7c011d29300a1c1b5 to your computer and use it in GitHub Desktop.
Save toni-moreno/bc4c1a05973923c7c011d29300a1c1b5 to your computer and use it in GitHub Desktop.
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
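// pipeline implements a request/reply example. master is a listening
// REQ socket that runs several workers, each sending requests on its own
// context; agent is a dialing REP socket that answers each request after
// sleeping for a configurable number of seconds.
//
// To use:
//
// $ go build .
// $ url=tcp://127.0.0.1:40899
// $ ./pipeline master $url 3 & master=$! && sleep 1
// $ ./pipeline agent $url agent1 2 & agent1=$!
// $ ./pipeline agent $url agent2 5 & agent2=$!
// $ kill $agent1 $agent2 $master
//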
package main
import (
"fmt"
"os"
"strconv"
"sync"
"time"
"go.nanomsg.org/mangos/v3"
// register transports
"go.nanomsg.org/mangos/v3/protocol/rep"
"go.nanomsg.org/mangos/v3/protocol/req"
_ "go.nanomsg.org/mangos/v3/transport/all"
)
// die formats a message printf-style, writes it to standard error followed
// by a newline, and terminates the process with exit status 1. It never
// returns.
func die(format string, v ...interface{}) {
	line := fmt.Sprintf(format, v...) + "\n"
	os.Stderr.WriteString(line)
	os.Exit(1)
}
func agent(url string, agentName string, sleep int) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = rep.NewSocket(); err != nil {
die("can't get new pull socket: %s", err)
}
if err = sock.Dial(url); err != nil {
die("can't dial on push socket: %s", err.Error())
}
for {
// Could also use sock.RecvMsg to get header
msg, err = sock.Recv()
t := time.Now()
tstr := fmt.Sprintf("%02d:%02d:%02d", t.Hour(), t.Minute(), t.Second())
fmt.Printf("AGENT [%s]: AT %s RECEIVED \"%s\" Sleeping for %d seconds\n", agentName, tstr, msg, sleep)
time.Sleep(time.Duration(int64(sleep) * int64(time.Second)))
retmsg := fmt.Sprintf("AGENT [%s] %s", agentName, tstr)
err = sock.Send([]byte(retmsg))
if err != nil {
die("can't send reply: %s", err.Error())
}
}
}
func master(url string, numWorkers int) {
var wg sync.WaitGroup
var sock mangos.Socket
var err error
if sock, err = req.NewSocket(); err != nil {
die("can't get new push socket: %s", err.Error())
}
defer sock.Close()
if err = sock.Listen(url); err != nil {
die("can't listen on pull socket: %s", err.Error())
}
for i := 0; i < numWorkers; i++ {
fmt.Printf("TRIGGER WORKER[%d] Init\n", i)
wg.Add(1)
go func(i int, wg *sync.WaitGroup) {
ctx, _ := sock.OpenContext()
ctx.SetOption(mangos.OptionRetryTime, time.Duration(0))
defer wg.Done()
for {
var resp []byte
time.Sleep(time.Second / 2)
t := time.Now()
msg := fmt.Sprintf("worker[%d] - %02d:%02d:%02d", i, t.Hour(), t.Minute(), t.Second())
//fmt.Printf("TRIGGER WORKER[%d] SENDED [%s] \n", i, msg)
if err = ctx.Send([]byte(msg)); err != nil {
die("can't send message on push socket: %s", err.Error())
}
if resp, err = ctx.Recv(); err != nil {
die("can't receive resp: %s", err.Error())
}
t2 := time.Now()
msg2 := fmt.Sprintf("%02d:%02d:%02d", t2.Hour(), t2.Minute(), t2.Second())
fmt.Printf("TRIGGER WORKER[%d] SENDED MSG [%s] RECEIVED MSG [%s] AT [%s]\n", i, msg, string(resp), msg2)
}
}(i, &wg)
}
wg.Wait()
}
// main dispatches on the first command-line argument:
//
//	pipeline agent  <URL> <NAME> <SLEEP_SECONDS>
//	pipeline master <URL> <NUM_WORKERS>
//
// The argument count is checked against what each mode actually reads, and a
// non-numeric count/sleep value is reported instead of silently becoming 0.
// Anything else prints the usage text and exits with status 1.
func main() {
	const usage = "Usage: pipeline agent <URL> <NAME> <SLEEP_SECONDS>\n" +
		"       pipeline master <URL> <NUM_WORKERS>\n"

	args := os.Args
	switch {
	case len(args) > 4 && args[1] == "agent":
		// agent needs args[2..4]; the guard must cover index 4.
		t, err := strconv.Atoi(args[4])
		if err != nil {
			fmt.Fprintf(os.Stderr, "invalid sleep seconds %q: %s\n", args[4], err)
			os.Exit(1)
		}
		agent(args[2], args[3], t)
		os.Exit(0)

	case len(args) > 3 && args[1] == "master":
		n, err := strconv.Atoi(args[3])
		if err != nil {
			fmt.Fprintf(os.Stderr, "invalid number of workers %q: %s\n", args[3], err)
			os.Exit(1)
		}
		master(args[2], n)
		os.Exit(0)
	}

	fmt.Fprint(os.Stderr, usage)
	os.Exit(1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment