Skip to content

Commit 99f2632

Browse files
committed
feat: add goroutine model
modified: go.mod modified: go.sum new file: grpcrun/go_grpc.go modified: grpcrun/grpcrun_test.go modified: grpcrun/task.go
1 parent 3e58497 commit 99f2632

File tree

5 files changed

+188
-25
lines changed

5 files changed

+188
-25
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.19
55
require go.uber.org/zap v1.24.0
66

77
require (
8+
github.com/bwmarrin/snowflake v0.3.0 // indirect
89
github.com/pkg/errors v0.9.1 // indirect
910
github.com/stretchr/testify v1.8.2 // indirect
1011
go.uber.org/atomic v1.10.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
2+
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
3+
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
24
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
35
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
46
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

grpcrun/go_grpc.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Author: BeYoung
2+
// Date: 2023/2/26 14:27
3+
// Software: GoLand
4+
5+
package grpcrun
6+
7+
import (
8+
"context"
9+
"errors"
10+
"github.com/bwmarrin/snowflake"
11+
"go.uber.org/zap"
12+
"sync"
13+
"time"
14+
)
15+
16+
var (
17+
mu sync.Mutex
18+
node *snowflake.Node
19+
)
20+
21+
func init() {
22+
var err error
23+
mu = sync.Mutex{}
24+
if node, err = snowflake.NewNode(int64(time.Now().Day())); err != nil {
25+
panic(err)
26+
}
27+
}
28+
29+
// GoGrpc is used to run some goroutine of grpc.
30+
// the grpc's return will filled in response and error
31+
// Example:
32+
//
33+
// func example() {
34+
// run := GoGrpc{}
35+
// run.AddNewTask(nil, nil, nil)
36+
// run.Call()
37+
// run.Wait()
38+
// }
39+
type GoGrpc struct {
40+
mu sync.Mutex
41+
ctx context.Context
42+
cancel context.CancelFunc
43+
wait sync.WaitGroup
44+
Timeout time.Duration
45+
Task map[string]*GrpcTask
46+
}
47+
48+
func NewGoGrpc() *GoGrpc {
49+
mu.Lock()
50+
defer mu.Unlock()
51+
g := GoGrpc{}
52+
g.ctx, g.cancel = context.WithTimeout(context.Background(), 3*time.Second)
53+
g.mu = sync.Mutex{}
54+
g.wait = sync.WaitGroup{}
55+
g.Task = make(map[string]*GrpcTask, 0)
56+
return &g
57+
}
58+
59+
// SetTimeout reset timeout, replace default timeout with a special time duration
60+
func (g *GoGrpc) SetTimeout(timeout time.Duration) {
61+
mu.Lock()
62+
mu.Unlock()
63+
g.ctx, g.cancel = context.WithTimeout(context.Background(), timeout)
64+
}
65+
66+
func (g *GoGrpc) Run() {
67+
for _, t := range g.Task {
68+
go g.run(t)
69+
}
70+
}
71+
72+
func (g *GoGrpc) Wait() {
73+
defer g.cancel()
74+
g.wait.Wait()
75+
}
76+
77+
func (g *GoGrpc) AddTask(task *GrpcTask) {
78+
g.mu.Lock()
79+
defer g.mu.Unlock()
80+
g.Task[task.Name] = task
81+
g.wait.Add(1)
82+
}
83+
84+
func (g *GoGrpc) AddNewTask(grpcName string, grpcMethod any, request any) {
85+
g.mu.Lock()
86+
defer g.mu.Unlock()
87+
zap.S()
88+
task := GrpcTask{
89+
ctx: &g.ctx,
90+
grpcMethod: grpcMethod,
91+
request: request,
92+
Name: grpcName,
93+
log: zap.S().Named(grpcName),
94+
}
95+
96+
g.Task[node.Generate().String()] = &task
97+
g.wait.Add(1)
98+
return
99+
}
100+
101+
func (g *GoGrpc) run(t *GrpcTask) {
102+
defer g.wait.Done()
103+
for {
104+
select {
105+
case <-g.ctx.Done():
106+
t.Err = errors.New("context canceled")
107+
return
108+
default:
109+
t.Call()
110+
return
111+
}
112+
}
113+
}
114+
115+
func example() {
116+
run := GoGrpc{}
117+
run.AddNewTask("nil", nil, nil)
118+
run.Run()
119+
run.Wait()
120+
}

grpcrun/grpcrun_test.go

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"go.uber.org/zap"
8+
"strconv"
89
"testing"
910
"time"
1011

@@ -72,18 +73,18 @@ var (
7273
func TestGrpcTask(t *testing.T) {
7374

7475
for i, d := range datas {
75-
call := grpcrun.NewGrpc(&d.ctx, d.method, d.req)
76-
call.GrpcTask()
76+
call := grpcrun.NewGrpcTask(&d.ctx, "test{"+strconv.Itoa(i)+"}", d.method, d.req)
77+
call.Call()
7778

7879
t.Logf("第 %d 次执行\n", i+1)
7980
if call.Err != nil {
8081
fmt.Println(call.Err)
8182
fmt.Println()
8283
continue
8384
}
84-
//if should.NoError(call.Err) {
85+
// if should.NoError(call.Err) {
8586
// fmt.Println(call.Response.(*loginResp))
86-
//}
87+
// }
8788
fmt.Println(call.Response.(*loginResp))
8889
fmt.Println()
8990

@@ -107,17 +108,43 @@ func init() {
107108

108109
// 测试表格
109110
datas = []*data{
110-
newData(ctx, Login, req), // 正常
111-
newData(ctx, Login1, req), // [grpcMethod]必须有2个参数(context.Context, *request)
112-
newData(ctx, Login2, req), // [grpcMethod]的第1个参数必须是:context.Context
113-
newData(ctx, Login3, req), // [grpcMethod]必须有2个返回值(*Response, error)
114-
newData(ctx, Login4, req), // [grpcMethod]的第2个返回值必须是:error
115-
newData(nil, Login, req), // 请正确的传递[Context],不支持:nil
116-
newData(ctx, nil, req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:invalid
117-
newData(ctx, Login, nil), // 请正确的传递[request],不支持:invalid
118-
newData(ctx, "其他类型", req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:string
119-
newData(ctx, Login, "其他类型"), // 请正确的传入[request],不支持:string
120-
newData(ctx, Login, zap.S()), // [request]的参数与[grpcMethod]的参数不匹配:grpcMethod = v3_test.loginReq, request = zap.SugaredLogger
111+
newData(ctx, Login, req), // 正常
112+
newData(ctx, Login1, req), // [grpcMethod]必须有2个参数(context.Context, *request)
113+
newData(ctx, Login2, req), // [grpcMethod]的第1个参数必须是:context.Context
114+
newData(ctx, Login3, req), // [grpcMethod]必须有2个返回值(*Response, error)
115+
newData(ctx, Login4, req), // [grpcMethod]的第2个返回值必须是:error
116+
newData(nil, Login, req), // 请正确的传递[Context],不支持:nil
117+
newData(ctx, nil, req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:invalid
118+
newData(ctx, Login, nil), // 请正确的传递[request],不支持:invalid
119+
newData(ctx, "其他类型", req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:string
120+
newData(ctx, Login, "其他类型"), // 请正确的传入[request],不支持:string
121+
newData(ctx, Login, zap.S()), // [request]的参数与[grpcMethod]的参数不匹配:grpcMethod = v3_test.loginReq, request = zap.SugaredLogger
121122

122123
}
123124
}
125+
126+
func TestGoGrpc_AddNewTask(t *testing.T) {
127+
run := grpcrun.NewGoGrpc()
128+
for i, d := range datas {
129+
run.AddNewTask("test{"+strconv.Itoa(i)+"}", d.method, d.req)
130+
}
131+
}
132+
133+
func TestGoGrpc_Run(t *testing.T) {
134+
run := grpcrun.NewGoGrpc()
135+
for i, d := range datas {
136+
run.AddNewTask("test{"+strconv.Itoa(i)+"}", d.method, d.req)
137+
}
138+
run.Run()
139+
run.Wait()
140+
141+
for _, t := range run.Task {
142+
if t.Err != nil {
143+
fmt.Println(t.Err)
144+
fmt.Println()
145+
continue
146+
}
147+
fmt.Println(t.Response.(*loginResp))
148+
fmt.Println()
149+
}
150+
}

grpcrun/task.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"reflect"
99
)
1010

11-
// Grpc 用于构建Grpc请求
12-
type Grpc struct {
11+
// GrpcTask 用于构建Grpc Task
12+
type GrpcTask struct {
1313
// 必须符合GRPC的 Method 签名
1414
grpcMethod any
1515

@@ -18,25 +18,38 @@ type Grpc struct {
1818
request any
1919

2020
// GRPC的调用返回值
21+
Name string
2122
Response any
2223
Err error
2324

2425
// 日志对象
2526
log *zap.SugaredLogger
2627
}
2728

28-
func NewGrpc(ctx *context.Context, grpcMethod any, req any) *Grpc {
29+
// NewGrpcTask creates a new GrpcTask
30+
//
31+
// Note:
32+
// @param grpcName string name of the grpc, this should be unique
33+
func NewGrpcTask(ctx *context.Context, grpcName string, grpcMethod any, request any) *GrpcTask {
34+
mu.Lock()
35+
defer mu.Unlock()
2936
zap.S()
30-
return &Grpc{
37+
38+
if grpcName == "" {
39+
grpcName = node.Generate().String()
40+
}
41+
42+
return &GrpcTask{
3143
ctx: ctx,
3244
grpcMethod: grpcMethod,
33-
request: req,
34-
log: zap.S().Named("Grpc-Task"),
45+
request: request,
46+
Name: grpcName,
47+
log: zap.S().Named(grpcName),
3548
}
3649
}
3750

38-
// GrpcTask :去调用GRPC的方法
39-
func (c *Grpc) GrpcTask() {
51+
// Call 去调用GRPC的方法
52+
func (c *GrpcTask) Call() {
4053
// 进行参数校验
4154
c.validate()
4255
if c.Err != nil {
@@ -48,7 +61,7 @@ func (c *Grpc) GrpcTask() {
4861
c.call()
4962
}
5063

51-
func (c *Grpc) call() {
64+
func (c *GrpcTask) call() {
5265
v := reflect.ValueOf(c.grpcMethod)
5366

5467
// 调用参数
@@ -67,7 +80,7 @@ func (c *Grpc) call() {
6780
}
6881

6982
// 校验结构体
70-
func (c *Grpc) validate() {
83+
func (c *GrpcTask) validate() {
7184

7285
// 校验 req 类型
7386
reqV := reflect.ValueOf(c.request)

0 commit comments

Comments
 (0)