This question is rather long, so here is the idea in summary:

- I have a row dataset made of primitive-typed columns `a1, a2...a10, b1, b2...b8, c1..c4, d1`.
- Those columns hide objects `A`, `B`, `C`, sometimes accompanied by primitive attributes that belong to no class: `d1`, for example.
- Instead of a `Dataset<Row>`, I could return a `Dataset<E>`, where `E` would be a class whose members are the `A`, `B`, `C` objects and the attribute `d1`.
- But I want to avoid this for now, if I can, and see how closely I can approach a solution where I would return a dataset of joined objects [and attributes]: `Dataset<A+B+C+d1>` (where the `+` sign means that the objects are linked together by a join).

If that is possible, is it really manageable?
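To make the idea concrete: if I understand Spark's API correctly, `joinWith` is the closest built-in approximation of this `+`, because it returns a dataset of pairs instead of flattening the columns. A minimal sketch, assuming hypothetical beans `A` and `B` that share a `key` column, read from hypothetical `Dataset<Row>` sources:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

// rowsOfA / rowsOfB: hypothetical Dataset<Row> sources; A and B are assumed to be Java beans.
Dataset<A> as = rowsOfA.as(Encoders.bean(A.class));
Dataset<B> bs = rowsOfB.as(Encoders.bean(B.class));

// joinWith keeps each side as a whole typed object: the result is a
// Dataset<Tuple2<A, B>>, more or less a "Dataset<A+B>".
Dataset<Tuple2<A, B>> joined =
    as.joinWith(bs, as.col("key").equalTo(bs.col("key")), "inner");
But that only gives two operands per join, and nesting Tuple2s quickly becomes unwieldy, which is part of my question.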
My code mostly uses `Dataset<Row>`.
For example, I have a method that builds a (French) city description:
/**
 * Get a Dataset of the communes.
 * @param session Spark session.
 * @param anneeCOG Year of the reference Code Officiel Géographique.
 * @param verifications Requested verifications.
 * @return Dataset of the communes.
 * @throws TechniqueException if an incident occurs.
 */
public Dataset<Row> rowCommunes(SparkSession session, int anneeCOG, Verification... verifications) throws TechniqueException {
String nomStore = "communes_par_codeCommune";
Dataset<Row> communes = loadFromStore(session, "{0}_{1,number,#0}", nomStore, anneeCOG, verifications);
if (communes != null) {
return communes;
}
LOGGER.info("Constitution du dataset des communes depuis pour le Code Officiel Géographique (COG) de l'année {}...", anneeCOG);
Dataset<Row> c = loadAndRenameCommmunesCSV(session, anneeCOG, false, verifications);
Dataset<Row> s = this.datasetSirenCommunaux.rowSirenCommunes(session, anneeCOG, TriSirenCommunaux.CODE_COMMUNE);
Column condition1 = c.col("codeCommune").equalTo(s.col("codeCommune"));
Column condition2 = c.col("codeCommuneParente").equalTo(s.col("codeCommune"));
verifications("jonction communes et siren par codeCommune", c, null, s, condition1, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
Dataset<Row> join1 = c.join(s, condition1)
.drop(s.col("codeCommune"))
.drop(s.col("nomCommune"))
.drop(s.col("codeRegion"))
.drop(s.col("codeDepartement"));
verifications("jonction communes et siren par codeCommune, join1", c, null, null, null, verifications);
verifications("jonction communes et siren par codeCommuneParente", c, null, s, condition2, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
Dataset<Row> join2 = c.join(s, condition2)
.drop(s.col("codeCommune"))
.drop(s.col("nomCommune"))
.drop(s.col("codeRegion"))
.drop(s.col("codeDepartement"));
verifications("jonction communes et siren par codeCommuneParente, join2", c, null, null, null, verifications);
communes = join1.union(join2);
// The commune stratum must match the one used by the individual accounts of the communes.
communes = communes.withColumn("strateCommune",
when(s.col("populationTotale").between(0, 249), lit(1)) // communes under 250 inhabitants
.when(s.col("populationTotale").between(250, 499), lit(2)) // 250 to 500 inhabitants
.when(s.col("populationTotale").between(500, 1999), lit(3)) // 500 to 2,000 inhabitants
.when(s.col("populationTotale").between(2000, 3499), lit(4)) // 2,000 to 3,500 inhabitants
.when(s.col("populationTotale").between(3500, 4999), lit(5)) // 3,500 to 5,000 inhabitants
.when(s.col("populationTotale").between(5000, 9999), lit(6)) // 5,000 to 10,000 inhabitants
.when(s.col("populationTotale").between(10000, 19999), lit(7)) // 10,000 to 20,000 inhabitants
.when(s.col("populationTotale").between(20000, 49999), lit(8)) // 20,000 to 50,000 inhabitants
.when(s.col("populationTotale").between(50000, 99999), lit(9)) // 50,000 to 100,000 inhabitants
.otherwise(lit(10))); // communes over 100,000 inhabitants
// Get the communes' boundaries (contours).
// "(SQL query) contours" is the substitution form Spark expects. See https://stackoverflow.com/questions/38376307/create-spark-dataframe-from-sql-query
String format = "(select insee as codecommuneosm, nom as nomcommuneosm, surf_ha as surface2, st_x(st_centroid(geom)) as longitude, st_y(st_centroid(geom)) as latitude from communes_{0,number,#0}) contours";
String sql = MessageFormat.format(format, anneeCOG);
Dataset<Row> contours = sql(session, sql).load();
contours = contours.withColumn("surface", col("surface2").cast(DoubleType)).drop(col("surface2"))
.orderBy("codecommuneosm");
Column conditionJoinContours = col("codeCommune").equalTo(col("codecommuneosm"));
verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, contours, conditionJoinContours, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
communes = communes.join(contours, conditionJoinContours, "left_outer")
.drop(col("codecommuneosm")).drop(col("nomcommuneosm"));
verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, null, null, verifications);
// Associate each commune with its intercommunality code, if it has one (communes-communautés may not have one).
Dataset<Row> perimetres = this.datasetPerimetres.rowPerimetres(session, anneeCOG, EPCIPerimetreDataset.TriPerimetresEPCI.CODE_COMMUNE_MEMBRE).selectExpr("sirenCommuneMembre", "sirenGroupement as codeEPCI", "nomGroupement as nomEPCI");
Column conditionJoinPerimetres = communes.col("sirenCommune").equalTo(perimetres.col("sirenCommuneMembre"));
verifications("jonction communes et périmètres", communes, null, perimetres, conditionJoinPerimetres, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
communes = communes.join(perimetres, conditionJoinPerimetres, "left");
// Associate the departments with them.
communes = this.datasetDepartements.withDepartement(session, "codeDepartementRetabli", communes, "codeDepartement", null, true, anneeCOG)
.drop("codeRegionDepartement")
.drop("codeDepartementRetabli");
communes = communes.repartition(col("codeDepartement"))
.sortWithinPartitions(col("codeCommune"))
.persist(); // Important: improves performance.
saveToStore(communes, new String[] {"codeDepartement"}, "{0}_{1,number,#0}", nomStore, anneeCOG);
LOGGER.info("Le dataset des communes du Code Officiel Géographique de l'année {} est prêt et stocké.", anneeCOG);
return communes;
}
Sometimes it's useful to convert these rows to a `Commune` object, because business objects, at least on the server side, can have methods that give them some kind of intelligence (limited to looking at themselves or at the objects of their package).
For example, the `Commune` object has this method, which helps detect that it has the same name as another commune even when an article is part of its name:
/**
 * Determine whether our commune has the same name as the one given in parameter.
 * @param nomCandidat Candidate commune name: it may contain an article (charnière).
 * @return true if it does.
 */
public boolean hasMemeNom(String nomCandidat) {
// If the submitted name is null, answer no.
if (nomCandidat == null) {
return false;
}
// First try a direct commune-name comparison, because using the collator is very expensive.
if (nomCandidat.equalsIgnoreCase(this.nomCommune)) {
return true;
}
// Then, try with the various articles (charnières).
if (nomCandidat.equalsIgnoreCase(nomAvecType(false, PrefixageNomCommune.AUCUN))) {
return true;
}
if (nomCandidat.equalsIgnoreCase(nomAvecType(false, PrefixageNomCommune.A))) {
return true;
}
if (nomCandidat.equalsIgnoreCase(nomAvecType(false, PrefixageNomCommune.POUR))) {
return true;
}
// If that fails, run the same tests again with the collator: slower, but it copes with accented characters.
if (collator.equals(nomCandidat, this.nomCommune)) {
return true;
}
if (collator.equals(nomCandidat, nomAvecType(false, PrefixageNomCommune.AUCUN))) {
return true;
}
if (collator.equals(nomCandidat, nomAvecType(false, PrefixageNomCommune.A))) {
return true;
}
if (collator.equals(nomCandidat, nomAvecType(false, PrefixageNomCommune.POUR))) {
return true;
}
return false;
}
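This kind of method pays off once the dataset is typed. For example (a sketch, assuming a `Dataset<Commune>` named `communes`, obtained as below):
import org.apache.spark.api.java.function.FilterFunction;

// A typed filter can call the business method directly on each Commune.
Dataset<Commune> memesNoms =
    communes.filter((FilterFunction<Commune>) c -> c.hasMemeNom("L'Abergement-Clémenciat"));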
To do that conversion from a `Dataset<Row>` to a `Dataset<Commune>`, I wrote this function. It works, but it troubles me because it looks clumsy:
/**
 * Get a Dataset of the communes (as Commune objects, including the communal Siren data).
 * @param session Spark session.
 * @param anneeCOG Year of the reference Code Officiel Géographique.
 * @return Dataset of the communes.
 * @throws TechniqueException if an incident occurs.
 */
public Dataset<Commune> obtenirCommunes(SparkSession session, int anneeCOG) throws TechniqueException {
Dataset<Row> communes = rowCommunes(session, anneeCOG);
return communes
.select(communes.col("typeCommune"), communes.col("codeCommune"), communes.col("codeRegion"), communes.col("codeDepartement"),
communes.col("arrondissement"), communes.col("typeNomEtCharniere"), communes.col("nomMajuscules"), communes.col("nomCommune"),
communes.col("codeCanton"), communes.col("codeCommuneParente"), communes.col("sirenCommune"), communes.col("populationTotale").alias("population"),
communes.col("strateCommune"), communes.col("codeEPCI"), communes.col("surface"), communes.col("longitude"), communes.col("latitude"),
communes.col("nomDepartement"), communes.col("nomEPCI"))
.as(Encoders.bean(Commune.class))
.cache();
}
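If the only reason for that long select() is renaming populationTotale to population, a shorter form might do; an untested sketch, assuming that as(Encoders.bean(...)) resolves bean properties by column name and simply ignores the extra columns:
public Dataset<Commune> obtenirCommunes(SparkSession session, int anneeCOG) throws TechniqueException {
    // Assumption: after the rename, every Commune bean property matches a column name.
    return rowCommunes(session, anneeCOG)
        .withColumnRenamed("populationTotale", "population")
        .as(Encoders.bean(Commune.class))
        .cache();
}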
However, my main problem is this:
after that point, the `Commune` object (or its rows) is almost never used or returned alone in a dataset.
Often a `Commune` comes with some employment data associated, or with financial information linked to it. And sometimes a calculation method associates just one or two raw (primitive) values with a city, when they make sense on their own.
So, until today, what happened?

- I was starting with a `Dataset<Row>` of, let's say, 20 columns. If I was doing some joins, calculations, enrichments, etc., that dataset sometimes reached 40 or 50 columns.
- Let's say that a record was `a1, a2...a10, b1, b2...b8, c1..c4, d1`: row columns of primitive types.
- Then I was extracting from that record, by means of the `Encoders.bean(...)` method, the various business objects it was hiding, depending on my needs. From a record, I could extract `A` from `a1, a2...a10`, or `C` from `c1...c4`.

It worked (and it's still working), but I'm not really proud of this way of doing things.
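Concretely, that extraction step looks more or less like this (a sketch, with a hypothetical wide `Dataset<Row>` named `wideDataset` and a hypothetical bean `A` backed by columns `a1...a10`):
// Keep only the columns that back the A bean, then let the bean encoder
// turn each trimmed row into an A instance.
Dataset<A> as = wideDataset
    .select("a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "a10")
    .as(Encoders.bean(A.class));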
I would much prefer to start with a plain `Dataset<Commune>`, completely hiding the row phase from the users of my API, and then be able to return to them, depending on their needs:

- a dataset containing the `Commune` and `Employment` business objects together, or `Commune` and `Accounting` (see the sketch after this list);
- a dataset containing `Commune`, but also a few raw values from columns `"d1"`, `"h1"` and `"k1"` (if some calculation was needed to provide a specific piece of information for that city, for some exceptional purpose/case that doesn't justify changing the whole business-object description, but only, this one time, returning an extra column value alongside the city description).
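Applied to my case, the first bullet could perhaps be served by joinWith again (a sketch; the `emplois` dataset, the `Employment` bean and the join key are hypothetical):
// Each record keeps the whole Commune and the whole Employment object.
// With "left_outer", the Employment side (_2) may be null.
Dataset<Tuple2<Commune, Employment>> communesEtEmploi =
    communes.joinWith(emplois,
        communes.col("codeCommune").equalTo(emplois.col("codeCommune")),
        "left_outer");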
It means I will encounter cases where I would like to return a dataset showing, per "record": `A, B` (concrete objects coming together), or `A, C`, or even sometimes `A, B, C`.
Or, the one I fear the most: `A, C, d1` (two concrete objects, plus... a primitive value).
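The nearest shape I can imagine for that case is a tuple encoder over struct columns. This is only a sketch: it assumes the `_1`/`_2`/`_3` column-naming convention of Spark's tuple encoders, and I haven't validated it:
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import scala.Tuple3;

// Group the raw columns into one struct per object, name the columns _1.._3
// to match the tuple encoder, and keep d1 as a plain double.
Dataset<Tuple3<A, C, Double>> acd1 = wideDataset
    .select(
        struct(col("a1"), col("a2"), /* ..., */ col("a10")).as("_1"),
        struct(col("c1"), col("c2"), col("c3"), col("c4")).as("_2"),
        col("d1").as("_3"))
    .as(Encoders.tuple(Encoders.bean(A.class), Encoders.bean(C.class), Encoders.DOUBLE()));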
Can you give me some ideas about how to treat such problems before I make any big moves?
Warning: this problem isn't a simple one. It may not even have a clear solution.

Starting from row records made of the primitive types `a1, a2...a10, b1, b2...b8, c1..c4, d1`, I am able to extract `A`, `B`, `C` objects from them, if I accept doing up to n additional transformations for the n different object types I can find among my attributes. But I wonder how closely I can approach the situation where, given that the `+` sign means "join", I would have a dataset `Dataset<A+B+C>`, or even `Dataset<A+B+C+d1>`, where `d1` would still be a primitive type not wrapped in an object.

If this could be achieved, it could cause trouble. What is a `Dataset<A+B+C+d1>.foreach`? An `A`, `B` or `C` object? How would you manage it afterwards?
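If the tuple encoding above is viable, the answer would presumably be: none of them; foreach would see the whole tuple and I would unpack it myself (sketch):
import org.apache.spark.api.java.function.ForeachFunction;

// foreach receives one Tuple3 per record, never a bare A, B or C.
acd1.foreach((ForeachFunction<Tuple3<A, C, Double>>) t -> {
    A a = t._1();
    C c = t._2();
    Double d1 = t._3();
    // ... business code using a, c and d1 together ...
});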
For now I'm living in a magma of attributes, and I wonder whether I can improve my datasets into new ones built from objects: objects tied together to describe a single record.

This question could be solved by creating an `E` class having the `A`, `B`, `C` objects and the `d1` primitive attribute as members, and returning a `Dataset<E>`. But that is what I want to avoid the most, at this time; I'd first like to find out what else I can do.
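For reference, the E-shaped solution I'm postponing would presumably be a plain wrapper bean (if I'm not mistaken, Spark's bean encoders support nested beans), applied to a hypothetical `rows` dataset:
// The wrapper I'd rather avoid: A, B, C and d1 glued into one class.
public class E implements Serializable {
    private A a;
    private B b;
    private C c;
    private double d1;
    // getters and setters omitted...
}

// Requires struct columns named a, b and c, plus a d1 column, in the row dataset.
Dataset<E> es = rows.as(Encoders.bean(E.class));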