11

I need to get a work-item from a work-queue and then sequentially run a series of containers to process each work-item. This can be done using initContainers (https://stackoverflow.com/a/46880653/94078)

What would be the recommended way of restarting the process to get the next work-item?

  • Jobs seem ideal but don't seem to support an infinite/indefinite number of completions.
  • Using a single Pod doesn't work because initContainers aren't restarted (https://github.com/kubernetes/kubernetes/issues/52345).
  • I would prefer to avoid the maintenance/learning overhead of a system like argo or brigade.

Thanks!

eug

2 Answers

18

Jobs should be used for working with work queues. When using work queues you should not set .spec.completions (or you should set it to null). In that case Pods will keep getting created until one of the Pods exits successfully. It is a little awkward to exit from the (main) container with a failure state on purpose, but this is the specification. You may set .spec.parallelism to your liking irrespective of this setting; I've set it to 1 as it appears you do not want any parallelism.

In your question you did not specify what you want to do when the work queue becomes empty, so I will give two solutions: one if you want to wait for new items (infinite) and one if you want to end the Job when the work queue becomes empty (finite, but indefinite number of items).

Both examples use redis, but you can apply this pattern to your favorite queue. Note that the part that pops an item from the queue is not safe; if your Pod dies for some reason after having popped an item, that item will remain unprocessed or not fully processed. See the reliable-queue pattern for a proper solution.
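A minimal sketch of that reliable-queue pattern with redis-cli, assuming Redis 6.2 or newer (for LMOVE) and a hypothetical in-flight list named job:processing that is not part of the manifests below. Instead of being popped, the item is moved atomically to the in-flight list, and it is removed from there only after the last step has finished. The function names are made up for illustration.

```shell
# Sketch only: the list name job:processing and the function names are illustrative.
REDIS_CLI=${REDIS_CLI:-redis-cli}   # can be overridden to try this without a Redis server
REDIS_HOST=${REDIS_HOST:-redis}
QUEUE=job
PROCESSING=job:processing

# Atomically move the oldest item from the queue to the in-flight list and print it.
claim_item() {
  $REDIS_CLI -h "$REDIS_HOST" LMOVE "$QUEUE" "$PROCESSING" LEFT RIGHT
}

# Remove one copy of a finished item from the in-flight list.
ack_item() {
  $REDIS_CLI -h "$REDIS_HOST" LREM "$PROCESSING" 1 "$1"
}
```

With this approach an item whose Pod died stays in job:processing, so a separate recovery step can push it back onto the queue.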

To implement the sequential steps on each work item I've used init containers. Note that this really is a primitive solution, but you have limited options if you don't want to use some framework to implement a proper pipeline.

There is an asciinema recording if anyone would like to see this at work without deploying redis, etc.

Redis

To test this you'll need to create, at a minimum, a redis Pod and a Service. I am using the example from the Kubernetes "Fine Parallel Processing Using a Work Queue" task. You can deploy that with:

kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-pod.yaml
kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-service.yaml

The rest of this solution expects a Service named redis in the same namespace as your Job that does not require authentication, and a Pod named redis-master.

Inserting items

To insert some items in the work queue use this command (you will need bash for this to work):

echo -ne "rpush job "{1..10}"\n" | kubectl exec -i redis-master -- redis-cli
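For shells without brace expansion, the same commands can be generated with a plain loop. The sketch below is POSIX sh; the function name gen_rpush is made up for illustration, and the queue name job matches the manifests in this answer.

```shell
# Print one "rpush job N" command per line for N = 1..count.
# Usage: gen_rpush 10 | kubectl exec -i redis-master -- redis-cli
gen_rpush() {
  count=$1
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'rpush job %s\n' "$i"
    i=$((i + 1))
  done
}
```

Unlike the echo variant, each generated line starts directly with the command, with no leading space.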

Infinite version

This version waits when the queue is empty, so it never completes.

apiVersion: batch/v1
kind: Job
metadata:
  name: primitive-pipeline-infinite
spec:
  parallelism: 1
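  completions: null
  # Each processed item ends in a failed Pod, so raise the default limit of 6 failures.
  backoffLimit: 1000000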
  template:
    metadata:
      name: primitive-pipeline-infinite
    spec:
      volumes: [{name: shared, emptyDir: {}}]
      initContainers:
      - name: pop-from-queue-unsafe
        image: redis
        command: ["sh","-c","redis-cli -h redis blpop job 0 >/shared/item.txt"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-1
        image: busybox
        command: ["sh","-c","echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-2
        image: busybox
        command: ["sh","-c","echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-3
        image: busybox
        command: ["sh","-c","echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      containers:
      - name: done
        image: busybox
        command: ["sh","-c","echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      restartPolicy: Never

Finite version

This version stops the job if the queue is empty. Note the trick that the pop init container checks if the queue is empty and all the subsequent init containers and the main container immediately exit if it is indeed empty - this is the mechanism that signals Kubernetes that the Job is completed and there is no need to create new Pods for it.

apiVersion: batch/v1
kind: Job
metadata:
  name: primitive-pipeline-finite
spec:
  parallelism: 1
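  completions: null
  # Each processed item ends in a failed Pod, so raise the default limit of 6 failures.
  backoffLimit: 1000000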
  template:
    metadata:
      name: primitive-pipeline-finite
    spec:
      volumes: [{name: shared, emptyDir: {}}]
      initContainers:
      - name: pop-from-queue-unsafe
        image: redis
        command: ["sh","-c","redis-cli -h redis lpop job >/shared/item.txt; grep -q . /shared/item.txt || :>/shared/done.txt"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-1
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-2
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-3
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      containers:
      - name: done
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      restartPolicy: Never
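
The guard logic can be tried locally without a cluster. The sketch below mimics the containers with shell functions and uses a temporary directory in place of the shared emptyDir volume. As an assumption for the demo, pop_item takes the popped value as an argument instead of calling redis-cli, and all function names are illustrative.

```shell
# A temporary directory stands in for the shared emptyDir volume.
SHARED=$(mktemp -d)

# Mimics the pop init container: store the item and
# create done.txt if nothing came back from the queue.
pop_item() {
  rm -f "$SHARED/done.txt"
  printf '%s\n' "$1" > "$SHARED/item.txt"
  grep -q . "$SHARED/item.txt" || : > "$SHARED/done.txt"
}

# Mimics the main container: returns 0 (Job complete) on an empty queue,
# and 1 (Kubernetes creates another Pod) after an item was processed.
finish() {
  [ -f "$SHARED/done.txt" ] && return 0
  echo "all done with $(cat "$SHARED/item.txt")"
  return 1
}
```

Calling pop_item with a value and then finish yields a non-zero status, while pop_item with an empty string makes finish return 0, which is the exit code that lets the Job complete.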
Janos Lenart
  • 2
    Thanks for this answer. I'm after the infinite version where new items are waited for - and this approach to set restartPolicy to Never and then fail the pod is new to me - but the resulting Error statuses are not great since they interfere with monitoring and make it harder to see actual Errors.. – eug Apr 04 '18 at 13:30
  • 2
    Yeah, I am not fond of that behavior. How many items are you looking to process a minute? The reason I ask is that if it's only, say, 10 items a minute, you could set `completions: 1000000000` (1 billion), which will be enough for about 190 years (1000000000/10/60/24/365). Then you can exit successfully from the 'done' container. – Janos Lenart Apr 04 '18 at 16:22
  • Actually these jobs are generated manually, so it'll be more like 1 every minute max. This is basically what I've ended up using - just a normal Job. I've set completions to 10,000 but good point that I can go much higher - it's an int32. That will be plenty. – eug Apr 05 '18 at 02:32
  • This doesn't support automatic horizontal scaling of the worker pods based on message queue metrics, correct? – Connor Brinton Dec 27 '18 at 18:09
  • 1
    @connor.brinton, correct; this is specifically for sequential processing (see the question) – Janos Lenart Dec 27 '18 at 19:29
  • after 6 items I get : ``` message: Job has reached the specified backoff limit reason: BackoffLimitExceeded``` – MoShe Oct 15 '19 at 23:28
1

The easiest way in this case is to use a CronJob, which runs Jobs according to a schedule. For more information, see the documentation.

Here is an example (I took it from here and modified it)

apiVersion: batch/v1beta1
kind: CronJob 
metadata:
  name: sequential-jobs
spec:
  schedule: "*/1 * * * *" #Here is the schedule in Linux-cron format
  jobTemplate:
    spec:
      template:
        metadata:
          name: sequential-job
        spec:
          initContainers:
          - name: job-1
            image: busybox
            command: ['sh', '-c', 'for i in 1 2 3; do echo "job-1 `date`" && sleep 5s; done;']
          - name: job-2
            image: busybox
            command: ['sh', '-c', 'for i in 1 2 3; do echo "job-2 `date`" && sleep 5s; done;']
          containers:
          - name: job-done
            image: busybox
            command: ['sh', '-c', 'echo "job-1 and job-2 completed"']
          restartPolicy: Never

This solution, however, has some limitations:

  • It cannot run more often than once per minute
  • If you need to process your work-items one by one, you need to add a check for already running Jobs in an init container
  • CronJobs are available only in Kubernetes 1.8 and higher
Artem Golenyaev
  • 1
    Thanks, this is not too bad and one can set .spec.concurrencyPolicy to Forbid. The 1 minute limitation is not ideal though.. – eug Mar 31 '18 at 07:18