14 minute read

concurrency pattern 整理

generator

返回一個可以持續產生數值的 channel

package main

import (
	"fmt"
)

// Generator returns a receive-only channel that yields the integers
// 0 through 9; the producing goroutine closes the channel afterwards.
func Generator() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := 0; n < 10; n++ {
			out <- n
		}
	}()

	return out
}

// main drains the generator channel, printing each value, then prints
// a completion marker once the channel has been closed.
func main() {
	for v := range Generator() {
		fmt.Println(v)
	}

	fmt.Println("done")
}
0
1
2
3
4
5
6
7
8
9
done

Process finished with the exit code 0

channel done

類似 generator,產生一個可持續輸出的channel, 但可以從外部控制何時結束


package main

import (
	"context"
	"fmt"
	"time"
)

func DataGenerator(ctx context.Context) <-chan int {

	ch := make(chan int, 0)

	go func() {
		i := 0
		defer close(ch)

		for {

			select {

			case <-ctx.Done():
				fmt.Println("finish...")
				return
			case ch <- i:
				time.Sleep(time.Millisecond * 100)
				i += 1
			}

		}

	}()

	return ch

}

// main consumes values from DataGenerator and cancels the producer
// from the outside once the value 10 has been seen.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	for val := range DataGenerator(ctx) {
		fmt.Println(val)

		if val == 10 {
			cancel()
			break
		}
	}

	fmt.Println("done")
}

0
1
2
3
4
5
6
7
8
9
10
done

pipeline

簡單說就是將多個channel串接在一起,每個channel都有自己的處理程序, 之後從最終端channel取得結果

graph LR
    A[DataChannel] --> B[ProcessChannel1]
    B --> C[ProcessChannel2]
    C --> D[ProcessChannelN...]

以下範例

graph LR
    A[DataChannel] --> B[Add 10]
    B --> C[Add 20]
package main

import (
	"context"
	"fmt"
)

func Generator(ctx context.Context) <-chan int {

	ch := make(chan int, 0)

	go func() {
		defer close(ch)
		i := 0
		for {
			select {
			case <-ctx.Done():
				fmt.Println("finish...")
			case ch <- i:
				i += 1
			}
		}
	}()

	return ch

}

func ADD(ctx context.Context, inputChan <-chan int, AddValue int) <-chan int {

	resChan := make(chan int, 0)

	go func() {
		defer close(resChan)

		for {
			select {
			case <-ctx.Done():
				fmt.Println("finish... Add...")
				return
			case val, ok := <-inputChan:
				if !ok {
					return
				}
				resChan <- AddValue + val
			}
		}
	}()

	return resChan
}

// main wires Generator -> ADD(+10) -> ADD(+20) into a pipeline and
// reads results until the value 35 arrives, then cancels every stage.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	stage1 := Generator(ctx)
	stage2 := ADD(ctx, stage1, 10)
	stage3 := ADD(ctx, stage2, 20)

	for val := range stage3 {
		fmt.Println(val)
		if val == 35 {
			cancel()
			break
		}
	}

	fmt.Println("done")
}
30
31
32
33
34
35
done

fan in

將多個channel的輸出合併成一個channel


graph LR
    A[DataChannel1] --> D[FanInChannel]
    B[DataChannel2] --> D
    C[DataChannelN...] --> D

範例

graph LR
    A["DataChannel1 (0-4) "] --> D["FanInChannel (0,1,2,3,4,100,101,102,103,104)"]
    B["DataChannel2 (100-104)"] --> D
package main

import "sync"

// Generator1 returns a channel yielding 0..4, closed afterwards.
func Generator1() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := 0; n < 5; n++ {
			out <- n
		}
	}()

	return out
}

// Generator2 returns a channel yielding 100..104, closed afterwards.
func Generator2() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := 100; n < 105; n++ {
			out <- n
		}
	}()

	return out
}

func FanIn(wg *sync.WaitGroup, chs ...<-chan int) chan int {

	out := make(chan int, 0)
	go func() {

		defer close(out)

		for _, ch := range chs {
			wg.Add(1)

			go func(ch <-chan int) {
				for val := range ch {
					out <- val
				}
				wg.Done()
			}(ch)

		}

		wg.Wait()

	}()

	return out

}

