Created
September 15, 2016 03:21
-
-
Save dimiro1/b9c4d81d9c8a4eef2cbfe1228c90eeea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"time" | |
"github.com/rs/xlog" | |
) | |
// Component is the unit of work managed by a Network. Anything that
// exposes a Run method taking a context can be registered and started.
type Component interface {
	// Run executes the component using the supplied context.
	Run(context.Context)
}
// Network is a graph network | |
// has a map ot registered elements | |
type Network struct { | |
components map[string]Component | |
} | |
// Register is used to give a name to the Component, so it can be used in the Connect function | |
// ex: RegisterComponent("read", c) | |
func (n *Network) Register(name string, c Component) error { | |
if n.components == nil { | |
n.components = make(map[string]Component) | |
} | |
n.components[name] = c | |
return nil | |
} | |
// Connect connect the input and output of the given components | |
// in -> out | |
// Ex: Connect("read.out -> write.in") | |
// Panic if element is not registered??? | |
func (n *Network) Connect(definition string) error { | |
return nil | |
} | |
// Run start all components | |
func (n *Network) Run(ctx context.Context) { | |
for _, c := range n.components { | |
go c.Run(ctx) | |
} | |
} | |
type readComponent struct { | |
in chan StringWithContext `port:"in"` | |
out chan string `port:"out"` | |
err chan string `port:"error"` | |
} | |
func (r *readComponent) Run(ctx context.Context) { | |
for { | |
select { | |
case message := <-r.in: | |
log := xlog.FromContext(message.ctx) | |
log.Info("Reading", xlog.F{ | |
"message": message.in, | |
}) | |
content, err := ioutil.ReadFile(message.in) | |
if err != nil { | |
log.Error("Error", xlog.F{ | |
"error": err, | |
}) | |
r.err <- err.Error() | |
continue | |
} | |
r.out <- string(content) | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
type sendRequestComponent struct { | |
url chan StringWithContext `port:"url"` | |
message chan string `port:"message"` | |
err chan string `port:"err"` | |
} | |
func (s *sendRequestComponent) Run(ctx context.Context) { | |
for { | |
select { | |
case url := <-s.url: | |
response, err := http.Get(url.in) | |
if err != nil { | |
s.err <- err.Error() | |
continue | |
} | |
data, err := ioutil.ReadAll(response.Body) | |
if err != nil { | |
s.err <- err.Error() | |
continue | |
} | |
s.message <- string(data) | |
response.Body.Close() | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
// writeComponent is a sink: every string received on in is printed to
// standard output.
type writeComponent struct {
	// in receives the text to print.
	in chan string `port:"in"`
}
func (w *writeComponent) Run(ctx context.Context) { | |
for { | |
select { | |
case content := <-w.in: | |
fmt.Println(content) | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
// StringWithContext is the message exchanged on string input ports: a string
// payload together with the context that travels with it.
type StringWithContext struct {
	// ctx is the context attached to this message.
	ctx context.Context
	// in is the string payload.
	in string
}
func NewStringWithContext(ctx context.Context, str string) StringWithContext { | |
return StringWithContext{ | |
ctx: ctx, | |
in: str, | |
} | |
} | |
func main() { | |
conf := xlog.Config{ | |
Level: xlog.LevelInfo, | |
Fields: xlog.F{ | |
"role": "FBP", | |
}, | |
Output: xlog.NewOutputChannel(xlog.NewConsoleOutput()), | |
} | |
logger := xlog.New(conf) | |
// Essa instaciação de componentes pode ser feita no registro/connect dos componentes | |
buffer := 10 // Opcional. Pode ser 0. | |
urlPort := make(chan StringWithContext, buffer) | |
readInPort := make(chan StringWithContext, buffer) | |
readErrPort := make(chan string, buffer) | |
writeInPort := make(chan string, buffer) | |
s := &sendRequestComponent{ | |
url: urlPort, | |
message: writeInPort, | |
} | |
read := &readComponent{ | |
in: readInPort, | |
out: writeInPort, | |
err: readErrPort, | |
} | |
write := &writeComponent{ | |
in: writeInPort, | |
} | |
errorWriter := &writeComponent{ | |
in: readErrPort, | |
} | |
ctx := xlog.NewContext(context.Background(), logger) | |
ctx, cancel := context.WithCancel(ctx) | |
defer cancel() | |
n := &Network{} | |
n.Register("SendRequest", s) | |
n.Register("Read", read) | |
n.Register("Write", write) | |
n.Register("ErrorWrite", errorWriter) | |
// Isto não faz nada por enquanto | |
n.Connect("SendRequest.message -> Write.in") | |
n.Connect("Read.out -> Write.in") | |
n.Connect("Read.err -> ErrorWrite.in") | |
n.Run(ctx) | |
files := []string{ | |
"/Users/claudemiro/my-noflo-example-app/package.json", | |
"Hello World", | |
} | |
for i := 0; i < 2; i++ { | |
readInPort <- NewStringWithContext(ctx, files[i]) | |
time.Sleep(500 * time.Millisecond) | |
} | |
urlPort <- NewStringWithContext(ctx, "https://httpbin.org/ip") | |
time.Sleep(2 * time.Second) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment