From ab7d8b1fec28329135a7401c338cb90b3f5b92ee Mon Sep 17 00:00:00 2001 From: cclehui <763414242@qq.com> Date: Mon, 15 Apr 2019 20:48:10 +0800 Subject: [PATCH 1/5] task queue 01 --- .gitignore | 3 + demo/temp.go | 76 +++++++++++++++++++ error.go | 19 +++++ log.go | 12 +++ task_queue.go | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 307 insertions(+) create mode 100644 .gitignore create mode 100644 demo/temp.go create mode 100644 error.go create mode 100644 log.go create mode 100644 task_queue.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ab8535e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.swp +*.swn +*.swo diff --git a/demo/temp.go b/demo/temp.go new file mode 100644 index 0000000..d1597e4 --- /dev/null +++ b/demo/temp.go @@ -0,0 +1,76 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/cclehui/nasync" +) + +func queueLog(level int, logStr string) { + if level > nasync.LEVEL_DEBUG { + fmt.Println(logStr) + } +} + +func main() { + handler := func(msg string) string { + time.Sleep(time.Second * 2) + fmt.Println(msg) + return "pong" + } + + taskQueue := nasync.NewTaskQueue(3, 3) + taskQueue.SetLogFunc(queueLog) + + //taskQueue.SetSendBlock() + taskQueue.SetSendBlockWithTimeout(time.Second * 1) + + go func() { + time.Sleep(time.Second * 2) + + taskQueue.Close() + + }() + + for j := 1; j <= 15; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("ttttttttt, send j:%d", j)) + + fmt.Printf("send response %v\n", err) + + } + //taskQueue.Close() + + time.Sleep(time.Second * 5000) + + for j := 1; j < 10; j++ { + + taskQueue.Send(handler, fmt.Sprintf("xxxxxxxxxxx, send j:%d", j)) + + } + + time.Sleep(time.Second * 500000) + + os.Exit(0) + + //do async max 1000 tasks in max 10 go goroutine + //as := New(1000, 100) + as := nasync.New(2, 2) + defer as.Close() + + i := 1 + + for { + for i = 1; i <= 10; i++ { + + as.Do(handler, fmt.Sprintf("recieve, %d", i)) + + fmt.Printf("send task , %s, %d\n", time.Now().Format("2006-01-02 15:04:05"), i) + + } + time.Sleep(time.Second * 10) + } + +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..8f9f4b2 --- /dev/null +++ b/error.go @@ -0,0 +1,19 @@ +package nasync + +const ( + ERROR_UNKNOWN = iota + ERROR_TIMEOUT + ERROR_QUEUE_FULL + ERROR_QUEUE_CLOSED +) + +type Error struct { + Code int + + Msg string +} + +func (e *Error) Error() string { + + return e.Msg +} diff --git a/log.go b/log.go new file mode 100644 index 0000000..5627770 --- /dev/null +++ b/log.go @@ -0,0 +1,12 @@ +package nasync + +const ( + LEVEL_UNKNOWN = iota + LEVEL_DEBUG + LEVEL_INFO + LEVEL_WARN + LEVEL_ERROR + LEVEL_FATAL +) + +type LogFunc func(level int, logStr string) diff --git a/task_queue.go b/task_queue.go new file mode 100644 index 0000000..c88126e --- /dev/null +++ b/task_queue.go @@ -0,0 +1,197 @@ +package nasync + +import ( + "bytes" + "context" + "fmt" + "runtime" + "strconv" + "sync" + "time" +) + +type TaskQueue struct { + taskChan chan *task // buffered queue used in non-runtime tasks + taskChanSize int64 // queue buffer size + chanAutoResize bool + + goroutineCount int //task worker goroutine number + goroutineAutoResize bool //协程个数自动扩容 + goroutineContext context.Context + closeFunc context.CancelFunc + + lostMessageCount int //丢失的消息个数 + + sendTaskBlock bool //task send 是否阻塞 + sendTaskTimeout time.Duration //阻塞send 超时的时间 + + syncMutex *sync.Mutex + + logFunc LogFunc + closed bool +} + +func NewTaskQueue(queueSize int64, goroutineCount int) *TaskQueue { + + if queueSize < 1 { + panic("queueSize must greater than 0") + } + + if goroutineCount < 1 { + panic("goroutineCount must greater than 0") + } + + taskQueue := &TaskQueue{} + + taskQueue.taskChanSize = queueSize + taskQueue.taskChan = make(chan *task, queueSize) + + taskQueue.goroutineCount = goroutineCount + taskQueue.goroutineContext, taskQueue.closeFunc = context.WithCancel(context.Background()) + + taskQueue.sendTaskBlock = false //默认非阻塞 + taskQueue.sendTaskTimeout = 0 //默认阻塞情况下不超时 + + taskQueue.syncMutex = &sync.Mutex{} + + //启动协程 异步处理task + taskQueue.initHandler() + + return taskQueue +} + +//非阻塞发送 +func (tq *TaskQueue) SetSendUnBlock() { + tq.sendTaskBlock = false +} + +//阻塞发送 无限超时 +func (tq *TaskQueue) SetSendBlock() { + tq.sendTaskBlock = true + tq.sendTaskTimeout = 0 +} + +//阻塞发送 超时限制 +func (tq *TaskQueue) SetSendBlockWithTimeout(timeout time.Duration) { + if timeout < 0 { + timeout = 0 + + } + + tq.sendTaskBlock = true + tq.sendTaskTimeout = timeout +} + +//send task +func (tq *TaskQueue) Send(handler interface{}, params ...interface{}) (resultStatus bool, resultError *Error) { + + defer func() { + if err := recover(); err != nil { + resultStatus = false + msg := fmt.Sprintf("%s", err) + resultError = &Error{Code: ERROR_UNKNOWN, Msg: msg} + } + + }() + + if tq.closed { + return false, &Error{Code: ERROR_QUEUE_CLOSED, Msg: "queue is closed"} + } + + task := newTask(handler, params...) + + if tq.sendTaskBlock { + if tq.sendTaskTimeout > 0 { //超时控制 + select { + case tq.taskChan <- task: + tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) + case <-time.After(tq.sendTaskTimeout): + tq.log(LEVEL_INFO, fmt.Sprintf("task abandoned for timeout %d, %v", tq.sendTaskTimeout, task)) + return false, &Error{Code: ERROR_TIMEOUT, Msg: "task abandoned for timeout"} + } + + } else { + tq.taskChan <- task //一直阻塞直到成功 + tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) + } + + } else { //非阻塞send task + select { + case tq.taskChan <- task: + tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) + default: + tq.log(LEVEL_INFO, fmt.Sprintf("task abandoned for queue full %d, %v", tq.taskChanSize, task)) + return false, &Error{Code: ERROR_QUEUE_FULL, Msg: "task abandoned for queue full"} + } + } + + return true, nil +} + +//处理task +func (tq *TaskQueue) initHandler() { + + for i := 0; i < tq.goroutineCount; i++ { + tq.AddHandler() + } +} + +//增加协程 +func (tq *TaskQueue) AddHandler() { + tq.syncMutex.Lock() + + defer tq.syncMutex.Unlock() + + go func() { + gid := getGID() + tq.log(LEVEL_INFO, fmt.Sprintf("add handler goroutine %d", gid)) + + for { + + select { + case task := <-tq.taskChan: + task.Do() + tq.log(LEVEL_DEBUG, fmt.Sprintf("gid:%d, handle task %v", gid, task)) + case <-tq.goroutineContext.Done(): + tq.log(LEVEL_INFO, fmt.Sprintf("close handler goroutine %d", gid)) + return + } + } + + }() +} + +//关闭队列 +func (tq *TaskQueue) Close() { + tq.syncMutex.Lock() + + defer tq.syncMutex.Unlock() + if !tq.closed { + close(tq.taskChan) + + tq.closeFunc() + + tq.closed = true + } +} + +func (tq *TaskQueue) SetLogFunc(logFunc LogFunc) { + + tq.logFunc = logFunc + +} + +func (tq *TaskQueue) log(level int, logStr string) { + if tq.logFunc != nil { + tq.logFunc(level, logStr) + } +} + +func getGID() uint64 { + b := make([]byte, 64) + b = b[:runtime.Stack(b, false)] + b = bytes.TrimPrefix(b, []byte("goroutine ")) + b = b[:bytes.IndexByte(b, ' ')] + n, _ := strconv.ParseUint(string(b), 10, 64) + return n +} From c15ab43edf8546ac986e53b2e1109ed1274bc425 Mon Sep 17 00:00:00 2001 From: cclehui <763414242@qq.com> Date: Tue, 16 Apr 2019 16:59:47 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=AE=8C=E5=96=84=20=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=9C=BA=E5=88=B6=E5=92=8C=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- async_test.go | 5 +- demo/temp.go | 76 ------------------------ task_queue.go | 122 +++++++++++++++++++++++++++------------ task_queue_test.go | 140 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 226 insertions(+), 117 deletions(-) delete mode 100644 demo/temp.go create mode 100644 task_queue_test.go diff --git a/async_test.go b/async_test.go index 8948e46..67c6f61 100644 --- a/async_test.go +++ b/async_test.go @@ -3,12 +3,11 @@ package nasync import ( "fmt" "testing" - "time" ) func TestAsyncTask(t *testing.T) { Do(ping) - time.Sleep(5 * time.Second) + //time.Sleep(5 * time.Second) } func TestAsyncAdvanced(t *testing.T) { @@ -21,7 +20,7 @@ func TestAsyncAdvanced(t *testing.T) { return "pong" } as.Do(handler, "ping") - time.Sleep(5 * time.Second) + //time.Sleep(5 * time.Second) } func ping() string { diff --git a/demo/temp.go b/demo/temp.go deleted file mode 100644 index d1597e4..0000000 --- a/demo/temp.go +++ /dev/null @@ -1,76 +0,0 @@ -package main - -import ( - "fmt" - "os" - "time" - - "github.com/cclehui/nasync" -) - -func queueLog(level int, logStr string) { - if level > nasync.LEVEL_DEBUG { - fmt.Println(logStr) - } -} - -func main() { - handler := func(msg string) string { - time.Sleep(time.Second * 2) - fmt.Println(msg) - return "pong" - } - - taskQueue := nasync.NewTaskQueue(3, 3) - taskQueue.SetLogFunc(queueLog) - - //taskQueue.SetSendBlock() - taskQueue.SetSendBlockWithTimeout(time.Second * 1) - - go func() { - time.Sleep(time.Second * 2) - - taskQueue.Close() - - }() - - for j := 1; j <= 15; j++ { - - _, err := taskQueue.Send(handler, fmt.Sprintf("ttttttttt, send j:%d", j)) - - fmt.Printf("send response %v\n", err) - - } - //taskQueue.Close() - - time.Sleep(time.Second * 5000) - - for j := 1; j < 10; j++ { - - taskQueue.Send(handler, fmt.Sprintf("xxxxxxxxxxx, send j:%d", j)) - - } - - time.Sleep(time.Second * 500000) - - os.Exit(0) - - //do async max 1000 tasks in max 10 go goroutine - //as := New(1000, 100) - as := nasync.New(2, 2) - defer as.Close() - - i := 1 - - for { - for i = 1; i <= 10; i++ { - - as.Do(handler, fmt.Sprintf("recieve, %d", i)) - - fmt.Printf("send task , %s, %d\n", time.Now().Format("2006-01-02 15:04:05"), i) - - } - time.Sleep(time.Second * 10) - } - -} diff --git a/task_queue.go b/task_queue.go index c88126e..becf275 100644 --- a/task_queue.go +++ b/task_queue.go @@ -15,15 +15,12 @@ type TaskQueue struct { taskChanSize int64 // queue buffer size chanAutoResize bool - goroutineCount int //task worker goroutine number - goroutineAutoResize bool //协程个数自动扩容 - goroutineContext context.Context - closeFunc context.CancelFunc + goroutineCount int //task worker goroutine number + goroutineContext context.Context + closeFunc context.CancelFunc - lostMessageCount int //丢失的消息个数 - - sendTaskBlock bool //task send 是否阻塞 - sendTaskTimeout time.Duration //阻塞send 超时的时间 + sendTaskBlock bool // + sendTaskTimeout time.Duration //block send timeout syncMutex *sync.Mutex @@ -31,14 +28,39 @@ type TaskQueue struct { closed bool } -func NewTaskQueue(queueSize int64, goroutineCount int) *TaskQueue { +func NewUnBlockQueue(queueSize int64, handlerNum int) *TaskQueue { + + queue := newTaskQueue(queueSize, handlerNum) + + return queue +} + +func NewBlockQueue(queueSize int64, handlerNum int) *TaskQueue { + + queue := newTaskQueue(queueSize, handlerNum) + + queue.setSendBlock() + + return queue +} + +func NewBlockTimeoutQueue(queueSize int64, handlerNum int, timeout time.Duration) *TaskQueue { + + queue := newTaskQueue(queueSize, handlerNum) + + queue.setSendBlockWithTimeout(timeout) + + return queue +} + +func newTaskQueue(queueSize int64, handlerNum int) *TaskQueue { if queueSize < 1 { panic("queueSize must greater than 0") } - if goroutineCount < 1 { - panic("goroutineCount must greater than 0") + if handlerNum < 1 { + panic("handlerNum must greater than 0") } taskQueue := &TaskQueue{} @@ -46,36 +68,34 @@ func NewTaskQueue(queueSize int64, goroutineCount int) *TaskQueue { taskQueue.taskChanSize = queueSize taskQueue.taskChan = make(chan *task, queueSize) - taskQueue.goroutineCount = goroutineCount taskQueue.goroutineContext, taskQueue.closeFunc = context.WithCancel(context.Background()) - taskQueue.sendTaskBlock = false //默认非阻塞 - taskQueue.sendTaskTimeout = 0 //默认阻塞情况下不超时 + taskQueue.sendTaskBlock = false //default not block + taskQueue.sendTaskTimeout = 0 taskQueue.syncMutex = &sync.Mutex{} - //启动协程 异步处理task - taskQueue.initHandler() + //init handler goroutine + taskQueue.initHandler(handlerNum) return taskQueue } -//非阻塞发送 -func (tq *TaskQueue) SetSendUnBlock() { +//unblock +func (tq *TaskQueue) setSendUnBlock() { tq.sendTaskBlock = false } -//阻塞发送 无限超时 -func (tq *TaskQueue) SetSendBlock() { +//block send until succeed +func (tq *TaskQueue) setSendBlock() { tq.sendTaskBlock = true tq.sendTaskTimeout = 0 } -//阻塞发送 超时限制 -func (tq *TaskQueue) SetSendBlockWithTimeout(timeout time.Duration) { +//block send until timeout +func (tq *TaskQueue) setSendBlockWithTimeout(timeout time.Duration) { if timeout < 0 { timeout = 0 - } tq.sendTaskBlock = true @@ -101,26 +121,26 @@ func (tq *TaskQueue) Send(handler interface{}, params ...interface{}) (resultSta task := newTask(handler, params...) if tq.sendTaskBlock { - if tq.sendTaskTimeout > 0 { //超时控制 + if tq.sendTaskTimeout > 0 { //timeout select { case tq.taskChan <- task: tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) case <-time.After(tq.sendTaskTimeout): - tq.log(LEVEL_INFO, fmt.Sprintf("task abandoned for timeout %d, %v", tq.sendTaskTimeout, task)) + tq.log(LEVEL_WARN, fmt.Sprintf("task abandoned for timeout %d", tq.sendTaskTimeout)) return false, &Error{Code: ERROR_TIMEOUT, Msg: "task abandoned for timeout"} } } else { - tq.taskChan <- task //一直阻塞直到成功 + tq.taskChan <- task //block until success tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) } - } else { //非阻塞send task + } else { //unblock send task select { case tq.taskChan <- task: tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) default: - tq.log(LEVEL_INFO, fmt.Sprintf("task abandoned for queue full %d, %v", tq.taskChanSize, task)) + tq.log(LEVEL_WARN, fmt.Sprintf("task abandoned for queue full, chanSize:%d", tq.taskChanSize)) return false, &Error{Code: ERROR_QUEUE_FULL, Msg: "task abandoned for queue full"} } } @@ -128,16 +148,19 @@ func (tq *TaskQueue) Send(handler interface{}, params ...interface{}) (resultSta return true, nil } -//处理task -func (tq *TaskQueue) initHandler() { +func (tq *TaskQueue) initHandler(handlerNum int) { - for i := 0; i < tq.goroutineCount; i++ { + for i := 0; i < handlerNum; i++ { tq.AddHandler() } } -//增加协程 -func (tq *TaskQueue) AddHandler() { +//add handler goroutine +func (tq *TaskQueue) AddHandler() bool { + if tq.closed { + return false + } + tq.syncMutex.Lock() defer tq.syncMutex.Unlock() @@ -150,8 +173,20 @@ func (tq *TaskQueue) AddHandler() { select { case task := <-tq.taskChan: - task.Do() - tq.log(LEVEL_DEBUG, fmt.Sprintf("gid:%d, handle task %v", gid, task)) + if task != nil { + func() { + defer func() { + if err := recover(); err != nil { + tq.log(LEVEL_WARN, fmt.Sprintf("gid:%d, handle task error %v", gid, err)) + } + + }() + task.Do() + tq.log(LEVEL_DEBUG, fmt.Sprintf("gid:%d, handle task %v", gid, task)) + + }() + } + case <-tq.goroutineContext.Done(): tq.log(LEVEL_INFO, fmt.Sprintf("close handler goroutine %d", gid)) return @@ -159,19 +194,30 @@ func (tq *TaskQueue) AddHandler() { } }() + + tq.goroutineCount++ + + return true } //关闭队列 func (tq *TaskQueue) Close() { tq.syncMutex.Lock() - defer tq.syncMutex.Unlock() - if !tq.closed { - close(tq.taskChan) + defer func() { + if err := recover(); err != nil { + tq.log(LEVEL_WARN, fmt.Sprintf("close error %+v", err)) + } tq.closeFunc() tq.closed = true + + tq.syncMutex.Unlock() + }() + + if !tq.closed { + close(tq.taskChan) } } diff --git a/task_queue_test.go b/task_queue_test.go new file mode 100644 index 0000000..1b41e44 --- /dev/null +++ b/task_queue_test.go @@ -0,0 +1,140 @@ +package nasync + +import ( + "fmt" + "testing" + "time" +) + +func queueLog(level int, logStr string) { + if level > LEVEL_INFO { + //fmt.Println(logStr) + } +} + +var handler = func(msg string) string { + time.Sleep(time.Second * 2) + fmt.Println(msg) + panic("xxxxxxxxxxxxxx") + return "pong" +} + +func TestNewUnBlockQueue(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewUnBlockQueue(int64(chanSize), goroutineCount) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "TestNewUnBlockQueue" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + for i := 0; i <= 1; i++ { + min := i*10 + 1 + max := min + 6 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task TestNewUnBlockQueue :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } + +} + +func TestNewBlockQueue(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewBlockQueue(int64(chanSize), goroutineCount) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "TestNewBlockQueue" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + for i := 0; i <= 0; i++ { + min := i*10 + 1 + max := min + 6 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task NewBlockQueue :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } +} + +func TestNewBlockTimeoutQueue(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewBlockTimeoutQueue(int64(chanSize), goroutineCount, time.Second*1) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "NewBlockTimeoutQueue" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + for i := 0; i <= 0; i++ { + min := i*10 + 1 + max := min + 6 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task NewBlockTimeoutQueue :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } +} + +func TestClose(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewBlockQueue(int64(chanSize), goroutineCount) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "TestClose" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + go func() { + + time.Sleep(time.Second * 3) + taskQueue.Close() + + }() + + for i := 0; i <= 0; i++ { + min := i*10 + 1 + max := min + 10 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task TestClose :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } +} From 2607229330e303055ef87cd99b1fae3a50e5909b Mon Sep 17 00:00:00 2001 From: cclehui <763414242@qq.com> Date: Tue, 16 Apr 2019 17:01:04 +0800 Subject: [PATCH 3/5] 03 --- async_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/async_test.go b/async_test.go index 67c6f61..8948e46 100644 --- a/async_test.go +++ b/async_test.go @@ -3,11 +3,12 @@ package nasync import ( "fmt" "testing" + "time" ) func TestAsyncTask(t *testing.T) { Do(ping) - //time.Sleep(5 * time.Second) + time.Sleep(5 * time.Second) } func TestAsyncAdvanced(t *testing.T) { @@ -20,7 +21,7 @@ func TestAsyncAdvanced(t *testing.T) { return "pong" } as.Do(handler, "ping") - //time.Sleep(5 * time.Second) + time.Sleep(5 * time.Second) } func ping() string { From 5042ef35cd17251a83caea86e12f9e11d89f2ed4 Mon Sep 17 00:00:00 2001 From: cclehui <763414242@qq.com> Date: Tue, 16 Apr 2019 17:10:13 +0800 Subject: [PATCH 4/5] add read me --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ task_queue_test.go | 2 -- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 59c1a06..82f5685 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,47 @@ func doSometing(msg string) string{ } +``` + +## Async queue + +```go +import "github.com/ti/nasync" + +func main() { + //new a async pool in max 1000 task in max 1000 gorutines + async := nasync.New(1000,1000) + defer async.Close() + async.Do(doSometing,"hello word") + + //new a task queue with chan size 1000 in max 100 gorutines + taskQueue := NewUnBlockQueue(1000, 100) + //taskQueue := NewBlockTimeoutQueue(1000, 100, time.Second*1) + //taskQueue := nasync.NewBlockQueue(1000, 100) + + go func() { + time.Sleep(time.Second * 3) + taskQueue.Close() + }() + + for j := 1; j <= 20; j++ { + + _, err := taskQueue.Send(doSometing, fmt.Sprintf("handle, task :%d", j)) + + if err != nil { + fmt.Printf(" send task fail: %v\n", err) + } else { + fmt.Printf(" send task success: %d\n", j) + } + } +} + + +func doSometing(msg string) string{ + return "i am done by " + msg +} + + ``` # WHY diff --git a/task_queue_test.go b/task_queue_test.go index 1b41e44..b6a331d 100644 --- a/task_queue_test.go +++ b/task_queue_test.go @@ -115,10 +115,8 @@ func TestClose(t *testing.T) { fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) go func() { - time.Sleep(time.Second * 3) taskQueue.Close() - }() for i := 0; i <= 0; i++ { From 8438928be89db4d8417e29829a7d1fe35ef36dfd Mon Sep 17 00:00:00 2001 From: cclehui <763414242@qq.com> Date: Tue, 16 Apr 2019 17:11:52 +0800 Subject: [PATCH 5/5] youhua readme --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 82f5685..eb9a250 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ func main() { async.Do(doSometing,"hello word") //new a task queue with chan size 1000 in max 100 gorutines - taskQueue := NewUnBlockQueue(1000, 100) + taskQueue := NewUnBlockQueue(1000, 100) //taskQueue := NewBlockTimeoutQueue(1000, 100, time.Second*1) //taskQueue := nasync.NewBlockQueue(1000, 100) @@ -77,16 +77,16 @@ func main() { taskQueue.Close() }() - for j := 1; j <= 20; j++ { + for j := 1; j <= 20; j++ { - _, err := taskQueue.Send(doSometing, fmt.Sprintf("handle, task :%d", j)) + _, err := taskQueue.Send(doSometing, fmt.Sprintf("handle, task :%d", j)) - if err != nil { - fmt.Printf(" send task fail: %v\n", err) - } else { - fmt.Printf(" send task success: %d\n", j) - } - } + if err != nil { + fmt.Printf(" send task fail: %v\n", err) + } else { + fmt.Printf(" send task success: %d\n", j) + } + } }