func main() {

	wg := sync.WaitGroup{}

	ch1 := Generator1()
	ch2 := Generator2()

	out := FanIn(&wg, ch1, ch2)

	for val := range out {
		println(val)
	}

	println("done")
}
100
0
1
2
3
4
101
102
103
104
done

fan out

將一個channel的資料分發到多個channel

graph LR
    A[DataChannel] --> B[FanOutChannel1]
    A --> C[FanOutChannel2]
    A --> D[FanOutChannelN...]

範例

實際上分配不一定會平均, 有一定數量時, 分配會接近

graph LR
    A["DataChannel (0-59)"] --> B["FanOutChannel1 (0,3,6...)"]
    A --> C["FanOutChannel2 (1,4,7...)"]
    A --> D["FanOutChannel3 (2,5,8....)"]
package main

import (
	"fmt"
	"sync"
)

// Generator returns a channel yielding the strings "data: 0" through
// "data: 59", closed once the sequence is exhausted.
func Generator() <-chan string {
	out := make(chan string)

	go func() {
		defer close(out)
		for n := 0; n < 60; n++ {
			out <- fmt.Sprintf("data: %d", n)
		}
	}()

	return out
}

// FanOut returns a channel that relabels every value read from
// SourceChan with the consumer Name. Multiple FanOut channels on the
// same source compete for its values, so each value is delivered to
// exactly one of them.
func FanOut(SourceChan <-chan string, Name string) <-chan string {
	out := make(chan string)

	go func() {
		defer close(out)
		for msg := range SourceChan {
			out <- fmt.Sprintf("%v assgin to %v", msg, Name)
		}
	}()

	return out
}

// main splits one generator stream across three competing FanOut
// consumers, prints every relabelled value, and finally reports how
// many values each consumer handled. The split is not guaranteed to
// be even; with enough values it approaches even.
func main() {

	ch := Generator()
	wp := &sync.WaitGroup{}
	// All three fan-out channels read from the same source, so each
	// generated value goes to exactly one of them.
	splitchan1 := FanOut(ch, "fan1")
	splitchan2 := FanOut(ch, "fan2")
	splitchan3 := FanOut(ch, "fan3")
	// Buffered with capacity 3 so each consumer can deposit its count
	// without blocking before main collects them.
	static_num := make(chan int, 3)

	wp.Add(3)
	go func() {
		count := 0
		for val := range splitchan1 {
			fmt.Println(val)
			count += 1
		}
		// Send the count before Done: close(static_num) below happens
		// only after Wait, so the send is guaranteed to precede it.
		static_num <- count
		wp.Done()

	}()

	go func() {
		count := 0
		for val := range splitchan2 {
			fmt.Println(val)
			count += 1
		}
		static_num <- count
		wp.Done()
	}()

	go func() {
		count := 0
		for val := range splitchan3 {
			fmt.Println(val)
			count += 1
		}
		static_num <- count
		wp.Done()
	}()

	wp.Wait()
	close(static_num)
	fmt.Println("fan size")
	for val := range static_num {
		fmt.Println(val)
	}
}
data: 2 assgin to fan2
data: 3 assgin to fan2
data: 4 assgin to fan2
data: 5 assgin to fan2
data: 6 assgin to fan2
data: 7 assgin to fan2
data: 0 assgin to fan3
data: 1 assgin to fan1
data: 10 assgin to fan1
data: 11 assgin to fan1
data: 14 assgin to fan1
data: 9 assgin to fan3
data: 13 assgin to fan3
data: 16 assgin to fan3
data: 17 assgin to fan3
data: 19 assgin to fan3
data: 15 assgin to fan1
data: 18 assgin to fan1
data: 8 assgin to fan2
data: 12 assgin to fan2
data: 24 assgin to fan2
data: 21 assgin to fan1
data: 22 assgin to fan1
data: 20 assgin to fan3
data: 27 assgin to fan1
data: 23 assgin to fan3
data: 28 assgin to fan1
data: 29 assgin to fan1
data: 30 assgin to fan1
data: 25 assgin to fan2
data: 26 assgin to fan2
data: 33 assgin to fan2
data: 34 assgin to fan2
data: 37 assgin to fan2
data: 31 assgin to fan1
data: 35 assgin to fan1
data: 32 assgin to fan3
data: 36 assgin to fan3
data: 40 assgin to fan3
data: 41 assgin to fan3
data: 44 assgin to fan3
data: 38 assgin to fan2
data: 42 assgin to fan2
data: 39 assgin to fan1
data: 43 assgin to fan1
data: 46 assgin to fan2
data: 47 assgin to fan1
data: 45 assgin to fan3
data: 51 assgin to fan3
data: 52 assgin to fan3
data: 49 assgin to fan1
data: 54 assgin to fan1
data: 53 assgin to fan3
data: 55 assgin to fan3
data: 56 assgin to fan1
data: 57 assgin to fan1
data: 48 assgin to fan2
data: 50 assgin to fan2
data: 59 assgin to fan1
data: 58 assgin to fan3
fan size
19
22
19

