1

I have a question about using the kalman filter to predict t+2 values. As we know, the basic kalman filter has two steps, predict and update. The predict part can generate xt based on xt-1. Here are some sample codes I found on the Internet.

import numpy as np

class KalmanFilter(object):
    def __init__(self, F = None, B = None, H = None, Q = None, R = None, P = None, x0 = None):

        if(F is None or H is None):
            raise ValueError("Set proper system dynamics.")

        self.n = F.shape[1]
        self.m = H.shape[1]

        self.F = F
        self.H = H
        self.B = 0 if B is None else B
        self.Q = np.eye(self.n) if Q is None else Q
        self.R = np.eye(self.n) if R is None else R
        self.P = np.eye(self.n) if P is None else P
        self.x = np.zeros((self.n, 1)) if x0 is None else x0

    def predict(self, u = 0):
        self.x = np.dot(self.F, self.x) + np.dot(self.B, u)
        self.P = np.dot(np.dot(self.F, self.P), self.F.T) + self.Q
        return self.x

    def update(self, z):
        y = z - np.dot(self.H, self.x)
        S = self.R + np.dot(self.H, np.dot(self.P, self.H.T))
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
        self.x = self.x + np.dot(K, y)
        I = np.eye(self.n)
        self.P = np.dot(np.dot(I - np.dot(K, self.H), self.P), 
            (I - np.dot(K, self.H)).T) + np.dot(np.dot(K, self.R), K.T)

def example():
    dt = 1.0/60
    F = np.array([[1, dt, 0], [0, 1, dt], [0, 0, 1]])
    H = np.array([1, 0, 0]).reshape(1, 3)
    Q = np.array([[0.05, 0.05, 0.0], [0.05, 0.05, 0.0], [0.0, 0.0, 0.0]])
    R = np.array([0.5]).reshape(1, 1)

    x = np.linspace(-10, 10, 100)
    measurements = - (x**2 + 2*x - 2)  + np.random.normal(0, 2, 100)

    kf = KalmanFilter(F = F, H = H, Q = Q, R = R)
    predictions = []

    for z in measurements:
        predictions.append(np.dot(H,  kf.predict())[0])
        kf.update(z)

    import matplotlib.pyplot as plt
    plt.plot(range(len(measurements)), measurements, label = 'Measurements')
    plt.plot(range(len(predictions)), np.array(predictions), label = 'Kalman Filter Prediction')
    plt.legend()
    plt.show()

if __name__ == '__main__':
    example()

In this problem, we use the value of t-1 to predict t and update with the value of t. If I want to predict the value of t+1 based on t. I changed something accordingly:

import numpy as np

class KalmanFilter(object):
    def __init__(self, F = None, F_1 = None, B = None, H = None, Q = None, R = None, P = None, x0 = None):

        if(F is None or H is None):
            raise ValueError("Set proper system dynamics.")

        self.n = F.shape[1]
        self.m = H.shape[1]

        self.F = F
        self.F_1 = F_1
        self.H = H
        self.B = 0 if B is None else B
        self.Q = np.eye(self.n) if Q is None else Q
        self.R = np.eye(self.n) if R is None else R       
        self.P = np.eye(self.n) if P is None else P
        self.x = np.zeros((self.n, 1)) if x0 is None else x0

    def predict(self, u = 0):
        self.x = np.dot(self.F, self.x) + np.dot(self.B, u)
        self.P = np.dot(np.dot(self.F, self.P), self.F_1) + self.Q
        return self.x

    def update(self, z):
        y = z - np.dot(self.H, self.x)
        S = self.R + np.dot(self.H, np.dot(self.P, self.H.T))
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
        self.x = self.x + np.dot(K, y)
        I = np.eye(self.n)
        self.P = np.dot(np.dot(I - np.dot(K, self.H), self.P), 
                        (I - np.dot(K, self.H)).T) + np.dot(np.dot(K, self.R), K.T)


