1

I have a generator which generates events for Flink CEP, code for which is given below. Basically, I am using Thread.sleep() and I have read somewhere that java can't sleep less than 1 millisecond even we use System.nanoTime(). Code for the generator is

public class RR_interval_Gen extends RichParallelSourceFunction<RRIntervalStreamEvent> {

Integer InputRate  ;  // events/second
Integer Sleeptime ;
Integer NumberOfEvents;

public RR_interval_Gen(Integer inputRate, Integer numberOfEvents ) {
    this.InputRate = inputRate;
    Sleeptime = 1000 / InputRate;
    NumberOfEvents = numberOfEvents;
}

@Override
public void run(SourceContext<RRIntervalStreamEvent> sourceContext) throws Exception {


    long currentTime;
    Random random = new Random();
    int RRInterval;
    int Sensor_id;

   for(int i = 1 ; i <= NumberOfEvents ; i++) {
        Sensor_id =  2;
       currentTime = System.currentTimeMillis();

       // int randomNum = rand.nextInt((max - min) + 1) + min;
       RRInterval =  10 + random.nextInt((20-10)+ 1);

        RRIntervalStreamEvent stream = new RRIntervalStreamEvent(Sensor_id,currentTime,RRInterval);

        synchronized (sourceContext.getCheckpointLock())
        {
            sourceContext.collect(stream);
             }
        Thread.sleep(Sleeptime);
    }
}

@Override
public void cancel() {

}

}

I will specify my requirement here in simple words. I want generator class to generate events, let's say an ECG stream at 1200 Hz. This generator will accept parameters like input rate and total time for which we have to generate the stream.

So far so good, the issue is that I need to send more than 1000 events / second. How can I do this by using generator function which is generating values U[10,20]?

Also please let me know if I am using wrong way to generate x number of events / second in the above below.

Sleeptime = 1000 / InputRate;

Thanks in advance

Amarjit Dhillon
  • 2,718
  • 3
  • 23
  • 62
  • Why do you need to sleep? – jontro Nov 08 '17 at 00:54
  • Just to make sure events generated are uniform , rather than sending stream and then wait to complete specified tume interval and then send again, that way events will not be uniform inter arrival time – Amarjit Dhillon Nov 08 '17 at 01:17
  • 1
    `SleepTime` is an integer, so `Sleeptime = 1000 / InputRate;` is going to yield zero if `inputRate` is greater than 1000. You're using the wrong datatypes, or the wrong timer resolution. If you work in milliseconds, you can't handle more than 1000 events per second. – user207421 Nov 08 '17 at 01:18
  • 1
    You could busy wait as stated in this answer https://stackoverflow.com/a/11498647/429972 . Not sure if that's recommended though – jontro Nov 08 '17 at 01:20

1 Answers1

0

The least sleep time in Windows systems is ~ 10 ms and in Linux and Macintosh is 1 millisecond as mentioned here.

The granularity of sleep is generally bound by the thread scheduler's interrupt period. In Linux, this interrupt period is generally 1ms in recent kernels. In Windows, the scheduler's interrupt period is normally around 10 or 15 milliseconds

Through my research, I learned that using the nano time sleep in java will not help as the issue in at OS level. If you want to send data at arrival rate > 1000 in a controlled way, then it can be done using Real-Time Operating Systems (RTOS), as they can sleep for less then a millisecond. Now, I have come up with another way of doing it, but in this solution, the interarrival times will not be constantly distributed.

Let's say you want arrival rate of 3000 events/ second, then you can create a for loop which iterates 3 times to send data in each iteration and then sleep for 1ms. So for the 3 tuples, the interarrival time will be close to one another, but the issue will be solved. This may be a stupid solution but it works.

Please let me know if there is some better solution to this.

Amarjit Dhillon
  • 2,718
  • 3
  • 23
  • 62