I have been working with Google Analytics data stored in S3. I am loading the files as follows:
df_rdd = sc.textFile('s3n://BUCKET_NAME/2017/1/2/')
This gives me an RDD. To inspect the schema, I also loaded the data into Spark SQL, and the schema looks like this:
root
 |-- channelGrouping: string (nullable = true)
 |-- customDimensions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- date: string (nullable = true)
 |-- device: struct (nullable = true)
 |    |-- browser: string (nullable = true)
 |    |-- browserSize: string (nullable = true)
 |    |-- browserVersion: string (nullable = true)
 |    |-- deviceCategory: string (nullable = true)
 |    |-- flashVersion: string (nullable = true)
 |    |-- isMobile: boolean (nullable = true)
 |    |-- javaEnabled: boolean (nullable = true)
 |    |-- language: string (nullable = true)
 |    |-- mobileDeviceBranding: string (nullable = true)
 |    |-- mobileDeviceInfo: string (nullable = true)
 |    |-- mobileDeviceMarketingName: string (nullable = true)
 |    |-- mobileDeviceModel: string (nullable = true)
 |    |-- mobileInputSelector: string (nullable = true)
 |    |-- operatingSystem: string (nullable = true)
 |    |-- operatingSystemVersion: string (nullable = true)
 |    |-- screenColors: string (nullable = true)
 |    |-- screenResolution: string (nullable = true)
 |-- fullVisitorId: string (nullable = true)
 |-- geoNetwork: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- cityId: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- metro: string (nullable = true)
 |    |-- networkDomain: string (nullable = true)
 |    |-- networkLocation: string (nullable = true)
 |    |-- region: string (nullable = true)
 |    |-- subContinent: string (nullable = true)
What I tried:
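import json

def remove_null_device(val):
    # Each RDD element is assumed to be one JSON record as a string
    _ori = json.loads(val)
    # _ori = val
    _dic_val = _ori['device']
    for key, _value in _dic_val.items():
        # Compares against the literal string "null"
        if _value == "null":
            _dic_val[key] = "Hello There I am Testing this"
    _ori["device"] = _dic_val
    # Returns a Python dict, not the original JSON string
    return _ori

device_data = df_rdd.map(remove_null_device)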
Problem statement: I want to iterate over every record. Because the structure is nested, I am thinking of passing one top-level key at a time (for example device, then geoNetwork) and checking whether its values are empty or null.
But this seems to change the structure of the whole record, and the items are not getting updated; I don't know why. Please suggest a better approach. Thanks!
To be specific: I want to check every field in device for empty, null, or (not set) values, update those values, and return the updated row with the schema intact.
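To make the goal concrete, here is a minimal sketch of the behaviour I am after, assuming each RDD element is one JSON record as a string. The names is_missing, clean_struct, MISSING_MARKERS, the "unknown" placeholder and the skip list are all hypothetical, not part of any library. Note that json.loads turns a JSON null into Python's None, not the string "null", so the check has to cover both. The function returns a JSON string again so that the record keeps its shape, and it leaves the boolean fields from the schema alone, since writing a string into them would change their type.

```python
import json

# Assumption: these string values should be treated as "missing".
MISSING_MARKERS = {"", "null", "(not set)"}


def is_missing(value):
    """True for JSON null (None in Python) or a string that marks absence."""
    if value is None:
        return True
    return isinstance(value, str) and value.strip() in MISSING_MARKERS


def clean_struct(line, key="device", placeholder="unknown",
                 skip=("isMobile", "javaEnabled")):
    """Replace missing values under record[key] and return a JSON string.

    Fields listed in `skip` (the boolean ones in the schema) are left
    untouched so their type does not change to string.
    """
    record = json.loads(line)
    struct = record.get(key)
    if isinstance(struct, dict):
        record[key] = {
            name: placeholder if name not in skip and is_missing(value) else value
            for name, value in struct.items()
        }
    # Serialise back so every element of the RDD is still a JSON string.
    return json.dumps(record)
```

It would be applied as device_data = df_rdd.map(clean_struct), and for another top-level key as df_rdd.map(lambda line: clean_struct(line, key="geoNetwork")). If a DataFrame is needed afterwards, spark.read.json(device_data) should accept the RDD of JSON strings, assuming spark is an existing SparkSession.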