• 前置条件

    在开始使用Saga之前,要确保服务的choerodon-starters-asgard依赖在0.6.3.RELEASE版本及之上, 推荐最新版0.12.0.RELEASE。同时需要对Saga 有一定的了解。可以参考Choerodon猪齿鱼平台中的微服务数据一致性解决方案

    介绍

    Saga 是分布式系统中数据一致性的一种解决方案。本章介绍了如何使用Choerodon 的Saga。并包含如下的内容:

    添加依赖

    在本地服务的pom.xml 中添加如下的依赖。

        <dependency>
            <groupId>io.choerodon</groupId>
            <artifactId>choerodon-starter-asgard</artifactId>
            <version>${choerodon.starters.version}</version>
        </dependency>
    

    消费者端配置

    choerodon:
      saga:
        consumer:
          enabled: false # 启动消费端
          thread-num: 2 # saga消息消费线程池大小
          max-poll-size: 200 # 每次拉取消息最大数量
          poll-interval-ms: 1000 # 拉取间隔,默认1000毫秒
    

    注解

    choerodon-starter-asgard 中,包含有两个注解。

    首先,请确保注解所在类可以被spring扫描到

    @Saga

    在方法或者类上添加@Saga 注解。

    @Saga(code = "asgard-create-user", description = "创建用户", inputSchemaClass = AsgardUser.class)
    
    字段 作用
    code 任务通过@SagaTask订阅,对应@SagaTasksagaCode
    description 描述信息
    inputSchema Saga输入的参数示例。例如{"name":"string", "age":0}。会覆盖inputSchemaClass自动生成
    inputSchemaClass 指定class自动生成。比如指定User将自动生成{"id":0,"username":"string","password":"string"}

    @SagaTask

    在方法上添加@SagaTask注解,@SagaTask本身封装了事务,无需再使用@Transacional声明事务

    @SagaTask(code = "devopsCreateUser",
        description = "devops创建用户",
        sagaCode = "asgard-create-user",
        concurrentLimitNum = 2,
        concurrentLimitPolicy = SagaDefinition.ConcurrentLimitPolicy.NONE,
        seq = 2)
    
    字段 作用
    code taskcode,同一个sagaCode下的taskCode需要唯一
    sagaCode 对应@Sagacode,表示订阅该Saga
    seq 执行顺序,同一个Saga下的task将按照seq顺序一次消费,值越小消费顺序越高
    enabledDbRecord 是否在数据库中记录消息消费,默认
    description 描述
    maxRetryCount 最大自动重试次数,默认次数为1
    concurrentLimitPolicy 并发策略,默认为NONE
    concurrentLimitNum 并发数,当concurrentLimitPolicy不为NONE时生效
    timeoutSeconds 超时时间
    timeoutPolicy 超时策略,默认为重试
    outputSchemaClass 默认将@SagaTask的返回类型生成输出,也可通过此属性指定
    outputSchema 通过json字符串手动指定输出参数。比如{“name”:“wang”,“age”:23}
    transactionIsolation 事务的隔离级别
    transactionManager 使用的事务管理器

    在一个Saga 定义中。上一个SagaTask的输出是下一个的输入,当seq相同时,则并行执行,并行的任务输出的结果json进行一个合并,作为下一个次序的输入。

    并发策略,默认为NONETYPE根据sagaClient.startSaga时的refType设置并发,TYPE_AND_ID根据refTyperef_id设置并发,并发数为concurrentLimitNum。一个服务将@SagaTask注解删除,asgard服务也会同步删除该SagaTask

    开启Saga

    Saga 被定义好之后,可以通过服务自身,启动一个Saga 实例。

    启动Saga实例

    通过Feign启动一个Saga(过时)

    请确保@EnableFeignClients包含io.choerodon.asgard.saga,否则扫描不到该feignClient。例如:@EnableFeignClients("io.choerodon")

    例如创建一个用户时,启动一个Saga:

    @Transactional
        public AsgardUser createUser(@Valid @RequestBody AsgardUser user) {
             // 业务代码
             sagaClient.startSaga("asgard-create-user", new StartInstanceDTO(input, "", ""));
        }
    

    通过TransactionalProducer启动一个Saga

    例如创建一个用户时,启动一个Saga

    producer.applyAndReturn(
                    StartSagaBuilder
                            .newBuilder()
                            .withLevel(ResourceLevel.ORGANIZATION)
                            .withRefType("organization")
                            .withSagaCode("asgard-create-user"),
                    builder -> {
                        asgardService.createuser(user); 
                        builder
                                .withPayloadAndSerialize(sagaPayload)
                                .withRefId(String.valueOf(orgId))
                                .withSourceId(orgId);
                        return sagaPayload;
                    });
    

    同时在代码中添加如下处理逻辑:

    @SagaTask(code = "devopsCreateUser",
            description = "devops创建用户",
            sagaCode = "asgard-create-user",
            concurrentLimitNum = 2,
            concurrentLimitPolicy = SagaDefinition.ConcurrentLimitPolicy.NONE,
            seq = 2)
    public DevopsUser devopsCreateUser(String data) throws IOException {
        AsgardUser asgardUser = objectMapper.readValue(data, AsgardUser.class);
        LOGGER.info("===== asgardUser {}", asgardUser);
        DevopsUser devopsUser = new DevopsUser();
        devopsUser.setId(asgardUser.getId());
        devopsUser.setGroup("test");
        LOGGER.info("===== devopsCreateUser {}", devopsUser);
        return devopsUser;
    }
    

    方法返回值为该任务的输出,本次sagaTask的输出是下一个sagaTask的输入。

    里面执行封装了事务,不需要再加事务,如果需要加外部事务,可通过@SagaTasktransactionDefinition设置事务传播行为。

    输出合并

    同一个Saga下的多个SagaTaskseq相同,则并行执行。这多个SagaTask的输出进行合并后,成为下个SagaTask的输入。

    合并操作如下: Saga1codecode1Saga2codecode2,如果输出结果完全相同,则合并结果为1或者2的输出。

    Saga1输出 Saga2输出 合并结果
    {"name":"23"} null {"name":"23"}
    {"name":"23"} {"name":"23333"} {"name":"23333"}结果被最后一个覆盖
    {"name":"23"} {"age":23} {"name":"23333","age":23}
    [{"id":1},{"id":2}] {"age":23} {"code1":[{"id":1},{"id":2}],"age":23}
    false null {“code1”:false}
    "test" 23 {"code1":"test","code2":23}
    "test" "23" {"code1":"test","code2":"23"}

    如下:如果这次的输出和输入一样,直接将接收数据返回即可。

    @SagaTask(code = "test", sagaCode = "iam-create-project", seq = 1)
    public String iamCreateUser(String data) {
        return data;
    }
    

    这样默认根据方法返回值即String生成的outputChema是错误的,最好手动指定,即:

    @SagaTask(code = "test", sagaCode = "iam-create-project", seq = 1,  outputSchemaClass = AsgardUser.class)
    public String iamCreateUser(String data) {
        return data;
    }
    

    或者指定正确的返回值

    @SagaTask(code = "test", sagaCode = "iam-create-project", seq = 1)
    public AsgardUser iamCreateUser(String data) {
        AsgardUser asgardUser = objectMapper.readValue(data, AsgardUser.class);
        return asgardUser;
    }
    

    消费端模型

    一个定时任务线程定时拉取消息,拉取的消息放到一个线程安全的set里,再由消息消费线程池异步消费,每消费完成(无论成功还是失败)set从中删除,直到set为空再进行下一次拉取消费。

    消费端事务

    1. @SagaTask注解的方法封装了事务,有如下事务属性可配置。
    字段 作用
    transactionIsolation 事务的隔离级别
    transactionManager 使用的事务管理器

    `

    1. 如果@SagaTask方法里面自己又添加了事务,则形成嵌套事务,自己添加的事务设置合适的事务传播行为即可。

    2. @SagaTask的方法执行遇到任何异常都会回滚事务,如果无需回滚,则手动捕获该异常即可,如下:

      @SagaTask(code = "book-tour-hotel",
              description = "预定酒店",
              sagaCode = "book-tour-package",
              concurrentLimitNum = 2,
              seq = 5)
      public TourDTO bookHotel(String data) throws IOException {
          TourDTO tour = mapper.readValue(data, TourDTO.class);
          TourHotel hotel = new TourHotel();
          hotel.setUserId(tour.getUserId());
          hotel.setTourId(tour.getTourId());
          if (tourHotelMapper.insert(hotel) != 1) {
              throw new CommonException("error.tour.bookHotel");
          }
          tour.setHotelId(hotel.getId());
          //比如该feign做一些清理,成功与否无关紧要,则可以手动捕获该异常。
          try {
              XXXFeign.cleanup(tour.getUserId());
          } catch (Exception e) {
              LOGGER
          }
          return tour;
      }
      
    3. @SagaTask的方法里含有feign调用, 最好能保证feign调用的”幂等性”

    Asgard 服务

    在北欧神话中,阿斯加德(古诺斯语:Ásgarðr,英语:Asgard)是神的领域,亦可称作阿萨神域。在Choerodon 中,我们用Asgard。来管理choerodon 中所有的分布式事务。

    asgard-service 启动后,会主动拉取@Saga@SagaTask的注解配置。

    为了防止消费端多实例拉取出现消费,对每条消息设置一个实例锁,锁为·sagaCode + taskCode`。

    更多有关的信息可以从asgard-service获取。