I'm trying to implement a vector UDF in C# with .NET for Apache Spark.

I set up the .NET for Apache Spark environment by following the Spark .NET getting-started guide. Vector UDFs (both the Apache.Arrow and the Microsoft.Data.Analysis flavors) worked for me on an IntegerType column. Now I'm trying to pass an integer-array-typed column to a vector UDF, and I can't find a way to achieve this.

Usings

using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql.Functions;
using DataFrame = Microsoft.Spark.Sql.DataFrame;
using Arrow = Apache.Arrow;

Program

    SparkSession spark = SparkSession
        .Builder()
        .AppName("sample")
        .GetOrCreate();

    DataFrame dataFrame = spark.Range(0, 100).Repartition(4);

    // Regular (row-at-a-time) UDF that builds an int[] column from "id".
    Func<Column, Column> array20 = func.Udf<int, int[]>(
        (col1) => Enumerable.Range(0, col1).ToArray());

    dataFrame = dataFrame.WithColumn("array", array20(dataFrame["id"]));

    // Apache.Arrow flavor: receives a whole batch of rows as an Arrow array.
    var arrowVectorUdf = ArrowFunctions.VectorUdf<Arrow.UInt64Array, Arrow.Int64Array>((id) =>
    {
        var int64ArrayBuilder = new Arrow.Int64Array.Builder();
        var count = id.Length;
        foreach (var item in id.Data.Children)
        {
            int64ArrayBuilder.Append(item.Length + count);
        }
        return int64ArrayBuilder.Build();
    });

    // Microsoft.Data.Analysis flavor: receives the batch as a DataFrameColumn.
    var dataFrameVector = DataFrameFunctions.VectorUdf<Int64DataFrameColumn, Int64DataFrameColumn>(
        (id) => id + id.Length);

Working

            dataFrame = dataFrame.WithColumn("arrowVectorUdfId", arrowVectorUdf(dataFrame["id"]));

            dataFrame = dataFrame.WithColumn("dataFrameVectorId", dataFrameVector(dataFrame["id"]));

Not working

            dataFrame = dataFrame.WithColumn("arrowVectorUdf", arrowVectorUdf(dataFrame["array"]));

            dataFrame = dataFrame.WithColumn("dataFrameVector", dataFrameVector(dataFrame["array"]));

The UDFs above work if I pass the "id" column instead of the "array" column. I'm not sure what the argument type of the UDFs should be for the "array" column. The code above produces the same error, shown below, for both Apache.Arrow and Microsoft.Data.Analysis:

 [2021-03-25T07:02:05.9218517Z] [LAPTOP-0S8GNQ52] [Error] [TaskRunner] [0] Exiting with exception: System.IO.InvalidDataException: Arrow primitive 'List' is unsupported.
   at Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType(Field field)
   at Apache.Arrow.Ipc.MessageSerializer.GetSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.<ReadSchema>b__12_0(Memory`1 buff)
   at Apache.Arrow.ArrayPoolExtensions.RentReturn(ArrayPool`1 pool, Int32 length, Action`1 action)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.ReadRecordBatch()
   at Microsoft.Spark.Worker.Command.ArrowBasedCommandExecutor.<GetInputIterator>d__2.MoveNext()
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameSqlCommandExecutor.ExecuteDataFrameSqlCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands)
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete)
   at Microsoft.Spark.Worker.TaskRunner.Run()
[2021-03-25T07:02:05.9245061Z] [LAPTOP-0S8GNQ52] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2021-03-25T07:02:05.9249567Z] [LAPTOP-0S8GNQ52] [Info] [SimpleWorker] RunSimpleWorker() finished successfully
21/03/25 12:32:05 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:163)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:169)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:160)
        at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/03/25 12:32:06 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
21/03/25 12:32:06 ERROR DotnetBackendHandler: Failed to execute 'showString' on 'org.apache.spark.sql.Dataset' with args=([Type=java.lang.Integer, Value: 20], [Type=java.lang.Integer, Value: 20], [Type=java.lang.Boolean, Value: false])
WPFUser
  • You say this works with the "id" column but not with the "array" column; can you separately paste both code pieces, working and non-working? – gawkface Mar 31 '21 at 04:58
  • @harshvchawla Updated. – WPFUser Mar 31 '21 at 08:46
  • I don't know Spark at all, so all of this is mere speculation (and I don't know what dataFrame looks like when you call Repartition): your func.Udf requires an int array argument, so maybe the contents of column dataFrame["id"] are int arrays but the contents of column dataFrame["array"] are of a different type? – gawkface Apr 01 '21 at 05:06
  • Yes, they are different types, and the array UDF requires a different argument. I'm trying to find out what I should use as the argument type; I couldn't find a doc or sample that achieves this. – WPFUser Apr 01 '21 at 10:49
  • How does your data look after the Repartition call; can you post any sample details? Also try debugging by using typeof, or calling ToArray/ToString, to see if the compiler tells you of any problem. You could even enforce data types instead of var while debugging. – gawkface Apr 01 '21 at 16:41
  • You can pass an array like this: dataFrame = dataFrame.WithColumn("coordinateArray", udfArray(dataFrame["array"])), as mentioned in the following link: https://medium.com/@3rd/net-for-apache-spark-udf-vs2019-docker-for-windows-and-a-christmas-puzzle-6eb5bf9f3b66 – Feras Salim Apr 04 '21 at 16:23
  • @harshvchawla The problem is just with the argument type of the UDF; I couldn't figure out the correct argument type. – WPFUser Apr 05 '21 at 15:50
  • @FerasSalim The problem I'm having is with a vector UDF. A simple UDF sends a single row of data at a time, but I want to send multiple rows at a time; I'm trying to send multiple rows of array data as in the code snippet above. Thanks. – WPFUser Apr 05 '21 at 15:53

1 Answer

It works for me with both of your code samples. I created the Spark environment just like you did, except that the environment was not working for me with Hadoop 2.7, so I installed Hadoop 2.7.4 separately.
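
If you still hit the "Arrow primitive 'List' is unsupported" error, the stack trace points at the cause: the Apache.Arrow IPC reader inside the worker has to support the List type before an array column can reach your UDF at all, so the package versions matter. Once it does, the Arrow-side argument type for an array column would be Apache.Arrow.ListArray. Below is a minimal, untested sketch; the per-row summing logic is hypothetical, and it assumes the list's child values arrive as an Int32Array (since the column was built from int[]):

    // Hypothetical sketch: a vector UDF over an array<int> column.
    // Assumes Microsoft.Spark/Apache.Arrow versions whose IPC reader supports List.
    var arrowListUdf = ArrowFunctions.VectorUdf<Arrow.ListArray, Arrow.Int64Array>((array) =>
    {
        var builder = new Arrow.Int64Array.Builder();
        // The elements of all rows are stored flattened in one child array;
        // ValueOffsets marks where each row's slice begins and ends.
        var values = (Arrow.Int32Array)array.Values;
        for (int i = 0; i < array.Length; i++)
        {
            long sum = 0;
            int start = array.ValueOffsets[i];
            int end = array.ValueOffsets[i + 1];
            for (int j = start; j < end; j++)
            {
                sum += values.GetValue(j) ?? 0;
            }
            builder.Append(sum); // one output value per input row
        }
        return builder.Build();
    });

    dataFrame = dataFrame.WithColumn("arraySum", arrowListUdf(dataFrame["array"]));

For the Microsoft.Data.Analysis flavor I am not aware of a DataFrameColumn type that maps to an array column, so the Apache.Arrow signature is the one to reach for here.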

Hamed