
I am making async calls every 10 seconds for 1 minute, which means roughly 6 calls will be made. The problem is that I want to apply the delay only under a specific condition.

Observable
.just(listOfSomethings_Locally)
.take(1, TimeUnit.MINUTES)
.serialize()
.delaySubscription( // this is confusing part 
() ->
    Observable.just(listOfItems_Network).take(10,TimeUnit.SECONDS)
) 

What I want is to delay each network call by 10 seconds, except for the first call, and to cancel a network call after 10 seconds, so that I have exactly 6 calls in 1 minute.

EDIT

Due to confusion about the scenario, here is the redefined scenario:

I have a large list of drivers locally. I want to send a request to each of them, one every 10 seconds, and listen on another subscriber to check whether the driver cancels it within those 10 seconds. This process should go on for about 1 minute. If a driver cancels, I should immediately send the request to the next one.

Code written so far:

Observable.from(driversGot)
                .take(1,TimeUnit.MINUTES)
                .serialize()
                .map(this::requestRydeObservable) // requesting for single driver from driversGot (it's a network call)
                .flatMap(dif ->
                        Observable.amb(
                                kh.getFCM().driverCanceledRyde(), // listen for if driver cancel request returns integer
                                kh.getFCM().userRydeAccepted()) // listen for driver accept returns RydeAccepted object
                                .map(o -> {
                                    if (o instanceof Integer) {
                                        return new RydeAccepted();
                                    } else if (o instanceof RydeAccepted) {
                                        return (RydeAccepted) o;
                                    }
                                    return null;
                                }).delaySubscription(10,TimeUnit.SECONDS)
                )
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(fua -> {
                    if (fua == null) {
                        UiHelpers.showToast(context, "Invalid Firebase response");
                    } else if (!fua.getStatus()) { // ryde is canceled because object is empty
                        UiHelpers.showToast(context, "User canceled ryde");
                    } else { // ryde is accepted
                        UiHelpers.showToast(context, "User accepted ryde");
                    }
                }, t -> {
                    t.printStackTrace();
                    UiHelpers.showToast(context,"Error sending driver requests");
                }, UiHelpers::stopLoading);
Zulqurnain Jutt
  • Please provide more detailed information about your observables and your use case. As I understand your update: you have a List of items which will be transformed into an Observable. You want to process one element at a time as it is pushed to you. The values must be pushed to you every 10 seconds. Example: Sec 0: Value 1, Sec 10: Value 2. Each emitted value will be processed via a web call. You check on another observable whether the item has been cancelled within 10 seconds. If not, the process for one element will be 60 seconds. If cancelled within 10 seconds, you start the next one in line? – Sergej Isbrecht Nov 15 '16 at 11:57
  • You understand the flow exactly, except that 60 seconds is the overall time limit for all elements in the list – Zulqurnain Jutt Nov 15 '16 at 12:42
  • So, your list has 6 elements in it, or how would you like to finish in 60 seconds if you schedule one element every 10 seconds? – Sergej Isbrecht Nov 15 '16 at 12:45
  • Each element has a 10-second quota, but it can be cancelled within that time, in which case the next element should be scheduled immediately – Zulqurnain Jutt Nov 15 '16 at 12:49
  • Ok, got it. Could you please provide a little implementation detail (return type) for driverCanceledRyde, userRydeAccepted, requestRydeObservable, fua – Sergej Isbrecht Nov 15 '16 at 12:54
  • Wait, this does not make sense with the 10-second scheduling rate. I think you want to schedule an action; if it does not get a result in 10 seconds, it will be cancelled and the next one will be scheduled. If it does get a result within the 10 seconds, the next one will be scheduled? – Sergej Isbrecht Nov 15 '16 at 12:57
  • driverCanceledRyde and userRydeAccepted are just observables that may or may not emit a value. If driverCanceledRyde emits, the next one will be scheduled; if userRydeAccepted emits, all remaining requests will be cancelled and some operation will be performed – Zulqurnain Jutt Nov 15 '16 at 13:06
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/128180/discussion-between-mr-z-and-hans-wurst). – Zulqurnain Jutt Nov 15 '16 at 15:31

3 Answers


Feedback on your code

You don't need take and serialize, as just already emits its items immediately and serially.

delaySubscription seems to be a weird choice: once the passed observable generates an event, further events are not delayed, which contradicts your case.

Option #1, rx only

Use delay and calculate the individual delay for each event (so the 1st is delayed by 0 seconds, the 2nd by 1 second, the 3rd by 2 seconds, ...)

    // Shared counter: each item takes the next delay (0, 1, 2, ... seconds).
    AtomicLong counter = new AtomicLong(0);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .delay(item -> Observable.just(item).delay(counter.getAndIncrement(), TimeUnit.SECONDS))
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });
    // Block the main thread so the delayed emissions have time to arrive.
    System.in.read();
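
The per-item delay in Option #1 is simply the item's index multiplied by the spacing. As a small worked example in plain Java (no RxJava involved), the sketch below computes that schedule for the 10-second spacing from the question and checks how many calls start inside a 60-second window. The class and method names (`DelaySchedule`, `delaysSeconds`, `callsWithin`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class DelaySchedule {
    // Delay for the i-th item (0-based) is i * periodSeconds, so the first item is not delayed.
    static List<Long> delaysSeconds(int count, long periodSeconds) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            delays.add(i * periodSeconds);
        }
        return delays;
    }

    // Number of calls whose start time (0, period, 2 * period, ...) falls strictly
    // before the end of the window, i.e. ceil(windowSeconds / periodSeconds).
    static int callsWithin(long windowSeconds, long periodSeconds) {
        return (int) ((windowSeconds + periodSeconds - 1) / periodSeconds);
    }
}
```

With a 10-second spacing, `delaysSeconds(6, 10)` yields 0, 10, 20, 30, 40 and 50 seconds, and `callsWithin(60, 10)` is 6, which matches the "6 calls in 1 minute" expectation in the question.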

Option #2: rate limiting

It seems your use case fits rate limiting, so we can use RateLimiter from Guava:

    // Allow one permit per second.
    RateLimiter limiter = RateLimiter.create(1);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .map(r -> {
            // Blocks the emitting thread until a permit is available.
            limiter.acquire();
            return r;
        })
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });
    System.in.read();

Both work similarly:

Tue Nov 15 11:14:34 EET 2016
1 Tue Nov 15 11:14:34 EET 2016
2 Tue Nov 15 11:14:35 EET 2016
3 Tue Nov 15 11:14:36 EET 2016
4 Tue Nov 15 11:14:37 EET 2016
5 Tue Nov 15 11:14:38 EET 2016
6 Tue Nov 15 11:14:39 EET 2016

The rate limiter would work better if one of your requests hangs, e.g. for 5 seconds: it would then let the next requests go through faster to compensate for the delay and still reach the goal of 1 req/s for 10 seconds.

Ivan
  • In your option #1, how do I limit the requests to 1 minute in total? Each request executes after `(order number)-1` seconds, but that doesn't fit my question. I have a large list of drivers locally and I want to send a request to each of them, one every 10 seconds, and listen on another subscriber to check whether the driver cancels it within 10 seconds. This process will go on for about 1 minute; if a driver cancels, I should immediately send the request to the next one – Zulqurnain Jutt Nov 15 '16 at 11:20

Hello, you can use retry with a delay. There is a way to do that here

Community
Boukharist

I would like to update @Ivan's post, because it is missing error handling and relies on side effects.

This post only uses RxJava operators. The observable will provide a value every 10 seconds. Each request will time out after 10 seconds. If it hits the timeout, a fallback value will be returned.

The first test method will receive 6 values in 60 seconds. The observable may finish before the 60 seconds are up if the last request finishes in less than 10 seconds.

public class TimeOutTest {
    private static String DUMMY_VALUE = "ERROR";

    @Test
    public void handles_in_60_seconds() throws Exception {
        List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6);

        Observable<Integer> values = Observable.fromIterable(actions)
                .take(6);

        // Pair each value with a timer tick so one value is released every 10 seconds.
        Observable<Integer> observable = Observable.zip(timer, values, (time, result) -> {
            return result;
        });

        Observable<String> stringObservable = observable.flatMap(integer -> {
            return longNetworkLong(9_000)
                    .timeout(10, TimeUnit.SECONDS)
                    .onErrorReturnItem(DUMMY_VALUE);
        }).doOnNext(s -> System.out.println("VALUE"));

        stringObservable.test()
                .awaitDone(60, TimeUnit.SECONDS)
                .assertValueCount(6);
    }

    @Test
    public void last_two_values_timeOut() throws Exception {
        List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6);

        Observable<Integer> values = Observable.fromIterable(actions)
                .take(6);

        // Pair each value with a timer tick so one value is released every 10 seconds.
        Observable<Integer> observable = Observable.zip(timer, values, (time, result) -> {
            return result;
        });

        Observable<String> stringObservable = observable
                .map(integer -> integer * 2500)
                .flatMap(integer -> {
                    return longNetworkLong(integer)
                            .timeout(10, TimeUnit.SECONDS)
                            .doOnError(throwable -> System.out.print("Timeout hit?"))
                            .onErrorReturnItem(DUMMY_VALUE);
                })
                .doOnNext(s -> System.out.println("VALUE"))
                .filter(s -> !Objects.equals(s, DUMMY_VALUE));

        stringObservable.test()
                .awaitDone(60, TimeUnit.SECONDS)
                .assertValueCount(4);

    }

    private Observable<String> longNetworkLong(int delayTime) {
        return Observable.fromCallable(() -> {
            Thread.sleep(delayTime);
            return "result";
        });
    }
}
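
For the redefined scenario in the question (move on as soon as a driver cancels, stop as soon as one accepts, give each driver at most 10 seconds and the whole process at most 60 seconds), the control flow can also be sketched in plain Java with `CompletableFuture` instead of RxJava operators. This is only a minimal sketch under stated assumptions: `DriverDispatchSketch`, `Reply`, `dispatch` and the `request` function are hypothetical names, and the real network call and Firebase listeners are replaced by a function that returns a future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class DriverDispatchSketch {
    // Hypothetical reply type standing in for the cancel/accept observables.
    enum Reply { ACCEPTED, CANCELED }

    // Offers the ride to each driver in turn and returns the driver that accepted,
    // or null if nobody accepted before the overall budget ran out.
    static String dispatch(List<String> drivers,
                           Function<String, CompletableFuture<Reply>> request,
                           long perDriverMillis,
                           long overallMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(overallMillis);
        for (String driver : drivers) {
            long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remaining <= 0) {
                break; // overall time limit reached
            }
            CompletableFuture<Reply> reply = request.apply(driver);
            try {
                // Wait for this driver, but never longer than the per-driver or overall limit.
                Reply r = reply.get(Math.min(perDriverMillis, remaining), TimeUnit.MILLISECONDS);
                if (r == Reply.ACCEPTED) {
                    return driver;
                }
                // CANCELED: continue with the next driver immediately.
            } catch (TimeoutException e) {
                reply.cancel(true); // no answer in time, give up on this driver
            } catch (ExecutionException e) {
                // A failed request is treated like a cancel in this sketch.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }
}
```

In the question's terms this would be called with a 10,000 ms per-driver limit and a 60,000 ms overall limit. The call blocks the calling thread, so on Android it would have to run off the main thread.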
Sergej Isbrecht
  • It takes a List and transforms it into an Observable. There is another Observable which emits a value every 10 seconds. I take 6 values from both and zip them together. So, I will get a value pushed at me every 10 seconds, starting at second 0. For each value I invoke a long-running method: longNetworkLong. If the result of the longNetworkLong Observable takes longer than 10 seconds to come in, I cancel the request and provide a fallback value. I will have a look at your new requirement. – Sergej Isbrecht Nov 15 '16 at 11:45