I am working on a task that processes the ELB-style access-log dataset below (please save it as dataset.log to reproduce):
2015-07-22T09:00:28.019143Z marketpalce-shop 123.242.248.130:54635 10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 "GET https://paytm.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
2015-07-22T09:00:27.894580Z marketpalce-shop 203.91.211.44:51402 10.0.4.150:80 0.000024 0.15334 0.000026 200 200 0 1497 "GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; rv:39.0) Gecko/20100101 Firefox/39.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
2015-07-22T09:00:27.885745Z marketpalce-shop 1.39.32.179:56419 10.0.4.244:80 0.000024 0.164958 0.000017 200 200 0 157 "GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
2015-07-22T09:00:28.048369Z marketpalce-shop 180.179.213.94:48725 10.0.6.108:80 0.00002 0.002333 0.000021 200 200 0 35734 "GET https://paytm.com:443/shop/p/micromax-yu-yureka-moonstone-grey-MOBMICROMAX-YU-DUMM141CD60AF7C_34315 HTTP/1.0" "-" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
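Each line is a space-delimited access-log entry in which the request and user-agent fields are quoted, so a single line should split into exactly 15 fields. A quick way to sanity-check that (a standalone snippet, not part of the Spark job):

import shlex

# First record from dataset.log, copied verbatim
line = ('2015-07-22T09:00:28.019143Z marketpalce-shop 123.242.248.130:54635 '
        '10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 '
        '"GET https://paytm.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1" '
        '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36" '
        'ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2')

fields = shlex.split(line)  # splits on spaces while respecting the quoted fields
print(len(fields))          # 15, matching the 15 StructFields in the schema below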
I want to load it into PySpark and print the resulting dataframe. Below is my PySpark code:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local') \
        .appName('data') \
        .getOrCreate()
    sparkSession.sparkContext.setLogLevel('WARN')

    # One StructField per space-delimited field in the log line
    schema = StructType([
        StructField('create_time', TimestampType()),
        StructField('elb', StringType()),
        StructField('client_host_port', StringType()),
        StructField('backend_host_port', StringType()),
        StructField('request_processing_time', DoubleType()),
        StructField('backend_processing_time', DoubleType()),
        StructField('response_processing_time', DoubleType()),
        StructField('elb_status_code', IntegerType()),
        StructField('backend_status_code', IntegerType()),
        StructField('received_bytes', IntegerType()),
        StructField('sent_bytes', IntegerType()),
        StructField('request', StringType()),
        StructField('user_agent', StringType()),
        StructField('ssl_cipher', StringType()),
        StructField('ssl_protocol', StringType())
    ])

    fileStreamDF = sparkSession.readStream \
        .option("delimiter", " ") \
        .option("quote", "\"") \
        .option("header", False) \
        .schema(schema) \
        .csv('dataset.log')
    # .option("charset", StandardCharsets.UTF_8.name())

    trimmedDF = fileStreamDF.select('create_time', 'client_host_port', 'request')

    query = trimmedDF.writeStream \
        .outputMode('append') \
        .format('console') \
        .option('truncate', 'false') \
        .option('numRows', 30) \
        .start() \
        .awaitTermination()
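For comparison, this is the batch-mode read I would expect to behave the same way with this schema (a sketch only; I have not confirmed whether it hits the same error):

# Batch-mode equivalent of the streaming read above (my assumption: same
# file and options; `sparkSession` and `schema` are as defined earlier)
batchDF = sparkSession.read \
    .option("delimiter", " ") \
    .option("quote", "\"") \
    .option("header", False) \
    .schema(schema) \
    .csv('dataset.log')

batchDF.select('create_time', 'client_host_port', 'request').show(30, truncate=False)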
I don't know why I get the error below:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-2ebe0db96473> in <module>
46 .format('console') \
47 .option('truncate', 'false') \
---> 48 .option('numRows', 30) \
49 .start() \
50 .awaitTermination()
c:\users\public\pycharmprojects\project\venv\lib\site-packages\pyspark\sql\streaming.py in start(self, path, format, outputMode, partitionBy, queryName, **options)
1209 self.queryName(queryName)
1210 if path is None:
-> 1211 return self._sq(self._jwrite.start())
1212 else:
1213 return self._sq(self._jwrite.start(path))
c:\users\public\pycharmprojects\project\venv\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
c:\users\public\pycharmprojects\project\venv\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
c:\users\public\pycharmprojects\project\venv\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o104.start.
: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\public\AppData\Local\Temp\temporary-649f0f0f-d13e-4793-a497-a64531b2148b\.metadata.8d3a01dc-32b8-4659-8191-572af16df617.tmp
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:596)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:310)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:316)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$streamMetadata$1(StreamExecution.scala:177)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:175)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:49)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:317)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
I have tried many other suggested solutions, but none worked. Can anyone please assist me with this?