
I have a Spring Boot application that implements a user referral system. One use case is that when a user signs up with a valid referral code from another user, the referrer gets one reward point, and for every five points they get $10 in credit. I have implemented this use case, and to test its behavior under high concurrency I've created an integration test using @DataJpaTest, Spring Data repositories, and an H2 database as the storage system.

In my test, I create a first user and then create a number of users with the first user's referral code. Each of those users is created on a different thread, spawned by a thread pool executor. My problem is that the users created on the pool threads don't see the first user created on the main thread, even though I'm using the JpaRepository.saveAndFlush() method to save them.

Could someone give me an explanation about what's happening here? Is it because Hibernate's session is not thread-safe?

You can see my code below. The second test has been simplified to just check the number of users in the repository.

@DataJpaTest(includeFilters = @ComponentScan.Filter(type = FilterType.ANNOTATION, classes = Repository.class))
public class JpaTest {

    @Autowired
    private TestEntityManager entityManager;

    @Autowired
    @Qualifier("JpaUserRepository")
    private JpaUserRepository userRepository;

    @Autowired
    @Qualifier("JpaReferralRepository")
    private ReferralRepository referralRepository;

    private RegisterReferredUser registerReferredUser;
    private CreateUser createUser;
    private GetUser getUser;

    @BeforeEach
    void setUp() {
        registerReferredUser = new RegisterReferredUser(referralRepository, userRepository);
        createUser = new CreateUser(userRepository, referralRepository, registerReferredUser);
        getUser = new GetUser(userRepository);
    }

    @Test
    void createUser_shouldWorksProperlyUnderConcurrentExecution() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        EmailAddress referrerUserEmail = EmailAddress.of("john.doe@acme.inc");
        User referrerUser = createUser.execute(new CreateUserCommand(referrerUserEmail.getValue(), null));
        String referralCode = referrerUser.getReferralCode().getValue();
        int maxIterations = 10;

        for (int i = 0; i < maxIterations; i++) {
            int emailSeed = i;
            executor.submit(() -> {
                createUser.execute(new CreateUserCommand(anEmailAddress(emailSeed), referralCode));
            });
        }

        executor.shutdown();
        if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
            fail("Executor didn't finish in time");
        }

        assertThat(entityManager.getEntityManager().createQuery("from JpaUser").getResultList().size()).isEqualTo(maxIterations + 1);
        // This fails: there is just 1 user in the repository. However, if I register users without a referral code (so the existence of the first user is not checked), the users are created and this passes.
    }

    @Test
    void just_a_POC() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        userRepository.save(UserMother.aUserWithEmail("john.doe@acme.inc"));
        int maxIterations = 10;

        for (int i = 0; i < maxIterations; i++) {
            int emailSeed = i;
            executor.submit(() -> userRepository.save(UserMother.aUserWithEmail(anEmailAddress(emailSeed))));
        }

        executor.shutdown();
        if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
            fail("Executor didn't finish in time");
        }

        assertThat(entityManager.getEntityManager().createQuery("from JpaUser").getResultList().size()).isEqualTo(maxIterations + 1);
        // This passes
    }
}

In the CreateUser I have the following code:

private void assertReferralCodeIsValid(ReferralCode referralCode, EmailAddress email) {
    if (!userRepository.findByReferralCode(referralCode).isPresent()) {
        throw new NonExistentReferralCode(referralCode);
    }
    if (referralRepository.findByEmailAndCode(email, referralCode).isPresent()) {
        throw new ReferralCodeAlreadyUsed(email, referralCode);
    }
}

And this is the JpaUserRepository.save() method:

@Repository("JpaUserRepository")
public class JpaUserRepository implements UserRepository {
    
    private JpaUserCrudRepository crudRepository;

    public JpaUserRepository(JpaUserCrudRepository crudRepository) {
        this.crudRepository = crudRepository;
    }
       
    @Override
    public void save(User user) {
        crudRepository.saveAndFlush(JpaUser.fromDomain(user));
    }

}
beni0888
  • Where do the transactions get committed? Data doesn’t get seen by other threads until it is committed. – Nathan Hughes Jun 27 '20 at 02:00
  • I've made `CreateUser` transactional with `@Transactional(propagation=Propagation.REQUIRES_NEW)` and explicitly committed the transaction in test method after first user creation, this way the spawned threads can see the user in DB, and they're also persisted. However, it seems that threads themselves cannot see each other's changes, and I haven't been able to create and commit a new transaction for each thread. Could someone help with this? – beni0888 Jun 28 '20 at 14:11

1 Answer


Look at the isolation level configured for your transactions. Database engines usually try to serve data as quickly as possible and avoid blocking where they can. So if all your threads read a table at the same time, they may get an "uncommitted" version of the records.

If you need synchronization, you can change the isolation level, or lock the table before working on it.
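
Independently of the isolation level, note that a test like the one in the question can hide the real failure: `ExecutorService.submit()` captures any exception thrown by the task in the returned `Future`, so an exception such as `NonExistentReferralCode` thrown on a pool thread never reaches the test unless `Future.get()` is called. Below is a minimal sketch of collecting those failures using only the JDK. The class name `SurfaceTaskFailures`, the helper `runAndCollectFailures`, and the `IllegalStateException` stand-in are hypothetical and not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class SurfaceTaskFailures {

    // Runs every task on a fixed pool and returns the exceptions the tasks threw.
    static List<Throwable> runAndCollectFailures(List<Callable<Void>> tasks, int threads)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        List<Future<Void>> futures = new ArrayList<>();
        for (Callable<Void> task : tasks) {
            futures.add(executor.submit(task));
        }

        executor.shutdown();
        if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            throw new IllegalStateException("Executor didn't finish in time");
        }

        List<Throwable> failures = new ArrayList<>();
        for (Future<Void> future : futures) {
            try {
                // get() rethrows the task's exception wrapped in ExecutionException.
                future.get();
            } catch (ExecutionException e) {
                failures.add(e.getCause());
            }
        }
        return failures;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(() -> {
                // Stand-in for createUser.execute(...) failing the referral-code lookup.
                throw new IllegalStateException("referral code not found");
            });
        }
        List<Throwable> failures = runAndCollectFailures(tasks, 10);
        System.out.println(failures.size() + " tasks failed");
    }
}
```

In the test from the question, asserting that the returned list is empty (or failing with its first element) would likely show that the pool threads reject the referral code because the first user's row has not been committed, rather than leaving only a wrong row count to debug.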

More on this topic:

Guillaume F.