diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ab8535e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.swp +*.swn +*.swo diff --git a/README.md b/README.md index 59c1a06..eb9a250 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,47 @@ func doSometing(msg string) string{ } +``` + +## Async queue + +```go +import "github.com/ti/nasync" + +func main() { + //new a async pool in max 1000 task in max 1000 gorutines + async := nasync.New(1000,1000) + defer async.Close() + async.Do(doSometing,"hello word") + + //new a task queue with chan size 1000 in max 100 gorutines + taskQueue := NewUnBlockQueue(1000, 100) + //taskQueue := NewBlockTimeoutQueue(1000, 100, time.Second*1) + //taskQueue := nasync.NewBlockQueue(1000, 100) + + go func() { + time.Sleep(time.Second * 3) + taskQueue.Close() + }() + + for j := 1; j <= 20; j++ { + + _, err := taskQueue.Send(doSometing, fmt.Sprintf("handle, task :%d", j)) + + if err != nil { + fmt.Printf(" send task fail: %v\n", err) + } else { + fmt.Printf(" send task success: %d\n", j) + } + } +} + + +func doSometing(msg string) string{ + return "i am done by " + msg +} + + ``` # WHY diff --git a/error.go b/error.go new file mode 100644 index 0000000..8f9f4b2 --- /dev/null +++ b/error.go @@ -0,0 +1,19 @@ +package nasync + +const ( + ERROR_UNKNOWN = iota + ERROR_TIMEOUT + ERROR_QUEUE_FULL + ERROR_QUEUE_CLOSED +) + +type Error struct { + Code int + + Msg string +} + +func (e *Error) Error() string { + + return e.Msg +} diff --git a/log.go b/log.go new file mode 100644 index 0000000..5627770 --- /dev/null +++ b/log.go @@ -0,0 +1,12 @@ +package nasync + +const ( + LEVEL_UNKNOWN = iota + LEVEL_DEBUG + LEVEL_INFO + LEVEL_WARN + LEVEL_ERROR + LEVEL_FATAL +) + +type LogFunc func(level int, logStr string) diff --git a/task_queue.go b/task_queue.go new file mode 100644 index 0000000..becf275 --- /dev/null +++ b/task_queue.go @@ -0,0 +1,243 @@ +package nasync + +import ( + "bytes" + "context" + "fmt" + "runtime" + "strconv" + "sync" + "time" +) + +type TaskQueue struct { + taskChan chan *task // buffered queue used in non-runtime tasks + taskChanSize int64 // queue buffer size + chanAutoResize bool + + goroutineCount int //task worker goroutine number + goroutineContext context.Context + closeFunc context.CancelFunc + + sendTaskBlock bool // + sendTaskTimeout time.Duration //block send timeout + + syncMutex *sync.Mutex + + logFunc LogFunc + closed bool +} + +func NewUnBlockQueue(queueSize int64, handlerNum int) *TaskQueue { + + queue := newTaskQueue(queueSize, handlerNum) + + return queue +} + +func NewBlockQueue(queueSize int64, handlerNum int) *TaskQueue { + + queue := newTaskQueue(queueSize, handlerNum) + + queue.setSendBlock() + + return queue +} + +func NewBlockTimeoutQueue(queueSize int64, handlerNum int, timeout time.Duration) *TaskQueue { + + queue := newTaskQueue(queueSize, handlerNum) + + queue.setSendBlockWithTimeout(timeout) + + return queue +} + +func newTaskQueue(queueSize int64, handlerNum int) *TaskQueue { + + if queueSize < 1 { + panic("queueSize must greater than 0") + } + + if handlerNum < 1 { + panic("handlerNum must greater than 0") + } + + taskQueue := &TaskQueue{} + + taskQueue.taskChanSize = queueSize + taskQueue.taskChan = make(chan *task, queueSize) + + taskQueue.goroutineContext, taskQueue.closeFunc = context.WithCancel(context.Background()) + + taskQueue.sendTaskBlock = false //default not block + taskQueue.sendTaskTimeout = 0 + + taskQueue.syncMutex = &sync.Mutex{} + + //init handler goroutine + taskQueue.initHandler(handlerNum) + + return taskQueue +} + +//unblock +func (tq *TaskQueue) setSendUnBlock() { + tq.sendTaskBlock = false +} + +//block send until succeed +func (tq *TaskQueue) setSendBlock() { + tq.sendTaskBlock = true + tq.sendTaskTimeout = 0 +} + +//block send until timeout +func (tq *TaskQueue) setSendBlockWithTimeout(timeout time.Duration) { + if timeout < 0 { + timeout = 0 + } + + tq.sendTaskBlock = true + tq.sendTaskTimeout = timeout +} + +//send task +func (tq *TaskQueue) Send(handler interface{}, params ...interface{}) (resultStatus bool, resultError *Error) { + + defer func() { + if err := recover(); err != nil { + resultStatus = false + msg := fmt.Sprintf("%s", err) + resultError = &Error{Code: ERROR_UNKNOWN, Msg: msg} + } + + }() + + if tq.closed { + return false, &Error{Code: ERROR_QUEUE_CLOSED, Msg: "queue is closed"} + } + + task := newTask(handler, params...) + + if tq.sendTaskBlock { + if tq.sendTaskTimeout > 0 { //timeout + select { + case tq.taskChan <- task: + tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) + case <-time.After(tq.sendTaskTimeout): + tq.log(LEVEL_WARN, fmt.Sprintf("task abandoned for timeout %d", tq.sendTaskTimeout)) + return false, &Error{Code: ERROR_TIMEOUT, Msg: "task abandoned for timeout"} + } + + } else { + tq.taskChan <- task //block until success + tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) + } + + } else { //unblock send task + select { + case tq.taskChan <- task: + tq.log(LEVEL_DEBUG, fmt.Sprintf("send task %v", task)) + default: + tq.log(LEVEL_WARN, fmt.Sprintf("task abandoned for queue full, chanSize:%d", tq.taskChanSize)) + return false, &Error{Code: ERROR_QUEUE_FULL, Msg: "task abandoned for queue full"} + } + } + + return true, nil +} + +func (tq *TaskQueue) initHandler(handlerNum int) { + + for i := 0; i < handlerNum; i++ { + tq.AddHandler() + } +} + +//add handler goroutine +func (tq *TaskQueue) AddHandler() bool { + if tq.closed { + return false + } + + tq.syncMutex.Lock() + + defer tq.syncMutex.Unlock() + + go func() { + gid := getGID() + tq.log(LEVEL_INFO, fmt.Sprintf("add handler goroutine %d", gid)) + + for { + + select { + case task := <-tq.taskChan: + if task != nil { + func() { + defer func() { + if err := recover(); err != nil { + tq.log(LEVEL_WARN, fmt.Sprintf("gid:%d, handle task error %v", gid, err)) + } + + }() + task.Do() + tq.log(LEVEL_DEBUG, fmt.Sprintf("gid:%d, handle task %v", gid, task)) + + }() + } + + case <-tq.goroutineContext.Done(): + tq.log(LEVEL_INFO, fmt.Sprintf("close handler goroutine %d", gid)) + return + } + } + + }() + + tq.goroutineCount++ + + return true +} + +//关闭队列 +func (tq *TaskQueue) Close() { + tq.syncMutex.Lock() + + defer func() { + if err := recover(); err != nil { + tq.log(LEVEL_WARN, fmt.Sprintf("close error %+v", err)) + } + + tq.closeFunc() + + tq.closed = true + + tq.syncMutex.Unlock() + }() + + if !tq.closed { + close(tq.taskChan) + } +} + +func (tq *TaskQueue) SetLogFunc(logFunc LogFunc) { + + tq.logFunc = logFunc + +} + +func (tq *TaskQueue) log(level int, logStr string) { + if tq.logFunc != nil { + tq.logFunc(level, logStr) + } +} + +func getGID() uint64 { + b := make([]byte, 64) + b = b[:runtime.Stack(b, false)] + b = bytes.TrimPrefix(b, []byte("goroutine ")) + b = b[:bytes.IndexByte(b, ' ')] + n, _ := strconv.ParseUint(string(b), 10, 64) + return n +} diff --git a/task_queue_test.go b/task_queue_test.go new file mode 100644 index 0000000..b6a331d --- /dev/null +++ b/task_queue_test.go @@ -0,0 +1,138 @@ +package nasync + +import ( + "fmt" + "testing" + "time" +) + +func queueLog(level int, logStr string) { + if level > LEVEL_INFO { + //fmt.Println(logStr) + } +} + +var handler = func(msg string) string { + time.Sleep(time.Second * 2) + fmt.Println(msg) + panic("xxxxxxxxxxxxxx") + return "pong" +} + +func TestNewUnBlockQueue(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewUnBlockQueue(int64(chanSize), goroutineCount) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "TestNewUnBlockQueue" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + for i := 0; i <= 1; i++ { + min := i*10 + 1 + max := min + 6 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task TestNewUnBlockQueue :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } + +} + +func TestNewBlockQueue(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewBlockQueue(int64(chanSize), goroutineCount) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "TestNewBlockQueue" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + for i := 0; i <= 0; i++ { + min := i*10 + 1 + max := min + 6 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task NewBlockQueue :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } +} + +func TestNewBlockTimeoutQueue(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewBlockTimeoutQueue(int64(chanSize), goroutineCount, time.Second*1) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "NewBlockTimeoutQueue" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + for i := 0; i <= 0; i++ { + min := i*10 + 1 + max := min + 6 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task NewBlockTimeoutQueue :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } +} + +func TestClose(t *testing.T) { + chanSize := 2 + goroutineCount := 2 + taskQueue := NewBlockQueue(int64(chanSize), goroutineCount) + taskQueue.SetLogFunc(queueLog) + + logPrefix := "TestClose" + + fmt.Printf("%s queue size chanSize:%d, goroutineCount:%d\n", logPrefix, chanSize, goroutineCount) + + go func() { + time.Sleep(time.Second * 3) + taskQueue.Close() + }() + + for i := 0; i <= 0; i++ { + min := i*10 + 1 + max := min + 10 + for j := min; j < max; j++ { + + _, err := taskQueue.Send(handler, fmt.Sprintf("handle, task TestClose :%d", j)) + + if err != nil { + fmt.Printf("%s send task fail: %v\n", logPrefix, err) + } else { + fmt.Printf("%s send task success: %d\n", logPrefix, j) + } + } + + time.Sleep(time.Second * 3) + } +}