
Following the breadcrumbs, I cobbled together some code that seems to do what I want: run in the background, look at ongoing jobs, then collect... whatever information may be available:

import threading
import time

import pyspark

def do_background_monitoring(sc: pyspark.context.SparkContext) -> threading.Thread:
    thread = threading.Thread(target=monitor, args=[sc])
    thread.start()
    return thread

def monitor(sc: pyspark.context.SparkContext):
    job_tracker: pyspark.status.StatusTracker = sc.statusTracker()  # should this go inside the loop?
    while True:
        time.sleep(1)
        for job_id in job_tracker.getActiveJobsIds():
            job: pyspark.status.SparkJobInfo = job_tracker.getJobInfo(job_id)
            stages = job.stageIds
            # ???

However, that's where I hit a dead end. According to the docs, stageIds is an int[], and apparently py4j doesn't know what to do with it? (py4j claims otherwise...)

ipdb> stages
JavaObject id=o34
ipdb> stages. 
          equals    notify    wait     
          getClass  notifyAll          
          hashCode  toString           
ipdb> stages.toString()
'[I@4b1009f3'

Is this a dead end? Are there other ways to achieve this? If I were willing to write Scala to do this, could I have just this bit in Scala and keep the rest in Python?

badp

1 Answer


...while the REPL made it look like Python knew nothing about the object other than that it was some sort of Object, py4j does make the contents of the array available to you:

ipdb> type(si)
<class 'py4j.java_collections.JavaArray'>
ipdb> tuple(si)
(0,)

and now I feel really silly :)
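Putting the pieces together, the monitor loop from the question can be completed along these lines. This is a minimal sketch, not tested against a live cluster: it assumes the StatusTracker methods used in the question (getActiveJobsIds, getJobInfo), and the helper name snapshot_active_jobs is made up for illustration. The tuple() call is the fix from the answer above: it forces py4j's JavaArray proxy into a plain Python tuple.

```python
import threading
import time

def snapshot_active_jobs(tracker):
    """Return a {job_id: stage_ids} dict for all currently active jobs.

    tuple() iterates the py4j JavaArray proxy and materializes it as a
    plain Python tuple, which is the step the repl session was missing.
    """
    snapshot = {}
    for job_id in tracker.getActiveJobsIds():
        job = tracker.getJobInfo(job_id)
        if job is not None:  # the job may finish between the two calls
            snapshot[job_id] = tuple(job.stageIds)
    return snapshot

def do_background_monitoring(tracker, interval=1.0):
    """Poll snapshot_active_jobs in a daemon thread once per interval."""
    def monitor():
        while True:
            time.sleep(interval)
            print(snapshot_active_jobs(tracker))

    thread = threading.Thread(target=monitor, daemon=True)
    thread.start()
    return thread
```

You would call do_background_monitoring(sc.statusTracker()) once after creating the SparkContext; daemon=True keeps the polling thread from blocking interpreter shutdown.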

badp
  • Thank you for posting your eventual answer! I also just struggled with this for way longer than the solution might suggest is reasonable :D – fwetdb Oct 27 '22 at 09:41