I recently started experimenting with RxJava and came across a presentation by a Netflix engineer who suggested moving our business APIs to Observable APIs, for example:
public interface VideoService {
    Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
    Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
    Observable<VideoRating> getVideoRating(Integer videoId);
}
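However, I have not found anything that explains how to manage transactions in such a service. At first I simply annotated my service implementation with @Transactional: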
@Service
@Transactional
public class VideoServiceImpl implements VideoService {

    @Autowired
    private VideoBasicInfoRepository basicInfoRepo;

    @Autowired
    private VideoRatingRepository ratingRepo;

    public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic) {
        return Observable.create(s -> {
            // This body runs on subscription, not when the method is called
            s.onNext(basicInfoRepo.save(videoBasic));
        });
    }

    // other VideoService methods omitted
}
What we want is for all of the code inside the Observable.create lambda (s -> { // This code }) to execute within a transaction. What actually happens, however, is:
1. The call to createVideoBasicInfo() executes transactionally and returns the cold observable.
2. The save() then executes as its own atomic transaction.
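The ordering in the two points above can be reproduced without Spring or RxJava. The following is a minimal sketch in plain Java, not the real libraries: `Cold`, `inTransaction` and the logged event names are hypothetical stand-ins for a cold observable and for the proxy that wraps the service method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Dependency-free model of the timing problem; every name here is hypothetical. */
final class ColdObservableTxDemo {

    /** Records events so the ordering can be inspected afterwards. */
    static final List<String> log = new ArrayList<>();

    /** Stand-in for a cold observable: the body runs only when subscribe() is called. */
    static final class Cold<T> {
        private final Supplier<T> body;

        Cold(Supplier<T> body) {
            this.body = body;
        }

        T subscribe() {
            return body.get();
        }
    }

    /** Stand-in for the @Transactional proxy: the transaction spans the method call only. */
    static <T> T inTransaction(Supplier<T> method) {
        log.add("begin tx");
        try {
            return method.get();
        } finally {
            log.add("commit tx");
        }
    }

    /** Stand-in for the service method: it only builds the cold observable. */
    static Cold<String> createVideoBasicInfo(String video) {
        return new Cold<>(() -> {
            log.add("save " + video);
            return video;
        });
    }

    /** Calls the "proxied" method, then subscribes, and returns the recorded order. */
    static List<String> run() {
        log.clear();
        Cold<String> cold = inTransaction(() -> createVideoBasicInfo("video-1"));
        cold.subscribe();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [begin tx, commit tx, save video-1]
        System.out.println(run());
    }
}
```

In this model the transaction has already been committed by the time the save runs, which is the behaviour described in the question.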
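This makes sense, of course, because the Spring proxy applies to the methods of the service implementation. I have thought of ways to get what I actually expect, such as starting a programmatic transaction: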
return Observable.create(s -> {
    // Run the save inside an explicit, programmatic transaction
    VideoBasicInfo savedBasic = transactionTemplate.execute(status -> {
        VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasic);
        return basicInfo;
    });
    s.onNext(savedBasic);
});
Is this the recommended way to manage transactions when working with a reactive API?
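To make the intent of the programmatic variant concrete, here is a dependency-free sketch of the same idea. It is only a model: `execute` is a hypothetical helper that imitates the commit-on-success, rollback-on-exception behaviour one would expect from a transaction template, and a `Supplier` stands in for the lazy Observable.create body. None of it is Spring or RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java model of opening the transaction inside the lazy body; all names are hypothetical. */
final class ProgrammaticTxDemo {

    static final List<String> log = new ArrayList<>();

    /** Hypothetical transaction helper: commit on success, roll back on a runtime exception. */
    static <T> T execute(Supplier<T> work) {
        log.add("begin tx");
        try {
            T result = work.get();
            log.add("commit tx");
            return result;
        } catch (RuntimeException e) {
            log.add("rollback tx");
            throw e;
        }
    }

    /** The lazy body opens the transaction itself, so it starts at subscription time. */
    static Supplier<String> createVideoBasicInfo(String video) {
        return () -> execute(() -> {
            log.add("save " + video);
            return video;
        });
    }

    static List<String> run() {
        log.clear();
        Supplier<String> cold = createVideoBasicInfo("video-1");
        log.add("method returned");
        cold.get(); // corresponds to subscribe()
        return new ArrayList<>(log);
    }

    static List<String> runFailing() {
        log.clear();
        Supplier<String> cold = () -> ProgrammaticTxDemo.<String>execute(() -> {
            throw new IllegalStateException("save failed");
        });
        try {
            cold.get();
        } catch (IllegalStateException expected) {
            log.add("error propagated");
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());        // prints [method returned, begin tx, save video-1, commit tx]
        System.out.println(runFailing()); // prints [begin tx, rollback tx, error propagated]
    }
}
```

Here the save happens between "begin tx" and "commit tx", and both occur only after the method has returned, which is the ordering the question is asking for.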
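The CRUD methods of a Spring Data JpaRepository are already transactional (the default implementation, SimpleJpaRepository, is annotated with @Transactional), so if you only use a single repository call you do not need to do anything special: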
// The ID type Long is an assumption; adjust it to the type of Person's id
public interface PersonRepository extends JpaRepository<Person, Long> { }
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonRepositoryTest {

    private PersonRepository personRepository;

    @Autowired
    public void setPersonRepository(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Test
    public void testReactiveSavePerson() {
        Person person = new Person("Jane", "Doe");
        assertNull(person.getId()); // null before save

        // save person
        Observable.create(s -> {
            s.onNext(personRepository.save(person));
        }).subscribe();

        // fetch from DB
        Person fetchedPerson = personRepository.findOne(person.getId());

        // should not be null
        assertNotNull(fetchedPerson);

        // should equal
        assertEquals(person.getId(), fetchedPerson.getId());
        assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
    }
}
If you need to combine multiple repositories in a single transaction, you can use a class like the following:
@Component()
public class ObservableTxFactory {

    public final <T> Observable<T> create(Observable.OnSubscribe<T> f) {
        return new ObservableTx<>(this, f);
    }

    // Invoked on subscription; the transaction wraps the whole onSubscribe body
    @Transactional
    public <T> void call(Observable.OnSubscribe<T> onSubscribe, Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }

    private static class ObservableTx<T> extends Observable<T> {

        public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) {
            super(new OnSubscribeDecorator<>(observableTxFactory, f));
        }
    }

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

        private final ObservableTxFactory observableTxFactory;
        private final Observable.OnSubscribe<T> onSubscribe;

        OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) {
            this.onSubscribe = s;
            this.observableTxFactory = observableTxFactory;
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            // Delegate through the factory so that its @Transactional method is used
            observableTxFactory.call(onSubscribe, subscriber);
        }
    }
}
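As far as I can tell, the idea behind the factory is that each subscription is routed through a transactional method, so the transaction starts when someone subscribes rather than when the observable is built. A rough plain-Java model of that mechanism follows. It assumes a JDK dynamic proxy in place of Spring's proxy, and `TxCaller`, `PlainCaller`, `transactional`, `decorate` and the two logged "save" steps are hypothetical names used only for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a decorator that subscribes through a transactional proxy. */
final class TxDecoratorDemo {

    static final List<String> log = new ArrayList<>();

    /** Hypothetical counterpart of the factory's transactional call(...) method. */
    interface TxCaller {
        void call(Consumer<Consumer<String>> onSubscribe, Consumer<String> subscriber);
    }

    /** Target object: simply runs the on-subscribe body against the subscriber. */
    static final class PlainCaller implements TxCaller {
        @Override
        public void call(Consumer<Consumer<String>> onSubscribe, Consumer<String> subscriber) {
            onSubscribe.accept(subscriber);
        }
    }

    /** Wraps the target in a proxy that logs a transaction around every invocation. */
    static TxCaller transactional(TxCaller target) {
        InvocationHandler handler = (proxy, method, args) -> {
            log.add("begin tx");
            try {
                Object result = method.invoke(target, args);
                log.add("commit tx");
                return result;
            } catch (InvocationTargetException e) {
                log.add("rollback tx");
                throw e.getCause();
            }
        };
        return (TxCaller) Proxy.newProxyInstance(
                TxCaller.class.getClassLoader(), new Class<?>[] {TxCaller.class}, handler);
    }

    /** Decorates a lazy source so that subscribing goes through the proxy. */
    static Consumer<Consumer<String>> decorate(TxCaller tx, Consumer<Consumer<String>> onSubscribe) {
        return subscriber -> tx.call(onSubscribe, subscriber);
    }

    static List<String> run() {
        log.clear();
        TxCaller tx = transactional(new PlainCaller());
        Consumer<Consumer<String>> source = decorate(tx, s -> {
            log.add("save person");   // first repository call (hypothetical)
            log.add("save address");  // second repository call (hypothetical)
            s.accept("done");
        });
        log.add("observable created");
        source.accept(item -> log.add("onNext " + item)); // corresponds to subscribe()
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [observable created, begin tx, save person, save address, onNext done, commit tx]
        System.out.println(run());
    }
}
```

Both saves fall between "begin tx" and "commit tx" in this model, which is the point of wrapping the on-subscribe callback rather than the method that creates the observable.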
The factory bean also needs to be defined:
@Bean
ObservableTxFactory observableTxFactory() {
    return new ObservableTxFactory();
}
Service:
@Service
public class PersonService {

    @Autowired
    PersonRepository personRepository;

    @Autowired
    ObservableTxFactory observableTxFactory;

    public Observable<Person> createPerson(String firstName, String lastName) {
        // The lambda runs inside the transaction started by the factory on subscription
        return observableTxFactory.create(s -> {
            Person p = new Person(firstName, lastName);
            s.onNext(personRepository.save(p));
        });
    }
}
Test:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonServiceTest {

    @Autowired
    PersonRepository personRepository;

    @Autowired
    ObservableTxFactory observableTxFactory;

    @Test
    public void testPersonService() {
        final PersonService service = new PersonService();
        service.personRepository = personRepository;
        service.observableTxFactory = observableTxFactory;

        final Observable<Person> personObservable = service.createPerson("John", "Doe");
        personObservable.subscribe();

        // fetch from DB
        final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false)
                .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe"))
                .findFirst()
                .get();

        // should not be null
        assertNotNull(fetchedPerson);
    }
}
[Screenshot showing the proxy]