NDS#37 Goroutines and Concurrency Patterns

Goroutines and Concurrency Patterns

Slides for the 37th study meeting (2014/08/09) of the Nagaoka IT Developers Study Group (NDS)

Usage

  • Basic

      $ go install golang.org/x/tools/cmd/present@latest
      $ present -orighost="localhost"
    

    Go to the following URL: http://localhost:3999/_main.slide

  • Vagrant

      $ vagrant up
      $ vagrant ssh -c 'cd /vagrant && GOPATH=/usr/local present -http="0.0.0.0:3999" -orighost="localhost"'
    

    Go to the following URL: http://localhost:3999/_main.slide

  • Docker

      $ docker build -t 'nds37:go-present' .
      $ docker run -d --net="host" --name="go-present" -v `pwd`:/mnt nds37:go-present
    

    Go to the following URL: http://localhost:3999/_main.slide

Goroutines and Concurrency Patterns
Nagaoka IT Developers Study Group (NDS) - 37th Study Meeting
9 Aug 2014
@hayajo
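* Concurrency Basics in Go
* Goroutines
The basic building block for concurrency. A goroutine is similar to a thread, but much more lightweight.
A goroutine is created with the go statement.
- Function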
.code -edit goroutine.go /START1/,/END1/
- Method
.code -edit goroutine.go /START2/,/END2/
- Closure
.code -edit goroutine.go /START3/,/END3/
* Goroutines: Example
.play -edit goroutine.go HL1
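* Synchronization
Basic synchronization primitives provided by the sync package.
- sync.Mutex (mutual exclusion lock)
- sync.Once (runs a function exactly once, like pthread_once())
- sync.WaitGroup (waits for a set of goroutines to finish, like a counting semaphore)
- sync.Pool (free list of reusable temporary objects)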
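The deck only demonstrates sync.WaitGroup, so here is a minimal sketch of sync.Mutex and sync.Once working together. The registry type and the countConcurrently helper are hypothetical names made up for this illustration; they are not part of the original talk.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical type used only for this illustration.
type registry struct {
	mu    sync.Mutex
	once  sync.Once
	items map[string]int
}

// add lazily creates the map exactly once (sync.Once),
// then updates it while holding the lock (sync.Mutex).
func (r *registry) add(key string) {
	r.once.Do(func() { r.items = make(map[string]int) })
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items[key]++
}

// countConcurrently calls add from n goroutines and returns the final count.
func countConcurrently(n int) int {
	var r registry
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.add("gopher")
		}()
	}
	wg.Wait()
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.items["gopher"]
}

func main() {
	fmt.Println(countConcurrently(100)) // prints "100"
}
```

Without the mutex the concurrent map writes would be a data race; without sync.Once several goroutines could each allocate the map.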
* Synchronization: Example - sync.WaitGroup
.play -edit sync-waitgroup.go
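* Channels
Channels provide communication between goroutines. They resemble a bidirectional pipe, and each channel is declared with the type of message it carries.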
.code -edit channel.go /START1/,/END1/
A send blocks once the buffer is full, and a receive blocks while the buffer is empty. (By default a channel is unbuffered.)
.code -edit channel.go /START2/,/END2/ HL1
Channels can also be passed as arguments and used as return values.
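As a rough illustration of how the buffer size governs blocking, the sketch below counts how many sends succeed before the next one would block when nobody is receiving. The helper name sendsBeforeBlocking is an assumption made for this example; it uses select with a default case so the program never actually blocks.

```go
package main

import "fmt"

// sendsBeforeBlocking reports how many values can be sent on a channel with
// the given buffer size before a further send would block (no receiver exists).
func sendsBeforeBlocking(size int) int {
	ch := make(chan int, size)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++
		default:
			// The send would block: the buffer is full
			// (or the channel is unbuffered and has no receiver).
			return sent
		}
	}
}

func main() {
	fmt.Println(sendsBeforeBlocking(0)) // prints "0"
	fmt.Println(sendsBeforeBlocking(3)) // prints "3"
}
```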
* Channels: Example
.play -edit channel.go HL1
* Concurrency Patterns
* Background jobs
.play -numbers -edit background-jobs.go /START/,/END/
* Unbuffered channel
.code -numbers -edit unbuffered-channel.go /START1/,/END1/
.play -numbers -edit unbuffered-channel.go /START2/,/END2/
* Buffered channel
.play -numbers -edit buffered-channel.go /START/,/END OMIT/
* Reading from multiple channels: select
.play -numbers -edit select.go /START/,/END/
* Consume all values from a channel: range
.play -numbers -edit range.go /START/,/END/
* Signaling
.play -numbers -edit signaling.go /START/,/END/
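A receive from a channel returns both the value and the state of the channel.

	value, isOpen := <-channel

If the channel is closed (and any buffered values have been drained), the value is the zero value, the state is false, and the receive does not block.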
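A small sketch of the two-value receive, assuming a buffered channel that is closed while it still holds values. The drain helper is a hypothetical name introduced for this example.

```go
package main

import "fmt"

// drain receives from ch until it is closed. It returns the values seen,
// plus the value and state reported by the final receive on the closed channel.
func drain(ch <-chan int) (values []int, lastValue int, lastOpen bool) {
	for {
		v, isOpen := <-ch
		if !isOpen {
			return values, v, isOpen
		}
		values = append(values, v)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch) // buffered values can still be received after close

	values, last, open := drain(ch)
	fmt.Println(values, last, open) // prints "[10 20] 0 false"
}
```

The buffered values are delivered with isOpen == true; only after they are drained does the receive yield the zero value and false.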
* Safe counter(1)
.code -numbers -edit safe-counter.go /START1/,/END1/
* Safe counter(2)
.play -numbers -edit safe-counter.go /START2/,/END2/
A simple counter can also be implemented with the sync/atomic package.
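For comparison with the channel-based counter, here is a minimal sketch of the same idea using sync/atomic. The atomicCount helper is a hypothetical name, not code from the talk.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicCount increments a shared counter from n goroutines
// using atomic operations and returns the final value.
func atomicCount(n int) int64 {
	var c int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&c, 1) // safe without a mutex or a channel
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&c)
}

func main() {
	fmt.Println(atomicCount(1000)) // prints "1000"
}
```

This is lighter than a dedicated counter goroutine, but it only fits cases where the shared state is a single integer.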
* Timer
.play -numbers -edit timer.go /START/,/END/
- time.After
- time.Tick
- time.Sleep
* Fan-out(1)
Distribute work across multiple goroutines.
.code -numbers -edit fan-out.go /START1/,/END1/
* Fan-out(2)
.play -numbers -edit fan-out.go /START2/,/END2/
* Fan-in(1)
Multiplex several channels into one.
.code -numbers -edit fan-in.go /START1/,/END1/
* Fan-in(2)
.code -numbers -edit fan-in.go /START2/,/END2/
.play -numbers -edit fan-in.go /START3/,/END3/
* Coroutine
[[http://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%AB%E3%83%BC%E3%83%81%E3%83%B3][Coroutine - Wikipedia (Japanese)]]
.code -numbers -edit coroutine.go /START1/,/END1/
.play -numbers -edit coroutine.go /START2/,/END2/
* Generator
[[http://ja.wikipedia.org/wiki/%E3%82%B8%E3%82%A7%E3%83%8D%E3%83%AC%E3%83%BC%E3%82%BF][Generator (programming) - Wikipedia (Japanese)]]
.code -numbers -edit generator.go /START1/,/END1/
.play -numbers -edit generator.go /START2/,/END2/
* Future(1)
[[http://ja.wikipedia.org/wiki/Future][future - Wikipedia]]
.code -numbers -edit promise.go /START1/,/END1/
* Future(2)
.play -numbers -edit promise.go /START2/,/END2/
* Actor model(1)
[[http://ja.wikipedia.org/wiki/%E3%82%A2%E3%82%AF%E3%82%BF%E3%83%BC%E3%83%A2%E3%83%87%E3%83%AB][Actor model - Wikipedia (Japanese)]]
.code -numbers -edit actor-model.go /START1/,/END1/
* Actor model(2)
.code -numbers -edit actor-model.go /START2/,/END2/
* Actor model(3)
.play -numbers -edit actor-model.go /START3/,/END3/
* Appendix
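* Increasing parallelism
At the time of this talk (Go 1.3) the default was 1; since Go 1.5 the default is the number of available CPUs. Change it with runtime.GOMAXPROCS(int).

	runtime.GOMAXPROCS(2)

Get the number of logical CPUs with runtime.NumCPU().

	runtime.GOMAXPROCS(runtime.NumCPU())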
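The call also returns the previous setting, and an argument below 1 only queries the current value. A minimal sketch, where withParallelism is a hypothetical helper written for this example:

```go
package main

import (
	"fmt"
	"runtime"
)

// withParallelism sets GOMAXPROCS to n, reads the setting back,
// and then restores the previous value.
func withParallelism(n int) (before, during int) {
	before = runtime.GOMAXPROCS(n) // returns the previous setting
	during = runtime.GOMAXPROCS(0) // n < 1 queries without changing anything
	runtime.GOMAXPROCS(before)     // restore the original setting
	return before, during
}

func main() {
	before, during := withParallelism(2)
	fmt.Println("CPUs:", runtime.NumCPU())
	fmt.Println("before:", before, "during:", during)
}
```

The value of before depends on the machine and the Go version, so only during (2 here) is predictable.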
* Getting the current number of goroutines
.play -edit numgoroutine.go /START/,/END/
* Getting current memory statistics
.play -edit memstats.go /START/,/END/
package main

import "log"

// START1 OMIT
type actor chan<- interface{} // HL

type PutReq struct {
	key, val string
}

type GetReq struct {
	key string
	ack chan<- string // HL
}
// END1 OMIT

// START2 OMIT
func NewMapActor() actor {
	a := make(chan interface{})
	go func() {
		m := make(map[string]string)
		for {
			switch r := (<-a).(type) { // HL
			case PutReq: // HL
				m[r.key] = r.val
			case GetReq: // HL
				r.ack <- m[r.key]
			}
		}
	}()
	return actor(a) // HL
}
// END2 OMIT

// START3 OMIT
func main() {
	a := NewMapActor()
	a <- PutReq{key: "hoge", val: "HOGEHOGE"} // HL
	ack := make(chan string)
	a <- GetReq{key: "hoge", ack: ack} // HL
	log.Println(<-ack)
}
// END3 OMIT
package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	// START OMIT
	var wg sync.WaitGroup // HL
	args := []string{"foo", "bar", "baz"}
	for _, a := range args {
		wg.Add(1) // HL
		go func(str string) {
			fmt.Println(strings.ToUpper(str))
			wg.Done() // HL
		}(a)
	}
	wg.Wait() // waiting for all goroutines to finish // HL
	// END OMIT
}
package main

import (
	"log"
	"math/rand"
	"time"
)

func main() {
	// START OMIT
	limCh := make(chan struct{}, 3) // HL
	for i := 0; i < 10; i++ {
		limCh <- struct{}{} // HL
		go func(i int) {
			defer func() { <-limCh }() // HL
			log.Println("BEGIN", i)
			time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
			log.Println("END", i)
		}(i)
	}
	for i := 0; i < cap(limCh); i++ { // HL
		limCh <- struct{}{} // HL
	} // HL
	// END OMIT
}
package main

import "fmt"

func main() {
	name := "Gopher"
	// START1 OMIT
	doneCh := make(chan struct{}) // HL1
	// END1 OMIT
	// START2 OMIT
	go func() {
		fmt.Printf("Hello, %s\n", name)
		doneCh <- struct{}{} // blocking until ready to read // HL1
	}()
	<-doneCh // blocking until a value is received // HL1
	// END2 OMIT
}
package main

import "log"

// START1 OMIT
func hello(yield chan<- string) {
	go func() {
		yield <- "Hello"     // HL
		yield <- "Gopher"    // HL
		yield <- "Hello NDS" // HL
	}()
}
// END1 OMIT

func main() {
	// START2 OMIT
	co := make(chan string)
	hello(co)
	log.Println(<-co, "World")
	log.Println("Hello", <-co)
	log.Println(<-co)
	// END2 OMIT
}
package main

import "log"

func filter(in, out chan int) {
	out <- 1 + <-in
}

func main() {
	begin := make(chan int)
	in := begin
	var out chan int
	for i := 0; i < 10; i++ {
		out = make(chan int)
		go filter(in, out)
		in = out
	}
	begin <- 0
	log.Println(<-out)
}
FROM ubuntu:trusty
RUN apt-get update && apt-get install -y golang mercurial
ENV GOPATH /usr/local
RUN go get code.google.com/p/go.tools/cmd/present
EXPOSE 3999
CMD cd /mnt && present -orighost="localhost"
package main

import (
	"bytes"
	"log"
	"sync"
)

// START1 OMIT
func fanIn(ch ...<-chan byte) <-chan byte {
	var wg sync.WaitGroup
	outCh := make(chan byte) // HL
	for _, c := range ch { // HL
		wg.Add(1)
		go func(c <-chan byte) {
			for b := range c {
				outCh <- b // HL
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(outCh)
	}()
	return outCh // HL
}
// END1 OMIT

func main() {
	// START2 OMIT
	ch1 := make(chan byte)
	ch2 := make(chan byte)
	go func() {
		defer func() {
			close(ch1)
			close(ch2)
		}()
		for i := 0; i < 26; i++ {
			ch1 <- byte(65 + i) // A..Z
			ch2 <- byte(97 + i) // a..z
		}
	}()
	// END2 OMIT
	// START3 OMIT
	var b bytes.Buffer
	outCh := fanIn(ch1, ch2) // HL
	for v := range outCh { // HL
		b.WriteByte(v)
	}
	log.Println(b.String())
	// END3 OMIT
}
package main

import (
	"log"
	"sync"
)

func main() {
	// START1 OMIT
	outCh := make(chan byte) // HL
	go func() {
		defer func() {
			close(outCh)
		}()
		for i := 0; i < 26; i++ {
			outCh <- byte(65 + i) // A..Z
		}
	}()
	// END1 OMIT
	var wg sync.WaitGroup
	// START2 OMIT
	startCh := make(chan struct{})
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) { // HL
			<-startCh
			for b := range outCh { // HL
				log.Printf("#%d %s", i, string(b))
			}
			wg.Done()
		}(i)
	}
	close(startCh)
	// END2 OMIT
	wg.Wait()
}
package main

import (
	"log"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

// START1 OMIT
func generator(n int) <-chan int {
	yield := make(chan int) // HL
	go func() {
		for i := 0; i < n; i++ {
			yield <- rand.Int() // HL
		}
		close(yield)
	}()
	return yield
}
// END1 OMIT

func main() {
	// START2 OMIT
	for v := range generator(100) { // HL
		log.Println(v)
	}
	// END2 OMIT
}
package main

import (
	"log"
	"os"
	"time"
)

func logging(fmt string, args ...interface{}) {
	log.Printf(fmt, args...)
}

func main() {
	name := "Gopher"
	// START1 OMIT
	go logging("Hello, %s\n", name) // HL1
	// END1 OMIT
	logger := log.New(os.Stdout, "", log.Ldate|log.Ltime)
	// START2 OMIT
	go logger.Printf("Hello, %s\n", name) // HL1
	// END2 OMIT
	// START3 OMIT
	go func() { // HL1
		log.Printf("Hello, %s\n", name) // HL1
	}() // HL1
	// END3 OMIT
	time.Sleep(100 * time.Millisecond)
}
package main

import (
	"encoding/json"
	"fmt"
	"runtime"
)

func main() {
	// START OMIT
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	// END OMIT
	b, _ := json.MarshalIndent(m, "", " ")
	fmt.Println(string(b))
}
package main

import (
	"log"
	"runtime"
)

func main() {
	// START OMIT
	log.Printf("running %d goroutines", runtime.NumGoroutine())
	// END OMIT
}
package main

import (
	"fmt"
	"log"
	"strconv"
	"sync"
)

// START1 OMIT
type futureUint64 struct {
	value uint64
	ch    chan uint64
	once  sync.Once
}

func (f *futureUint64) String() string {
	f.once.Do(func() { f.value = <-f.ch }) // HL
	return strconv.FormatUint(f.value, 10)
}

func Fibonacci(n uint64) fmt.Stringer {
	f := new(futureUint64)
	f.ch = make(chan uint64)
	go func() { // HL
		f.ch <- fib(n) // HL
	}() // HL
	return f
}
// END1 OMIT

// START2 OMIT
func fib(n uint64) uint64 {
	switch n {
	case 0:
		return 0
	case 1, 2:
		return 1
	default:
		return fib(n-1) + fib(n-2)
	}
}

func main() {
	f := Fibonacci(20)
	// log.Println already inserts a space between operands.
	log.Println("The 20th Fibonacci number is", f)
}
// END2 OMIT
package main

import (
	"log"
	"time"
)

func main() {
	// START OMIT
	ch := make(chan int) // HL
	go func() {
		defer func() {
			close(ch) // HL
		}()
		for i := 0; i < 10; i++ {
			ch <- i
			time.Sleep(time.Second)
		}
	}()
	for v := range ch { // HL
		log.Println(v)
	}
	// END OMIT
}
package main

import (
	"log"
	"sync"
)

// START1 OMIT
func counter(ch chan<- int64) {
	go func() {
		var c int64 = 1
		for {
			ch <- c // HL
			c++
		}
	}()
}
// END1 OMIT

func main() {
	// START2 OMIT
	var wg sync.WaitGroup
	cntCh := make(chan int64) // HL
	counter(cntCh)
	start := make(chan struct{})
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			<-start
			log.Printf("#%03d: % 4d\n", i, <-cntCh) // HL
			wg.Done()
		}(i)
	}
	close(start)
	wg.Wait()
	// END2 OMIT
}
package main

import "log"

func main() {
	// START OMIT
	cntCh := make(chan int) // HL
	sumCh := make(chan int) // HL
	go func() {
		var s int
		for i := 0; i < 1000; i++ {
			cntCh <- i
			s += i
		}
		sumCh <- s
	}()
loop: // HL
	for { // HL
		select { // HL
		case c := <-cntCh: // HL
			log.Println(c)
		case s := <-sumCh: // HL
			log.Println("SUM", s)
			break loop // HL
		}
	}
	// END OMIT
}
package main

import (
	"log"
	"sync"
)

func main() {
	// START OMIT
	var wg sync.WaitGroup
	startCh := make(chan struct{})
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			<-startCh // blocking until the channel is closed // HL
			log.Println(i)
			wg.Done()
		}(i)
	}
	close(startCh) // closing the channel in order to start goroutines // HL
	wg.Wait()
	// END OMIT
}
package main

import (
	"fmt"
	"sync"
)

func main() {
	name := "Gopher"
	var wg sync.WaitGroup // HL
	wg.Add(1)             // HL
	go func() {
		defer wg.Done() // HL
		fmt.Printf("Hello, %s\n", name)
	}()
	wg.Wait() // HL
}
package main

import (
	"container/ring"
	"fmt"
	"log"
	"time"
)

var aa = []string{
	"ヽ(゚∀゚)ノ",
	"( ゚∀)ノ",
	"(  ゚)ノ",
	"ヽ(  )ノ",
	"ヽ(゚  )",
	"ヽ(∀゚ )ノ",
}

func main() {
	ch := make(chan string)
	go func() {
		r := ring.New(len(aa))
		for _, s := range aa {
			r.Value = s
			r = r.Next()
		}
		for {
			r = r.Next()
			ch <- fmt.Sprintf("%v", r.Value)
			time.Sleep(100 * time.Millisecond)
		}
	}()
	// START OMIT
	timeoutCh := time.After(5 * time.Second) // HL
loop:
	for {
		select {
		case v := <-ch:
			fmt.Print("\x0c") // New Page
			fmt.Println(v)
		case <-timeoutCh: // HL
			log.Println("timed out")
			break loop
		}
	}
	// END OMIT
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

// START1 OMIT
func from(conCh chan<- int) {
	conCh <- rand.Intn(100) // blocking until ready to read // HL
}

func to(conCh <-chan int) {
	fmt.Printf("Received %d\n", <-conCh) // blocking until a value is received // HL
}
// END1 OMIT

func main() {
	// START2 OMIT
	conCh := make(chan int) // HL
	go from(conCh)
	go to(conCh)
	// END2 OMIT
	time.Sleep(100 * time.Millisecond)
}
# -*- mode: ruby -*-
# vi: set ft=ruby :
Vagrant.configure(2) do |config|
  config.vm.box = "ubuntu/trusty64"
  config.vm.network "forwarded_port", guest: 3999, host: 3999
  config.vm.provision "shell", inline: <<-EOS
    set -x
    if [ ! `which go` ]; then
      apt-get update
      apt-get install -y golang mercurial
    fi
    export GOPATH=/usr/local
    go get code.google.com/p/go.tools/cmd/present
  EOS
end