-1

I need to read csv files as stream and then convert that to pandas dataframe.

Here is what I have done so far


    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
    StructField("Count", IntegerType(), True), \
    StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("ProcessdedData/SurgeAcc")

    mydf = SrgDF.groupby(group_columns).apply(get_pdf)

    qrySrg = SrgDF \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()

I believe from another source (Convert Spark Structure Streaming DataFrames to Pandas DataFrame) that converting structured streaming dataframe to pandas is not directly possible and it seems that pandas_udf is the right approach but cannot figure out exactly how to achieve this. I need the pandas dataframe to pass into my functions.

Edit

when I run the code (changing the query to mydf rather than SrgDF) then I get the following error: pyspark.sql.utils.StreamingQueryException: 'Writing job aborted.\n=== Streaming Query ===\nIdentifier: [id = 18a15e9e-9762-4464-b6d1-cb2db8d0ac41, runId = e3da131e-00d1-4fed-82fc-65bf377c3f99]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc]: {"logOffset":0}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nFlatMapGroupsInPandas [Count#1], get_pdf(TimeStamp#0L, Count#1, Reading#2), [TimeStamp#10L, Count#11, Reading#12]\n+- Project [Count#1, TimeStamp#0L, Count#1, Reading#2]\n +- StreamingExecutionRelation FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc], [TimeStamp#0L, Count#1, Reading#2]\n' 19/05/20 18:32:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver /usr/local/lib/python3.6/dist-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use ".

EDIT-2

Here is the code to reproduce the error

import sys

from pyspark import SparkContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.streaming import StreamingContext

from pyspark.sql.types import *

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyarrow as pa

import glob

#####################################################################################

if __name__ == '__main__' :

    spark = SparkSession \
        .builder \
        .appName("RealTimeIMUAnalysis") \
        .getOrCreate() 

    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    # reduce verbosity
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    ##############################################################################

    # using the saved files to do the Analysis
    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
    StructField("Count", IntegerType(), True), \
    StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("SurgeAcc")

    mydf = SrgDF.groupby('Count').apply(get_pdf)
    #print(mydf)

    qrySrg = mydf \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()

To run, you need to create a folder named SurgeAcc where the code is and create a csv file inside with the following format:

TimeStamp,Count,Reading
1557011317299,45148,-0.015494
1557011317299,45153,-0.015963
1557011319511,45201,-0.015494
1557011319511,45221,-0.015494
1557011315134,45092,-0.015494
1557011315135,45107,-0.014085
1557011317299,45158,-0.015963
1557011317299,45163,-0.015494
1557011317299,45168,-0.015024`

  • If you are using pandas_udf (for GROUPED_MAP), the input to the function will be pandas dataframe and output is also pandas dataframe . I don't see any issue here. Are you facing any issues? – Ranga Vure May 20 '19 at 09:52
  • The error message is too long, so I add as an Edit to my message. – Ashkan Rafiee May 20 '19 at 10:36
  • Please update with sample data also, that helps in troubleshooting for others. It could be data issue also. – Ranga Vure May 20 '19 at 10:45
  • @RangaVure I have just edited my question with the code and data that would reproduce the error. I would really appreciate your feedback. Just to add further, the reason I am using Structured Streaming (rather than static read) is that the csv file will constantly get updated by another stream – Ashkan Rafiee May 20 '19 at 11:27

1 Answers1

0

Your return pandas_udf dataframe is not matching with the schema specified.

Please note that input to the pandas_udf will be pandas dataframe and also returns pandas dataframe.

You can use all pandas functions inside the pandas_udf. Only thing you have to make sure is the ReturnDataShema should match with actual output of the function.

ReturnDataShema = StructType([StructField("TimeStamp", LongType(), True), \
                            StructField("Count", IntegerType(), True), \
                            StructField("Reading", FloatType(), True), \
                            StructField("TotalCount", FloatType(), True)])

@pandas_udf(ReturnDataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        # This following stmt is causing schema mismatch
        # return pd.DataFrame([pdf[group_columns]],columns=[group_columns])
        # If you want to return all the rows of pandas dataframe
        # you can simply
        # return pdf
        # If you want to do any aggregations, you can do like the below, or use pandas query
        # but make sure the return pandas dataframe complies with ReturnDataShema
        total_count = pdf['Count'].sum()
        return pd.DataFrame([(pdf.TimeStamp[0],pdf.Count[0],pdf.Reading[0],total_count)])
Ranga Vure
  • 1,922
  • 3
  • 16
  • 23