
I am usually able to get a new dataset by adding a new column to an existing dataset using withColumn. But I am not sure why this case is giving an error.

Dataset<Row> inputDSAAcolonly = inputDSAA.select(colNameA);
Dataset<Row> inputDSBBcolonly = inputDSBB.select(colNameB);
inputDSBBcolonly.withColumn(colNameA, inputDSAAcolonly.apply(colNameA)).show();

where inputDSAAcolonly is

+----+
|Exer|
+----+
|Some|
|None|
|None|
|None|
+----+

and inputDSBBcolonly is

+-----+
|Smoke|
+-----+
|Never|
|Regul|
|Occas|
|Never|
+-----+

Each dataset is basically a single column.

I need a dataset with the two columns side by side. withColumn has worked for me before, but here it throws an error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) Exer#7 missing from Smoke#47 in operator
!Project [Smoke#47, Exer#7 AS Exer#112];;!Project [Smoke#47, Exer#7 AS Exer#112]

pheeleeppoo
Binu

2 Answers


You are basically trying to join the two datasets.

withColumn operates on a single dataframe: the new column must be an expression over that dataframe's own columns. You are trying to reference a column from a different dataframe.

If the issue really is as simple as your code, you can select both columns from the original dataframe and do the relevant operations there. Otherwise you need to do a join.
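A minimal sketch of the join route (the variable names dsA/dsB and the key column rowId are illustrative assumptions, not from the question):

```java
import static org.apache.spark.sql.functions.monotonically_increasing_id;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Assume dsA and dsB are the single-column datasets from the question
// (inputDSAAcolonly and inputDSBBcolonly). withColumn cannot pull a
// column from a different dataset, so attach a synthetic key to each
// side and join on it instead.
Dataset<Row> left  = dsA.withColumn("rowId", monotonically_increasing_id());
Dataset<Row> right = dsB.withColumn("rowId", monotonically_increasing_id());

Dataset<Row> sideBySide = left
        .join(right, left.col("rowId").equalTo(right.col("rowId")), "inner")
        .drop(left.col("rowId"))
        .drop(right.col("rowId"));
sideBySide.show();
```

One caveat: monotonically_increasing_id() only guarantees unique, increasing ids within each dataset, so the ids on the two sides line up row-for-row only when both datasets have matching partitioning; for a guaranteed positional pairing, zipWithIndex on the underlying RDDs is the safer route.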

Assaf Mendelson
  • Agree with @Assaf. See also [here](http://stackoverflow.com/questions/40508489/spark-add-dataframe-column-to-another-dataframe-merge-two-dataframes). – pheeleeppoo Mar 01 '17 at 09:21
  • ..have been using it without zip or adding an index column with both the datasets... say I am able to do this... Dataset dswithColAAandTotal=dswithColAA.withColumn("Total", dswithColAA.col(colnames[0])); dswithColAAandTotal.show(); works fine giving me ColAA and a calculated column Total.... – Binu Mar 01 '17 at 10:15
  • where dswithColAA was arrived as below from a Dataset.. DSvaluesonly.createOrReplaceTempView("tmpTableAA"); String SQLQueryAA="select * from tmpTableAA"; Dataset dswithColAA=sqlctx.sql(SQLQueryAA).toDF(); – Binu Mar 01 '17 at 10:23
  • I tried to convert to DF() .,..but below doesn't work.... inputDSAAcolonly.createOrReplaceTempView("ColAA"); String SQLQueryColumnAA="select * from ColAA"; Dataset colAADF=sqlctx.sql(SQLQueryColumnAA).toDF(); inputDSBBcolonly.createOrReplaceTempView("ColBB"); String SQLQueryColumnBB="select * from ColBB"; Dataset colBBDF=sqlctx.sql(SQLQueryColumnBB).toDF(); Dataset ColAABB=colAADF.withColumn(colNameB, colBBDF.apply(colNameB));ColAABB.show() throws error .. – Binu Mar 01 '17 at 10:28
  • 1
    Unless the data comes from the SAME original dataframe with two different filters you will have to do a join. All the things you try use different dataframes with no relation between them. – Assaf Mendelson Mar 01 '17 at 18:21
  • @Binu : If you are okay with above, please care vote-up/accept answer as owner to close. – Ram Ghadiyaram Mar 08 '17 at 05:57
    // Requires the static imports:
    //   import static org.apache.spark.sql.functions.col;
    //   import static org.apache.spark.sql.functions.monotonically_increasing_id;
    Dataset<Row> inputDS = spark.read().option("header", false)
            .option("inferSchema", true).csv("data/irisAA.csv");
    inputDS.show(4);
    String colAname = inputDS.columns()[0];
    log.error(colAname);
    String colBname = inputDS.columns()[1];
    log.error(colBname);
    Dataset<Row> DSColA = inputDS.select(inputDS.col(colAname));
    DSColA.show(4);
    Dataset<Row> DSColB = inputDS.select(inputDS.col(colBname));
    DSColB.show(4);
    // withColumn works here because the added column comes from the
    // same lineage as DSColA:
    Dataset<Row> DSColAandColA = DSColA.withColumn("Addt_Column", inputDS.col(colAname));
    DSColAandColA.show(4);
    /* Dataset<Row> DSColAandColB = DSColA.withColumn("Addt_Column", inputDS.col(colBname));
    DSColAandColB.show(4); // THIS FAILS... still don't get why */
    // Add a synthetic key to each single-column dataset, then join on it:
    Dataset<Row> DSColAwithIndex = DSColA.withColumn("df1Key", monotonically_increasing_id());
    DSColAwithIndex.show(4);
    Dataset<Row> DSColBwithIndex = DSColB.withColumn("df2Key", monotonically_increasing_id());
    DSColBwithIndex.show(4);
    DSColAwithIndex.join(DSColBwithIndex).show(4); // no join condition: cross product
    DSColA.join(DSColB).show(4);                   // also a cross product
    Dataset<Row> DSwithJoinofTwo = DSColAwithIndex.join(DSColBwithIndex,
            col("df1Key").equalTo(col("df2Key")), "inner");
    DSwithJoinofTwo.show(4);
    Dataset<Row> DSwithJointrimmed = DSwithJoinofTwo
            .drop(DSwithJoinofTwo.apply("df1Key"))
            .drop(DSwithJoinofTwo.apply("df2Key"));
    DSwithJointrimmed.show(4); // joined dataset of column A and column B, from the same or different datasets
Binu
  • used withColumn to add a column – Binu Mar 03 '17 at 15:14
  • used join to add the datasets by adding an index column to each and using an inner join... withColumn seems to work if you are trying to create a dataset with additional columns where the columns depend on a column of the same dataset... like Assaf suggested.. I will explore and try to understand withColumn usage and limitations better.. for now the above should be ok I guess... Thank you... – Binu Mar 03 '17 at 15:17