I have a data frame with an arbitrarily large number of rows created by doing something similar to:
// pivot data to wide format
val wide = df.groupBy("id").pivot("ip").sum("msgs")
// drop columns and fill in null values
val dfmat = wide.drop("id").na.fill(0)
val dimnames = dfmat.columns
I have no idea how many different "ip"
's there will be. I'm then trying to take each row of dfmat
and create an RDD[Vector]
object for use with the org.apache.spark.mllib.Statistics.corr
. To do that I'm doing the below and running into errors:
// try a different mapping
val mat = dfmat.rdd.map(row => Vectors.parse(row.mkString("[",",","]")))
// create correlation matrix
val correlMatrix: Matrix = Statistics.corr(mat, "pearson")
This works find for small datasets (1 million or less records), but fails when operating on the full dataset. I also get really, really big log records with weird records like:
/* 125222 */ this.value_8326 = -1L;
/* 125223 */ this.isNull_8327 = true;
/* 125224 */ this.value_8327 = -1L;
/* 125225 */ this.isNull_8328 = true;
/* 125226 */ this.value_8328 = -1L;
/* 125227 */ this.isNull_8329 = true;
/* 125228 */ this.value_8329 = -1L;
/* 125229 */ this.isNull_8330 = true;
/* 125230 */ this.value_8330 = -1L;
/* 125231 */ this.isNull_8331 = true;
/* 125232 */ this.value_8331 = -1L;
/* 125233 */ this.isNull_8332 = true;
/* 125234 */ this.value_8332 = -1L;
/* 125235 */ this.isNull_8333 = true;
/* 125236 */ this.value_8333 = -1L;
/* 125237 */ }
/* 125238 */
/* 125239 */ public org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
/* 125240 */ mutableRow = row;
/* 125241 */ return this;
/* 125242 */ }
/* 125243 */
/* 125244 */ /* Provide immutable access to the last projected row. */
/* 125245 */ public InternalRow currentValue() {
/* 125246 */ return (InternalRow) mutableRow;
/* 125247 */ }
/* 125248 */
/* 125249 */ public java.lang.Object apply(java.lang.Object _i) {
/* 125250 */ InternalRow i = (InternalRow) _i;
/* 125251 */ apply16668_0(i);
/* 125252 */ apply16668_1(i);
/* 125253 */ apply16668_2(i);
/* 125254 */ apply16668_3(i);
/* 125255 */ apply16668_4(i);
/* 125256 */ apply16668_5(i);
/* 125257 */ apply16668_6(i);
/* 125258 */ apply16668_7(i);
/* 125259 */ apply16668_8(i);
/* 125260 */ apply16668_9(i);
/* 125261 */ apply16668_10(i);
/* 125262 */ apply16668_11(i);
/* 125263 */ apply16668_12(i);
/* 125264 */ apply16668_13(i);
/* 125265 */ apply16668_14(i);
/* 125266 */ apply16668_15(i);
/* 125267 */ apply16668_16(i);
/* 125268 */ apply16668_17(i);
/* 125269 */ apply16668_18(i);
/* 125270 */ // copy all the results into MutableRow
/* 125271 */ apply16669_0(i);
/* 125272 */ apply16669_1(i);
/* 125273 */ apply16669_2(i);
/* 125274 */ apply16669_3(i);
/* 125275 */ apply16669_4(i);
/* 125276 */ apply16669_5(i);
/* 125277 */ apply16669_6(i);
/* 125278 */ apply16669_7(i);
/* 125279 */ apply16669_8(i);
/* 125280 */ apply16669_9(i);
/* 125281 */ apply16669_10(i);
/* 125282 */ apply16669_11(i);
/* 125283 */ apply16669_12(i);
/* 125284 */ apply16669_13(i);
/* 125285 */ apply16669_14(i);
/* 125286 */ apply16669_15(i);
/* 125287 */ apply16669_16(i);
/* 125288 */ apply16669_17(i);
/* 125289 */ apply16669_18(i);
/* 125290 */ apply16669_19(i);
/* 125291 */ apply16669_20(i);
/* 125292 */ apply16669_21(i);
/* 125293 */ apply16669_22(i);
/* 125294 */ apply16669_23(i);
/* 125295 */ return mutableRow;
/* 125296 */ }
/* 125297 */ }
/* 125298 */
And finally:
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572)
at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 31 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Lorg/apache/spark/sql/catalyst/expressions/Expression;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959)
at org.codehaus.janino.UnitCompiler.writeConstantFieldrefInfo(UnitCompiler.java:10279)
at org.codehaus.janino.UnitCompiler.putfield(UnitCompiler.java:9956)
at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5086)
at org.codehaus.janino.UnitCompiler.access$11800(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$15.visitFieldAccess(UnitCompiler.java:5062)
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:3235)
at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5095)
at org.codehaus.janino.UnitCompiler.access$11900(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$15.visitFieldAccessExpression(UnitCompiler.java:5063)
at org.codehaus.janino.Java$FieldAccessExpression.accept(Java.java:3563)
at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2675)
at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:518)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:550)
... 35 more
Which looks like an error caused by automatic code generation. I'm not quite sure what is going on though. Any ideas on how to debug or how to do such a thing in a different way are appreciated. If there is no other suitable solution to do the same thing, then how do I decrease the size of the automatically generated code to be smaller than the constraint? Can I change the constraint?
Thanks,