
I am new to Databricks and Scala. I am trying to run multiple notebooks in parallel from a main notebook.

I got the code below from the Databricks website, which runs notebooks in parallel.

The "parallel" notebook:

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal

case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import com.databricks.WorkflowException

  val numNotebooksInParallel = 4

  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))

  val ctx = dbutils.notebook.getContext()

  Future.sequence(
    notebooks.map { notebook =>
      Future {
        dbutils.notebook.setContext(ctx)
        if (notebook.parameters.nonEmpty)
          dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
        else
          dbutils.notebook.run(notebook.path, notebook.timeout)
      }
      .recover {
        case NonFatal(e) => s"ERROR: ${e.getMessage}"
      }
    }
  )
}

def parallelNotebook(notebook: NotebookData): Future[String] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext.Implicits.global
  import com.databricks.WorkflowException

  val ctx = dbutils.notebook.getContext()
  // The simplest interface we can have, but it doesn't
  // have protection for submitting too many notebooks in parallel at once
  Future {
    dbutils.notebook.setContext(ctx)
    if (notebook.parameters.nonEmpty)
      dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
    else
      dbutils.notebook.run(notebook.path, notebook.timeout)
  }
  .recover {
    case NonFatal(e) => s"ERROR: ${e.getMessage}"
  }
}

The "concurrent" notebook:


import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

var notebooks = Seq(
  NotebookData("testing", 15),
  NotebookData("testing-2", 15, Map("Hello" -> "yes")),
  NotebookData("testing-2", 15, Map("Hello" -> "else")),
  NotebookData("testing-2", 15, Map("Hello" -> "lots of notebooks"))
)

val res = parallelNotebooks(notebooks)
Await.result(res, 30 seconds) // this is a blocking call.
res.value


In the above, the notebook paths and parameters are given directly in the Sequence, but I want to add notebook paths to the Sequence based on an IF condition.

For example:
val notebook1="Y"
val notebook2="Y"
val notebook2="N"

I want to add only the notebooks whose flag is "Y" to the sequence.

if(notebook1=="Y")

then this notebook should be added to Sequence.
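I am imagining something along these lines (just a rough sketch with made-up flag names and notebook paths; I am not sure whether this is the right Scala approach):

// hypothetical flags controlling which notebooks should run
val notebook1 = "Y"
val notebook2 = "Y"
val notebook3 = "N"

// pair each flag with its NotebookData (paths and parameters are placeholders)
val candidates = Seq(
  (notebook1, NotebookData("testing", 15)),
  (notebook2, NotebookData("testing-2", 15, Map("Hello" -> "yes"))),
  (notebook3, NotebookData("testing-3", 15, Map("Hello" -> "no")))
)

// keep only the notebooks whose flag is "Y" and run them in parallel
val notebooks = candidates.collect { case ("Y", data) => data }
val res = parallelNotebooks(notebooks)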


1 Answer


The very simple way to achieve this is by using the dbutils.notebook utility. Call dbutils.notebook.run() from a notebook and it will run the target notebook; calling it multiple times from the same cell will do the job.

dbutils.notebook.run("/path/mynotebook", timeout_seconds=60,
  arguments={"arg1": "value1", "arg2": "value2"})

If we have to run the instances of a parameterized notebook concurrently, then:

  1. Create a pool of resources required for concurrency. This can be sized by how many instances need to run at once.

from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)

  2. Then consume this pool for the concurrent execution.

pool.map(
  lambda invalue: dbutils.notebook.run(
    "/User/Path/notebook", timeout_seconds=60, arguments={"argument": invalue}),
  ["parameter1", "parameter2"])
  • Thanks, I have different notebooks, not one notebook with different parameters. – sai m Jun 23 '22 at 12:54
  • See https://stackoverflow.com/questions/68937734/execute-multiple-notebooks-in-parallel-in-pyspark-databricks – Renhuai Feb 09 '23 at 20:00