fan in out

簡單說就是結合fan in , fan out, 將多個channel的資料匯集進一個channel, 之後再將資料分配至多個channel

    graph LR
    A[DataChannel1] --> D[FanInChannel]
    B[DataChannel2] --> D
    C[DataChannelN...] --> D
    D --> E[FanOutChannel1]
    D --> F[FanOutChannel2]
    D --> G[FanOutChannelN...]

package main

import (
	"fmt"
	"sync"
)

// Generator1 returns a channel yielding 0..19, closed afterwards.
func Generator1() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := 0; n < 20; n++ {
			out <- n
		}
	}()

	return out
}

// Generator2 returns a channel yielding 80..99, closed afterwards.
func Generator2() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := 80; n < 100; n++ {
			out <- n
		}
	}()

	return out
}

// FanIn merges all input int channels into one output channel of
// formatted "data: N" strings. A goroutine per input copies values to
// out; out is closed once every input channel has drained.
func FanIn(chs ...<-chan int) <-chan string {
	out := make(chan string)

	go func() {
		var wg sync.WaitGroup
		for _, c := range chs {
			wg.Add(1)
			go func(c <-chan int) {
				defer wg.Done()
				for v := range c {
					out <- fmt.Sprintf("data: %v", v)
				}
			}(c)
		}
		wg.Wait()
		close(out)
	}()

	return out
}

// FanOut returns a channel of values pulled from SourceChan, each one
// tagged with the consumer Name. Several FanOut channels over the
// same source share (compete for) its values.
func FanOut(SourceChan <-chan string, Name string) <-chan string {
	tagged := make(chan string)

	go func() {
		defer close(tagged)
		for {
			v, open := <-SourceChan
			if !open {
				return
			}
			tagged <- fmt.Sprintf("%v assgin to %v", v, Name)
		}
	}()

	return tagged
}

// main merges two generators with FanIn, fans the merged stream out to
// three competing consumers, prints every value, and finally reports
// how many values each consumer handled.
func main() {

	ch1 := Generator1()
	ch2 := Generator2()
	fanin := FanIn(ch1, ch2)

	wp := &sync.WaitGroup{}
	splitchan1 := FanOut(fanin, "fan1")
	splitchan2 := FanOut(fanin, "fan2")
	splitchan3 := FanOut(fanin, "fan3")
	// Buffered (cap 3) so each consumer can deposit its count without
	// blocking before main collects them.
	static_num := make(chan int, 3)

	wp.Add(3)
	go func() {
		count := 0
		for val := range splitchan1 {
			fmt.Println(val)
			count += 1
		}
		// Bug fix: the count must be sent BEFORE wp.Done(). In the
		// original, Done came first, so main could return from
		// wp.Wait() and close(static_num) while this send was still
		// pending — a "send on closed channel" panic (or a lost count).
		static_num <- count
		wp.Done()
	}()

	go func() {
		count := 0
		for val := range splitchan2 {
			fmt.Println(val)
			count += 1
		}
		static_num <- count
		wp.Done()
	}()

	go func() {
		count := 0
		for val := range splitchan3 {
			fmt.Println(val)
			count += 1
		}
		static_num <- count
		wp.Done()
	}()

	wp.Wait()
	close(static_num)
	fmt.Println("fan size")
	for val := range static_num {
		fmt.Println(val)
	}

}

or channel

會設計多個函數, 每個函數會回傳一個channel, 平時堵塞, 只有該函數完成後, 會輸入訊號至該channel

會監控以上函數的channel, 一旦有一個channel有訊號, 會執行後續的動作 (僅執行一次), 後面監控函數完成後 不會再執行

