I would like to write a PySpark dataframe to Kafka as Avro, automatically registering the schema with the Azure Schema Registry in Event Hubs.
This should be possible according to the docs, e.g. https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/avro-dataframe
However, the code below fails with:
TypeError: to_avro() takes from 1 to 2 positional arguments but 3 were given
So I am probably not importing the right to_avro? My environment is Azure Databricks, DBR 11.3 LTS.
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_addr = "https://someeventhub.servicebus.windows.net:8081"

df \
    .select(
        to_avro(col("key"), lit("t-key"), schema_registry_addr).alias("key"),
        to_avro(col("value"), lit("t-value"), schema_registry_addr).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("topic", TOPIC) \
    .option("checkpointLocation", "/FileStore/checkpoint") \
    .start()
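For reference, the to_avro I am importing seems to be the open-source one from pyspark.sql.avro.functions, which (in Spark 3.3, as far as I can tell) only accepts a column plus an optional JSON-format Avro schema string; that would explain the "takes from 1 to 2 positional arguments" error. Below is a rough sketch of that two-argument form with hand-written placeholder schemas instead of registry registration (the record and field names are made up and would need to match my actual key/value struct columns):

from pyspark.sql.functions import col
from pyspark.sql.avro.functions import to_avro

# Hand-written Avro schemas; the field names are placeholders for whatever
# the key/value struct columns actually contain.
key_schema = '{"type": "record", "name": "Key", "fields": [{"name": "id", "type": "long"}]}'
value_schema = '{"type": "record", "name": "Value", "fields": [{"name": "body", "type": "string"}]}'

# Same streaming dataframe as above; only the serialization step changes,
# and nothing gets registered in the schema registry.
serialized = df.select(
    to_avro(col("key"), key_schema).alias("key"),
    to_avro(col("value"), value_schema).alias("value"))

That defeats the purpose though, since what I am after is the automatic registration with the Event Hubs schema registry. Is there a different import that exposes the schema-registry-aware to_avro from Python on DBR 11.3?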