def example():
    dt = 1.0/60
    F_0 = np.array([[1, dt, 0], [0, 1, dt], [0, 0, 1]])
    F = np.dot(F_0, F_0)
    F_1 = np.dot(F_0.T, F_0.T)
    H = np.array([1, 0, 0]).reshape(1, 3)
    Q = np.array([[0.05, 0.05, 0.0], [0.05, 0.05, 0.0], [0.0, 0.0, 0.0]])
    R = np.array([0.5]).reshape(1, 1)

    x = np.linspace(-10, 10, 100)
    measurements = - (x**2 + 2*x - 2)  + np.random.normal(0, 2, 100)

    kf = KalmanFilter(F = F, F_1 = F_1, H = H, Q = Q, R = R)
    predictions = []

    for i in range(1, len(measurements), 2):
            predictions.append(np.dot(H,  kf.predict())[0])
            kf.update(measurements[i])

    import matplotlib.pyplot as plt
    plt.plot(range(len(measurements)), measurements, label = 'Measurements')
    plt.plot(range(len(predictions)), np.array(predictions), label = 'Kalman Filter Prediction')
    plt.legend()
    plt.show()

if __name__ == '__main__':
    example()

The major changes are these two:

  1. I changed F matrix.

  2. I used the value of t+1 timestep to update my result.

However, the length of results I got is only half of the original measurements. Because of I kind of jumping to update them.

I'm a little confused now. Does anybody have suggestions or solutions? Thank you so much

Pei Li
  • 302
  • 3
  • 15

1 Answers1

0

I see following problems:

  1. The prediction of x(t+1) based on x(t) is actually the same as prediction of x(t) based on x(t-1). Everything depends on the definition of the time step (dt)
  2. You don't need to change your F matrix, it was correct in the first code. But it depends on your dt.
  3. In your second code you replaced self.F.T through self.F_1. But T stands for transpose of F. It is not good. That's why probably your result vector has another dimensions.

So if you want to predict over some amount of time, only what you need is to know this amount of time. It would be your new dt, which of course will change your F and Q.

If your measurements are taken with a time step dt and you want to see what happens if each second measurement falls off you have two options:

Option 1

Change dt of the prediction step, so that it equals the time difference between two last measurements and recalculate the F matrix with the new value dt (be aware: in this case you will need to change your Q matrix, because the system gets more uncertainty over a bigger time step).

Option 2

Execute prediction with your original time step dt several times to fill the time period between the measurements. So if you want to process only each second measurement, your code would look like

for i in range(1, len(measurements), 2):
    kf.predict()
    kf.predict()
    kf.update(measurements[i])

UPDATE

To your question from the comment:

i       t      todo

1    0*dt      init
2    1*dt      predict
3    2*dt      predict, update
4    3*dt      predict
5    4*dt      predict, update

Did you mean this case?

Have a look at this post. It shows what happens if you predict a lot without new updates.

Anton
  • 4,544
  • 2
  • 25
  • 31
  • I have one concern. For example, if we have a dataset [1, 2, 3, 4, 5] with corresponding timestep [0, 1, 2, 3, 4]. Assuming we start from time 0 and want to predict the value of time 2, we will update it with '3'. Then next one will be time 4 and update with '5'. Somehow I feel I miss the value of odd timestep. Do you have any ideas about this problem? Thank you! – Pei Li Jun 06 '19 at 13:20
  • You can see it like this: for each point in time execute `predict` to cover the last time period and then if you have a measurement execute `update` – Anton Jun 06 '19 at 14:06
  • normally you initialize your filter with the first measurement, so you can see it as a kind of update. you can avoid updates as long as you want, but you need to predict each time. – Anton Jun 06 '19 at 14:22
  • Yes, thank you. I only have one concern, if I use it in 2dt timesteps, seems I lose some update chances, which may give me more errors. – Pei Li Jun 06 '19 at 14:31
  • You have to perform updates as often as you can, otherwise the error will grow rapidly. Why do you want to avoid some measurements? – Anton Jun 06 '19 at 14:42