当前位置:  开发笔记 > 编程语言 > 正文

在RxJava服务中管理事务性的正确方法是什么?

如何解决《在RxJava服务中管理事务性的正确方法是什么?》经验,为你挑选了1个好方法。

我最近开始尝试使用RxJava,并且遇到了Netflix工程师的演示文稿,该工程师建议将我们的业务API移动到Observable API,例如:

public interface VideoService {
    Observable createVideoBasicInfo(VideoBasicInfo videoBasic);
    Observable getVideoBasicInfo(Integer videoId);
    Observable getVideoRating(Integer videoId);
}

但是,我没有找到任何解释如何在此服务中管理事务性的地方.起初我只是注释了我的服务实现@Transactional

@Service
@Transactional
public class VideoServiceImpl implements VideoService{

    @Autowired
    private VideoBasicInfoRepository basicInfoRepo;
    @Autowired
    private VideoRatingRepository ratingRepo;

    public Observable createVideoBasicInfo(VideoBasicInfo videoBasic){
        return Observable.create( s -> {
            s.onNext(basicInfoRepo.save(videBasic));
        });
    }

我们想要的是Object.createlambda(s -> { // This code })中所有代码的执行都发生在事务中.但是,实际发生的是:

    调用以createVideoBasicInfo()事务方式执行,返回冷可观察对象.

    save()执行就象一个原子事务.

显然它是有道理的,因为Spring代理适用于serviceImpl方法.我已经想到了如何做我真正期望的事情,例如启动程序化事务:

return Observable.create( s -> {
    VideoBasicInfo savedBasic = transactionTemplate.execute( status -> {
        VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasicInfo);
        return basicInfo;
    });
    s.onNext(savedBasic);
});

这是在使用反应式API时管理事务的推荐方法吗?



1> John Scatter..:

Spring Data JpaRepository方法签名已标记为@Transactional,因此如果您只使用一个,那么您不需要做任何特殊的事情:

public interface PersonRepository extends JpaRepository {
}

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonRepositoryTest {
    private PersonRepository personRepository;

    @Autowired
    public void setPersonRepository(PersonRepository PersonRepository) {
        this.personRepository = PersonRepository;
    }

    @Test
    public void testReactiveSavePerson() {
        Person person = new Person("Jane", "Doe");
        assertNull(person.getId()); //null before save

        //save person
        Observable.create(s -> {
            s.onNext(personRepository.save(person));
        }).subscribe();

        //fetch from DB
        Person fetchedPerson = personRepository.findOne(person.getId());

        //should not be null
        assertNotNull(fetchedPerson);

        //should equal
        assertEquals(person.getId(), fetchedPerson.getId());
        assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
    }
}

如果需要将多个存储库组合到一个事务中,可以使用类似下面的类:

@Component()
public class ObservableTxFactory {
    public final  Observable create(Observable.OnSubscribe f) {
        return new ObservableTx<>(this, f);
    }

    @Transactional
    public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) {
        onSubscribe.call(subscriber);
    }

    private static class ObservableTx extends Observable {

        public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe f) {
            super(new OnSubscribeDecorator<>(observableTxFactory, f));
        }
    }

    private static class OnSubscribeDecorator implements Observable.OnSubscribe {

        private final ObservableTxFactory observableTxFactory;
        private final Observable.OnSubscribe onSubscribe;

        OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe s) {
            this.onSubscribe = s;
            this.observableTxFactory = observableTxFactory;
        }

        @Override
        public void call(Subscriber subscriber) {
            observableTxFactory.call(onSubscribe, subscriber);
        }
    }
}

工厂bean也需要定义:

@Bean
ObservableTxFactory observableTxFactory() {
    return new ObservableTxFactory();
}

服务:

@Service
public class PersonService {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ObservableTxFactory observableTxFactory;

    public Observable createPerson(String firstName, String lastName) {
        return observableTxFactory.create(s -> {
            Person p = new Person(firstName, lastName);
            s.onNext(personRepository.save(p));
        });
    }
}

测试:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonServiceTest {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ObservableTxFactory observableTxFactory;

    @Test
    public void testPersonService() {
        final PersonService service = new PersonService();
        service.personRepository = personRepository;
        service.observableTxFactory = observableTxFactory;

        final Observable personObservable = service.createPerson("John", "Doe");
        personObservable.subscribe();

        //fetch from DB
        final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false)
                .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe"))
                .findFirst()
                .get();

        //should not be null
        assertNotNull(fetchedPerson);
    }

}

截图显示代理: 在此输入图像描述

推荐阅读
手机用户2402852307
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有