
I recently tried to do a task that handles the dataset below (please save it as dataset.log):

2015-07-22T09:00:28.019143Z marketpalce-shop 123.242.248.130:54635 10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 "GET https://paytm.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
2015-07-22T09:00:27.894580Z marketpalce-shop 203.91.211.44:51402 10.0.4.150:80 0.000024 0.15334 0.000026 200 200 0 1497 "GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; rv:39.0) Gecko/20100101 Firefox/39.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
2015-07-22T09:00:27.885745Z marketpalce-shop 1.39.32.179:56419 10.0.4.244:80 0.000024 0.164958 0.000017 200 200 0 157 "GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
2015-07-22T09:00:28.048369Z marketpalce-shop 180.179.213.94:48725 10.0.6.108:80 0.00002 0.002333 0.000021 200 200 0 35734 "GET https://paytm.com:443/shop/p/micromax-yu-yureka-moonstone-grey-MOBMICROMAX-YU-DUMM141CD60AF7C_34315 HTTP/1.0" "-" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2

I want to load it into PySpark and then print the DataFrame in Spark.

Below is my PySpark code:

from pyspark.sql.types import *
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local') \
        .appName('data') \
        .getOrCreate()

    sparkSession.sparkContext.setLogLevel('WARN')

    schema = StructType([
        StructField('create_time', TimestampType()),
        StructField('elb', StringType()),
        StructField('client_host_port', StringType()),
        StructField('backend_host_port', StringType()),
        StructField('request_processing_time', DoubleType()),
        StructField('backend_processing_time', DoubleType()),
        StructField('response_processing_time', DoubleType()),
        StructField('elb_status_code', IntegerType()),
        StructField('backend_status_code', IntegerType()),
        StructField('received_bytes', IntegerType()),
        StructField('sent_bytes', IntegerType()),
        StructField('request', StringType()),
        StructField('user_agent', StringType()),
        StructField('ssl_cipher', StringType()),
        StructField('ssl_protocol', StringType())
    ])

    fileStreamDF = sparkSession.readStream \
        .option("delimiter", " ") \
        .option("quote", "\"") \
        .option("header", False) \
        .schema(schema) \
        .csv('dataset.log')
    # .option("charset", StandardCharsets.UTF_8.name())\

    trimmedDF = fileStreamDF.select('create_time', 'client_host_port', 'request')

    query = trimmedDF.writeStream \
        .outputMode('append') \
        .format('console') \
        .option('truncate', 'false') \
        .option('numRows', 30) \
        .start() \
        .awaitTermination()
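
For comparison, the batch (non-streaming) version of the same read is sketched below; it reuses the sparkSession and schema from the script above and assumes the same dataset.log in the working directory:

    # Batch equivalent of the streaming read above, for comparison.
    batchDF = sparkSession.read \
        .option("delimiter", " ") \
        .option("quote", "\"") \
        .schema(schema) \
        .csv('dataset.log')

    batchDF.select('create_time', 'client_host_port', 'request') \
        .show(n=30, truncate=False)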

I don't know why I receive the error below:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-2ebe0db96473> in <module>
     46         .format('console') \
     47         .option('truncate', 'false') \
---> 48         .option('numRows', 30) \
     49         .start() \
     50         .awaitTermination()

c:\users\public\pycharmprojects\project\venv\lib\site-packages\pyspark\sql\streaming.py in start(self, path, format, outputMode, partitionBy, queryName, **options)
   1209             self.queryName(queryName)
   1210         if path is None:
-> 1211             return self._sq(self._jwrite.start())
   1212         else:
   1213             return self._sq(self._jwrite.start(path))

c:\users\public\pycharmprojects\project\venv\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

c:\users\public\pycharmprojects\project\venv\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

c:\users\public\pycharmprojects\project\venv\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(




Py4JJavaError: An error occurred while calling o104.start.
: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\public\AppData\Local\Temp\temporary-649f0f0f-d13e-4793-a497-a64531b2148b\.metadata.8d3a01dc-32b8-4659-8191-572af16df617.tmp
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
    at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
    at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
    at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
    at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:596)
    at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
    at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:310)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:316)
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$streamMetadata$1(StreamExecution.scala:177)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:175)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:49)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:317)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)

I have tried to follow many other solutions (for example, the winutils setup sketched below), but none worked. Can anyone please assist me with this?
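
This is roughly what that winutils attempt looked like (a sketch; C:\hadoop is a hypothetical path standing in for wherever winutils.exe actually lives):

import os

# Hypothetical install location -- it must contain bin\winutils.exe
# (and hadoop.dll) matching the Hadoop version of your Spark build.
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = os.environ["PATH"] + r";C:\hadoop\bin"

# Note: these must be set before the first SparkSession is created,
# otherwise the JVM never picks them up.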

seoung jil
  • Similar question [Failed to locate the winutils binary in the hadoop binary path](https://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path) does it help? – user2314737 Oct 05 '20 at 07:21
  • Hi sir, thank you, but that is not a similar question. – seoung jil Oct 05 '20 at 08:33
  • I tried this in a Jupyter notebook and then received a different error, which I think is clearer. – seoung jil Oct 05 '20 at 08:38
  • Have you tried [this](https://stackoverflow.com/a/48012285/3364069)? Looks very similar to your problem. – busfighter Oct 05 '20 at 10:58

0 Answers