graph LR
    A[ActivateFunctionAfter1] -.->|send| B[or_channel]
    C[ActivateFunctionAfter2] -->|send| B
    D[ActivateFunctionAfter3] -.->|send| B
    B -->|one of them trigger| F[once任務]

package main

import (
	"fmt"
	"sync"
	"time"
)

// 該函數主要是輸出一個channel, 這邊模擬處理任務處理 完成後 close channel, 使其不阻塞
func ActivateFunctionAfter(duration time.Duration) <-chan struct{} {

	c := make(chan struct{})
	time.AfterFunc(duration, func() {
		fmt.Println("任務完成")
		close(c)
	})
	return c
}

// 用於傳入多個channel, 這邊目的是其中之一完成後(不阻塞), 執行一次特定任務
func or_channel(wp *sync.WaitGroup, chans ...<-chan struct{}) <-chan struct{} {

	ch := make(chan struct{}, 0) // 用於傳出 once任務已經被觸發
	once := new(sync.Once)

	go func() {

		for _, inputChan := range chans {
			wp.Add(1)
			go func(inputChan <-chan struct{}) {

				<-inputChan // 等待 activeFunctionAfter 其中之一被完成

				once.Do(func() { // once 任務 只會被觸發一次

					fmt.Println("執行once任務")
					// once 任務 內容 ..... 這邊只是模擬

					ch <- struct{}{} // once 完成後 傳出
					close(ch)
				})
				wp.Done()
			}(inputChan)
		}
	}()

	return ch
}

// 主要用於多個任務 , 只要其中之一完成後, 執行once任務 (只想被觸發一次,後續其他任務不會再被觸發)
// main launches four simulated tasks (5s, 2s, 3s, 4s). The fastest
// one to finish triggers the once-task; afterwards we wait for the
// remaining watcher goroutines to drain.
func main() {
	var wp sync.WaitGroup

	// Pass the completion channels of the tasks we want to monitor.
	res := or_channel(&wp,
		ActivateFunctionAfter(time.Second*5),
		ActivateFunctionAfter(time.Second*2),
		ActivateFunctionAfter(time.Second*3),
		ActivateFunctionAfter(time.Second*4),
	)

	<-res // blocks until the once-task has run
	wp.Wait()
}
任務完成
執行once任務
任務完成
任務完成
任務完成

or channel done

與 or channel 類似, 監控多個函數的完成channel, 一旦有一個函數完成後, 會執行後續的動作, 但多了可從外部trigger直接執行後續動作


graph LR
    A[ActivateFunctionAfter1] -.->|send| B[or_channel]
    C[ActivateFunctionAfter2] -->|send| B
    D[ActivateFunctionAfter3] -.->|send| B
    E[context] -.->|trigger| B
    B -->|one of them trigger| F[once任務]

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func ActivateFunctionAfter(duration time.Duration) <-chan struct{} {
	c := make(chan struct{})
	time.AfterFunc(duration, func() {
		fmt.Println("任務完成")
		close(c)
	})
	return c
}

func or_channel_done(ctx context.Context, wp *sync.WaitGroup, chans ...<-chan struct{}) <-chan struct{} {
	ch := make(chan struct{})
	once := new(sync.Once)
	go func() {
		for _, inputChan := range chans {
			go func(inputChan <-chan struct{}) {
				wp.Add(1)
				defer wp.Done()
				for {
					select {
					case <-ctx.Done():
						fmt.Println("任務逾時....")
						once.Do(func() {
							fmt.Println("timeout, 執行once任務")
							close(ch)
						})
						return
					case <-inputChan:
						once.Do(func() {
							fmt.Println("執行once任務")
							ch <- struct{}{}
							close(ch)
						})
						return
					}
				}
			}(inputChan)
		}
	}()
	return ch
}

