- 简介
- 启动
- 目录
- 注册Worker
- AddFunc
- AddWorker
- Queue驱动
- Queue接口
- 获取Queue驱动
- 注入Queue驱动
- 消息入队
- 其他配置
- 针对性启用Topic消费
- 设置日志Logger及等级
- 设置标准输出日志等级
- 设置Worker默认并发数
- 详细文档
简介
Queue Worker是自己实现的队列调度服务,因为在开源的作品发现基本都是耦合了redis队列服务的,不太符合业务的需求。作为组件服务,需要抽象一种非侵入式的queue接口设计,由业务方注入实现了queue接口的驱动,比如redis、alimns、kafka、rabbitMQ。目前只实现了redis和alimns,其他后续看业务使用场景可扩展。
启动
build/bin/snow -a job
目录
app/job
注册Worker
在队列调度任务中注册需要执行的Worker,目前提供以下两种方式进行注册。具体Worker的业务逻辑需要注意,需要保证业务逻辑是同步阻塞的,如果需要进行并发,需要阻塞等待所有的并发结束,否则无法控制worker执行的并发数。
AddFunc
第一个参数为topic,第二个参数为回调函数,第三个为worker的并发数[可选,未设置使用默认并发数]
job.AddFunc("topic:test", test, 1)
//test函数示例,遵循如下的输入返回数据结构
func test(task work.Task) (work.TaskResult) {
//业务逻辑
}
AddWorker
需要传入Worker的数据结构,示例如下:
//test函数输入输出同上
job.AddWorker("topic:test2", &work.Worker{Call: work.MyWorkerFunc(test), MaxConcurrency: 1})
Queue驱动
队列调度要跑起来需要去拉取Queue驱动的数据,只要实现了如下Queue接口的定义,都可以进行Queue驱动注入。
Queue接口
type Queue interface {
Enqueue(ctx context.Context, key string, message string, args ...interface{}) (isOk bool, err error)
Dequeue(ctx context.Context, key string) (message string, token string, err error)
AckMsg(ctx context.Context, key string, token string) (ok bool, err error)
BatchEnqueue(ctx context.Context, key string, messages []string, args ...interface{}) (isOk bool, err error)
}
获取Queue驱动
目前框架已经集成好Queue的组件,可以直接使用。
## 第一个参数为依赖注入的别名,第二个参数为驱动类型(目前支持redis和alimns)
q, err := queue.GetQueue(redis.SingletonMain, queue.DriverTypeRedis)
注入Queue驱动
针对topic设置相关的queue
job.AddQueue(q, "topic:test1", "topic:test2")
设置默认的queue, 没有设置过的topic会使用默认的queue
job.AddQueue(q)
消息入队
目前在app/jobs子包下提供了4个消息入队的方法,最终消息是以Task结构json序列化的字符串保存到消息队列中,通过队列调度服务从消息队列中取出进行json反序列化,交给对应的worker处理。
/**
* 消息入队 -- 原始message
*/
func Enqueue(ctx context.Context, topic string, message string, args ...interface{}) (isOk bool, err error) {
return getJob().Enqueue(ctx, topic, message, args...)
}
/**
* 消息入队 -- Task数据结构
*/
func EnqueueWithTask(ctx context.Context, topic string, task work.Task, args ...interface{}) (isOk bool, err error) {
return getJob().EnqueueWithTask(ctx, topic, task, args...)
}
/**
* 消息批量入队 -- 原始message
*/
func BatchEnqueue(ctx context.Context, topic string, messages []string, args ...interface{}) (isOk bool, err error) {
return getJob().BatchEnqueue(ctx, topic, messages, args...)
}
/**
* 消息批量入队 -- Task数据结构
*/
func BatchEnqueueWithTask(ctx context.Context, topic string, tasks []work.Task, args ...interface{}) (isOk bool, err error) {
return getJob().BatchEnqueueWithTask(ctx, topic, tasks, args...)
}
其他配置
针对性启用Topic消费
目前会有这样的应用场景,一份代码需要注册所有的topic,但是不同机器需要跑不同的topic消费Worker。可以通过如下方法实现:
代码实现:
if config.GetOptions().Queue != "" {
topics := strings.Split(config.GetOptions().Queue, ",")
job.SetEnableTopics(topics...)
}
启动服务控制参数:
# -queue为空或者不配置时,默认启用所有注册的topic
build/bin/snow -a job -queue "topic1,topic2"
设置日志Logger及等级
## 非侵入式设计,只需要实现work.Logger的接口即可
job.SetLogger(logger.GetLogger())
## 设置日志等级,默认为work.Info
job.SetLevel(work.WARN)
设置标准输出日志等级
## 默认为work.Info
job.SetConsoleLevel(work.WARN)
设置Worker默认并发数
## 默认值为5
job.SetConcurrency(10)
详细文档
https://github.com/qit-team/work