MyDocs

# Golangの並列/並行処理でいろいろ

並列 / 並行 処理

channel を使う

package main

import (
	"fmt"
	"time"
)

func process(num int, str string) {
	for i := 0; i < num; i++ {
		time.Sleep(1 * time.Second)
		fmt.Println(i, str)
	}
}

func main() {
	fmt.Println("Start")
	process(2, "A")
	process(2, "B")
	process(2, "C")
	fmt.Println("Finish")
}

// Start
// 0 A
// 1 A
// 0 B
// 1 B
// 0 C
// 1 C
// Finish
// 
// ________________________________________________________
// Executed in    6.30 secs      fish           external
//    usr time  192.00 millis  172.00 micros  191.83 millis
//    sys time  201.91 millis  759.00 micros  201.15 millis

process の処理を並列化する。

package main

import (
	"fmt"
	"time"
)

func process(num int, str string) {
	for i := 0; i < num; i++ {
		time.Sleep(1 * time.Second)
		fmt.Println(i, str)
	}
}


func main() {
	chA := make(chan bool)
	chB := make(chan bool)
	chC := make(chan bool)
	fmt.Println("Start")

	go func() {
		process(2, "A")
		chA <- true
	}()

	go func() {
		process(2, "B")
		chB <- true
	}()

	go func() {
		process(2, "C")
		chC <- true
	}()

	<-chA
	<-chB
	<-chC

	fmt.Println("Finish")
}

// Start
// 0 B
// 0 A
// 0 C
// 1 C
// 1 A
// 1 B
// Finish
// 
// ________________________________________________________
// Executed in    2.29 secs      fish           external
//    usr time  194.02 millis  178.00 micros  193.84 millis
//    sys time  207.43 millis  862.00 micros  206.57 millis

sync.WaitGroup を使う

package main

import "fmt"

type Item struct {
	Id   int
	Name string
}

func execLoop(list []Item) {
	for _, item := range list {
		doSomething(item)
	}
}

func doSomething(item Item) {
	item.Id += 10
	fmt.Println(item)
}

func main() {
	list := []Item{
		{Id: 1, Name: "item1"},
		{Id: 2, Name: "item2"},
		{Id: 3, Name: "item3"},
		{Id: 4, Name: "item4"},
		{Id: 5, Name: "item5"},
	}

	execLoop(list)
}

// {11 item1}
// {12 item2}
// {13 item3}
// {14 item4}
// {15 item5}

ループ内の処理を並列化する。

package main

import (
	"fmt"
	"sync"
)

type Item struct {
	Id   int
	Name string
}

func execLoop(list []Item) {
	var wg sync.WaitGroup
	for _, item := range list {
		wg.Add(1)
		go func(item2 Item) {
			defer wg.Done()
			doSomething(item2)
		}(item)
	}
	wg.Wait()
}

func doSomething(item Item) {
	item.Id += 10
	fmt.Println(item)
}

func main() {
	list := []Item{
		{Id: 1, Name: "item1"},
		{Id: 2, Name: "item2"},
		{Id: 3, Name: "item3"},
		{Id: 4, Name: "item4"},
		{Id: 5, Name: "item5"},
	}

	execLoop(list)
}

// {15 item5}
// {11 item1}
// {14 item4}
// {13 item3}
// {12 item2}

sync.Mutex を使う

package main

import "fmt"

type myClass struct {
	AttributeName string
}

func main() {
	sourceSlice := make([]myClass, 100)
	destSlice := make([]myClass, 0)

	for _, myObj := range sourceSlice {
		var tmpObj myClass
		tmpObj.AttributeName = myObj.AttributeName
		destSlice = append(destSlice, tmpObj)
	}
	fmt.Println(len(destSlice))
}

// 100

sync.WaitGroup を使う。 ( ダメな例 ) append はスレッドセーフではないので件数が減る。

package main

import (
	"fmt"
	"sync"
)

type myClass struct {
	AttributeName string
}

func main() {
	sourceSlice := make([]myClass, 100)
	destSlice := make([]myClass, 0)

	var wg sync.WaitGroup
	for _, myObj := range sourceSlice {
		wg.Add(1)
		go func(myObj2 myClass) {
			defer wg.Done()
			var tmpObj myClass
			tmpObj.AttributeName = myObj2.AttributeName
			destSlice = append(destSlice, tmpObj)
		}(myObj)
	}
	wg.Wait()
	fmt.Println(len(destSlice))
}

// 75

-race を付けることで競合のチェックができる。

$ go run -race main.go

~~ 省略 ~~
==================
97

sync.Mutex を使う。

package main

import (
	"fmt"
	"sync"
)

type myClass struct {
	AttributeName string
}

func main() {
	sourceSlice := make([]myClass, 100)
	destSlice := make([]myClass, 0)

	var wg sync.WaitGroup
	mu := &sync.Mutex{}
	for _, myObj := range sourceSlice {
		wg.Add(1)
		go func(myObj2 myClass) {
			defer wg.Done()
			var tmpObj myClass
			tmpObj.AttributeName = myObj2.AttributeName
			mu.Lock()
			destSlice = append(destSlice, tmpObj)
			mu.Unlock()
		}(myObj)
	}
	wg.Wait()
	fmt.Println(len(destSlice))
}

// 100

ポーリング

len(q) は溜まったバッファ数を返す。 make で作るときはバッファ数を 2 以上で作らないと len(q) は常に 0 を返す。

package main

import (
	"fmt"
	"time"
)

func main() {
	q := make(chan struct{}, 2)

	go func() {
		// 重たい処理
		time.Sleep(3 * time.Second)
		q <- struct{}{}
	}()

	for {
		if len(q) > 0 {
			fmt.Println("完了")
			break
		}
		time.Sleep(1 * time.Second)
		fmt.Println("実行中")
	}
}

// 実行中
// 実行中
// 実行中
// 完了

ワーカー

close(q) されたら str, ok := <- qokfalse になる。

package main

import (
	"fmt"
	"sync"
	"time"
)

func printString(wg *sync.WaitGroup, q chan string) {
	defer wg.Done()

	for {
		str, ok := <-q
		if !ok {
			return
		}

		fmt.Println(str)
		time.Sleep(3 * time.Second)
	}
}

func main() {
	const workerNum = 3
	var wg sync.WaitGroup
	q := make(chan string, 5)

	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go printString(&wg, q)
	}

	q <- "test1"
	q <- "test2"
	q <- "test3"
	q <- "test4"
	q <- "test5"
	close(q)
	wg.Wait()
}

// test2
// test1
// test3
// test5
// test4