func main() {
	wp := &sync.WaitGroup{}
	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
	ch1 := ActivateFunctionAfter(time.Second * 7)
	ch2 := ActivateFunctionAfter(time.Second * 11)
	ch3 := ActivateFunctionAfter(time.Second * 9)
	res := or_channel_done(ctx, wp, ch1, ch2, ch3)
	<-res
	wp.Wait()
任務逾時....
timeout, 執行once任務
任務逾時....
任務逾時....

tee channel

將一個channel 產生duplicate, 若將資料送入該channel, duplicate的channel 也會收到資料

實際作法為 原始channel會保持被listen , 只要發現有資料送入, 就會在duplicate的channel也送入相同的資料

graph LR
    A[Data] -->|input| B[TeeChannel1]
    B -->|tee| C[TeeChannel2]
    B -->|tee| D[TeeChannel3]
    C -->|pull| E[Data]
    D -->|pull| F[Data]

在原始channel放入1-4, 從duplicate channel 讀出資料


package main

import (
	"fmt"
	"sync"
)

// Generator returns a channel yielding 0..4, closed afterwards.
func Generator() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := 0; n < 5; n++ {
			out <- n
		}
	}()

	return out
}

func tee(input <-chan int, outputs []chan int) {
	go func() {
		wg := &sync.WaitGroup{}

		for data := range input {
			for _, output := range outputs {
				wg.Add(1)
				go func(data int, output chan int) {
					defer wg.Done()
					output <- data
				}(data, output)
			}
			wg.Wait()
		}

		for _, output := range outputs {
			close(output)
		}

	}()
}

// 簡單說就是把一個channel 產生多個duplicate, 與fan out不同的是, fan out 也是一分多 但資料的總數不會變多 (一包餅乾變成兩個人在吃)
// tee則是真的一包餅乾變兩包
// main duplicates one generator stream into two tee outputs and reads
// both concurrently, tagging each printed value with its channel
// index.
func main() {
	var wp sync.WaitGroup
	src := Generator()

	// Initialize the duplicate output channels.
	outputs := make([]chan int, 2)
	for i := range outputs {
		outputs[i] = make(chan int)
	}

	tee(src, outputs)

	for index, output := range outputs {
		wp.Add(1)
		go func(index int, c chan int) {
			defer wp.Done()
			for val := range c {
				fmt.Printf("%v chan: %v\n", index, val)
			}
		}(index, output)
	}
	wp.Wait()
}
1 chan: 0
1 chan: 1
0 chan: 0
1 chan: 2
0 chan: 1
1 chan: 3
0 chan: 2
1 chan: 4
0 chan: 3
0 chan: 4

bridge channel

將多個channel的資料合併成一個channel輸出, 可保證順序性

graph LR
    A[DataChannel1] -->|1,2,3| B[BridgeChannel]
    C[DataChannel2] -->|11,12,13| B
    D[DataChannelN...] -->|101,102,103| B
    B -->|poll| E["[1,2,3] [101,102,103] [11,12,13]"]
package main

import (
	"context"
	"fmt"
)

// Generator returns a channel yielding the integers in [start, end),
// closed once the range is exhausted.
func Generator(start int, end int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for n := start; n < end; n++ {
			out <- n
		}
	}()

	return out
}

// StreamGenerator wraps a fixed set of channels into a channel of
// channels, emitting each input channel in argument order and then
// closing.
func StreamGenerator(chs ...<-chan int) <-chan (<-chan int) {
	stream := make(chan (<-chan int))

	go func() {
		defer close(stream)
		for _, c := range chs {
			stream <- c
		}
	}()

	return stream
}

func Bridge(ctx context.Context, ch_ch <-chan <-chan int) <-chan int {
	res := make(chan int)

	go func() {
		defer close(res)
		for {
			select {
			case <-ctx.Done():
				return
			case ch, ok := <-ch_ch:
				if !ok {
					return
				}
				for val := range ch {
					res <- val
				}
			}
		}
	}()

	return res
}

func main() {

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch1 := Generator(100, 105)
	ch2 := Generator(6, 10)
	ch3 := Generator(500, 507)

	ch_ch := StreamGenerator(ch1, ch2, ch3)

	for val := range Bridge(ctx, ch_ch) {
		fmt.Println(val)
	}

}
100
101
102
103
104
6
7
8
9
500
501
502
503
504
505
506

worker pool

建立一個固定數量的worker goroutine, 用於處理任務, 通常會與cpu數量相同 (可同時處理的任務數量)
基本上就是 producer 與 consumer概念, producer 會將任務放入channel, consumer 會從channel取出任務處理

graph LR
    A[Producer] -->|task| B[TaskParmChannel]
    B -->|taskParm1| C[Worker1]
    B -->|taskParm2| D[Worker2]
    B -->|taskParm3| E[Worker3]
    B -->|taskParmN..| F[WorkerN...]
    C -->|result| G[OutputChannel]
    D -->|result| G
    E -->|result| G
    F -->|result| G

範例
這邊假設有五十組任務, 啟動16個worker, 每個任務需花費5秒, 這邊會等待所有任務完成後, 並計算總耗時

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// MyTask simulates a five-second unit of work: it sleeps, logs
// progress, and returns a completion message for the worker/task
// pair.
func MyTask(WorkerID int, para int) string {
	time.Sleep(time.Second * 5)
	fmt.Println("WorkerID:", WorkerID, "task_id:", para, "執行中")
	return fmt.Sprintf("worker %v 已完成task %v ########", WorkerID, para)
}

// Worker pulls task parameters from inputParam, runs MyTask for each,
// and pushes results to output until the input closes or ctx expires.
//
// The first worker to return closes the shared output channel via
// `once`, sleeping 6 seconds first so that sibling workers still
// inside a ~5-second MyTask call can complete their sends before the
// close. NOTE(review): this sleep-based handoff is fragile — if any
// task ever takes longer than 6s, a sibling would send on a closed
// channel and panic; confirm the timing assumption before reusing.
func Worker(ctx context.Context, once *sync.Once, workerID int, inputParam <-chan int, output chan string) {

	defer once.Do(func() {
		fmt.Println("Worker:", workerID, "等待最後一組結束")
		time.Sleep(time.Second * 6)
		close(output)
	})

	for {
		select {
		case <-ctx.Done():
			fmt.Println("worker已逾時.........close worker", workerID)
			return
		case p, ok := <-inputParam:
			// Stop when the parameter stream is closed, or if the
			// context was cancelled while we were receiving.
			if !ok || ctx.Err() != nil {
				return
			}
			res := MyTask(workerID, p)
			output <- res
		}
	}
}

// GenerateTaskParm produces the 50 task parameters (0..49), pacing
// them 100ms apart, and closes the channel when done.
func GenerateTaskParm() chan int {
	params := make(chan int)

	go func() {
		defer close(params)
		for n := 0; n < 50; n++ {
			params <- n
			time.Sleep(time.Millisecond * 100)
		}
	}()

	return params
}

// GenerateWorkerID yields the worker IDs 0..CPUNum-1 on a channel
// that is closed after the last ID has been consumed.
func GenerateWorkerID(CPUNum int) chan int {
	ids := make(chan int)

	go func() {
		defer close(ids)
		for id := 0; id < CPUNum; id++ {
			ids <- id
		}
	}()

	return ids
}

// main runs 50 five-second tasks on a pool of 16 workers, waits for
// everything to finish, and reports the completed count and total
// elapsed time.
func main() {

	now := time.Now()

	CPUNum := 16
	// Channel that hands a unique ID to each worker goroutine.
	workerID := GenerateWorkerID(CPUNum)

	// Task parameters for the workers to consume.
	para := GenerateTaskParm()

	// Results of completed tasks.
	output := make(chan string)

	wg := &sync.WaitGroup{}
	wg.Add(CPUNum)
	// Bug fix: keep and defer the cancel func instead of discarding it
	// (a lost cancel leaks the context's timer and fails go vet).
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*100)
	defer cancel()
	once := new(sync.Once)
	for i := 0; i < CPUNum; i++ {
		go func() {
			defer wg.Done()
			id := <-workerID
			Worker(ctx, once, id, para, output)
		}()
	}

	// Bug fix: the original read `count` in main while the printer
	// goroutine could still be incrementing it — a data race with no
	// happens-before edge. The printer now owns `count` exclusively
	// and main only reads it after `done` is closed.
	count := 0
	done := make(chan struct{})
	go func() {
		defer close(done)
		for val := range output {
			fmt.Println(val)
			count += 1
		}
	}()

	wg.Wait()
	<-done
	cost := time.Since(now)
	fmt.Println("總計完成數量:", count)
	fmt.Println("總計耗時:", cost)

}
WorkerID: 0 task_id: 0 執行中
worker 0 已完成task 0 ########
WorkerID: 15 task_id: 1 執行中
worker 15 已完成task 1 ########
WorkerID: 1 task_id: 2 執行中
worker 1 已完成task 2 ########
WorkerID: 2 task_id: 3 執行中
worker 2 已完成task 3 ########
WorkerID: 3 task_id: 4 執行中
worker 3 已完成task 4 ########
WorkerID: 4 task_id: 5 執行中
worker 4 已完成task 5 ########
WorkerID: 10 task_id: 6 執行中
worker 10 已完成task 6 ########
WorkerID: 11 task_id: 7 執行中
worker 11 已完成task 7 ########
WorkerID: 12 task_id: 8 執行中
worker 12 已完成task 8 ########
WorkerID: 13 task_id: 9 執行中
worker 13 已完成task 9 ########
WorkerID: 7 task_id: 10 執行中
worker 7 已完成task 10 ########
WorkerID: 9 task_id: 11 執行中
worker 9 已完成task 11 ########
WorkerID: 6 task_id: 12 執行中
worker 6 已完成task 12 ########
WorkerID: 8 task_id: 13 執行中
worker 8 已完成task 13 ########
WorkerID: 5 task_id: 14 執行中
worker 5 已完成task 14 ########
WorkerID: 14 task_id: 15 執行中
worker 14 已完成task 15 ########
WorkerID: 0 task_id: 16 執行中
worker 0 已完成task 16 ########
WorkerID: 15 task_id: 17 執行中
worker 15 已完成task 17 ########
WorkerID: 1 task_id: 18 執行中
worker 1 已完成task 18 ########
WorkerID: 2 task_id: 19 執行中
worker 2 已完成task 19 ########
WorkerID: 3 task_id: 20 執行中
worker 3 已完成task 20 ########
WorkerID: 4 task_id: 21 執行中
worker 4 已完成task 21 ########
WorkerID: 10 task_id: 22 執行中
worker 10 已完成task 22 ########
WorkerID: 11 task_id: 23 執行中
worker 11 已完成task 23 ########
WorkerID: 12 task_id: 24 執行中
worker 12 已完成task 24 ########
WorkerID: 13 task_id: 25 執行中
worker 13 已完成task 25 ########
WorkerID: 7 task_id: 26 執行中
worker 7 已完成task 26 ########
WorkerID: 9 task_id: 27 執行中
worker 9 已完成task 27 ########
WorkerID: 6 task_id: 28 執行中
worker 6 已完成task 28 ########
WorkerID: 8 task_id: 29 執行中
worker 8 已完成task 29 ########
WorkerID: 5 task_id: 30 執行中
worker 5 已完成task 30 ########
WorkerID: 14 task_id: 31 執行中
worker 14 已完成task 31 ########
WorkerID: 0 task_id: 32 執行中
worker 0 已完成task 32 ########
WorkerID: 15 task_id: 33 執行中
worker 15 已完成task 33 ########
WorkerID: 1 task_id: 34 執行中
worker 1 已完成task 34 ########
Worker: 1 等待最後一組結束
WorkerID: 2 task_id: 35 執行中
worker 2 已完成task 35 ########
WorkerID: 3 task_id: 36 執行中
worker 3 已完成task 36 ########
WorkerID: 4 task_id: 37 執行中
worker 4 已完成task 37 ########
WorkerID: 10 task_id: 38 執行中
worker 10 已完成task 38 ########
WorkerID: 11 task_id: 39 執行中
worker 11 已完成task 39 ########
WorkerID: 12 task_id: 40 執行中
worker 12 已完成task 40 ########
WorkerID: 13 task_id: 41 執行中
worker 13 已完成task 41 ########
WorkerID: 7 task_id: 42 執行中
worker 7 已完成task 42 ########
WorkerID: 9 task_id: 43 執行中
worker 9 已完成task 43 ########
WorkerID: 6 task_id: 44 執行中
worker 6 已完成task 44 ########
WorkerID: 8 task_id: 45 執行中
worker 8 已完成task 45 ########
WorkerID: 5 task_id: 46 執行中
worker 5 已完成task 46 ########
WorkerID: 14 task_id: 47 執行中
worker 14 已完成task 47 ########
WorkerID: 0 task_id: 48 執行中
worker 0 已完成task 48 ########
WorkerID: 15 task_id: 49 執行中
worker 15 已完成task 49 ########
總計完成數量: 50
總計耗時: 21.207082327s