I need to read CSV files as a stream and then convert the result to a pandas DataFrame.
Here is what I have done so far:
DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
StructField("Count", IntegerType(), True), \
StructField("Reading", FloatType(), True) ])
group_columns = ['TimeStamp','Count','Reading']
@pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
def get_pdf(pdf):
    return pd.DataFrame([pdf[group_columns]],columns=[group_columns])
# getting Surge data from the files
SrgDF = spark \
.readStream \
.schema(DataShema) \
.csv("ProcessdedData/SurgeAcc")
mydf = SrgDF.groupby(group_columns).apply(get_pdf)
qrySrg = SrgDF \
.writeStream \
.format("console") \
.start() \
.awaitTermination()
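A GROUPED_MAP pandas UDF is called once per group with that group's rows as a pandas DataFrame, and it must return a pandas DataFrame whose columns match the declared return schema (here DataShema). The get_pdf above instead wraps the frame in a list and passes a nested list as `columns`, which looks like a plausible source of the failure. Below is a minimal pandas-only sketch of a function that satisfies that contract, checked locally by feeding it one group at a time. The loop only imitates per-group execution and is not how Spark runs it; the sample rows are taken from the CSV in the question.

```python
import pandas as pd

# Must list the same columns, in the same order, as the UDF's StructType (DataShema).
group_columns = ['TimeStamp', 'Count', 'Reading']

def get_pdf(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds the rows of one group; return a DataFrame with exactly the
    # schema's columns rather than wrapping pdf in a list.
    return pdf[group_columns].reset_index(drop=True)

# A few rows from the sample CSV in the question.
sample = pd.DataFrame({
    'TimeStamp': [1557011317299, 1557011317299, 1557011319511],
    'Count': [45148, 45148, 45201],
    'Reading': [-0.015494, -0.015963, -0.015494],
})

# Imitate grouped-map execution: one call per 'Count' group, then concatenate.
result = pd.concat([get_pdf(g) for _, g in sample.groupby('Count')],
                   ignore_index=True)
print(result)
```

In the Spark job the same function body would keep the `@pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)` decorator; on Spark 3.0+ the equivalent is `groupby(...).applyInPandas(get_pdf, schema=DataShema)` on an undecorated function.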
From another question (Convert Spark Structure Streaming DataFrames to Pandas DataFrame), I understand that a Structured Streaming DataFrame cannot be converted to pandas directly, and that pandas_udf seems to be the right approach, but I cannot figure out exactly how to make it work. I need a pandas DataFrame to pass into my own functions.
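If the goal is simply to hand pandas DataFrames to existing functions, one alternative to pandas_udf is `DataStreamWriter.foreachBatch` (available in PySpark from 2.4). It passes each micro-batch to a callback as an ordinary, non-streaming DataFrame, on which `toPandas()` is allowed. This is a minimal sketch under that assumption. `analyse`, `handle_batch`, and `start_query` are hypothetical names, and the per-timestamp mean is only a placeholder for the real analysis.

```python
import pandas as pd

def analyse(pdf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical placeholder for the real pandas analysis:
    # mean Reading per TimeStamp.
    return pdf.groupby('TimeStamp', as_index=False)['Reading'].mean()

def handle_batch(batch_df, batch_id):
    # foreachBatch passes each micro-batch as a regular (static) Spark
    # DataFrame, so toPandas() works here, unlike on the streaming DataFrame.
    pdf = batch_df.toPandas()
    result = analyse(pdf)
    print(f"batch {batch_id}:\n{result}")

def start_query(spark, schema, path="SurgeAcc"):
    # Hypothetical helper: 'spark' is an existing SparkSession and 'schema'
    # the StructType from the question (DataShema). Requires Spark 2.4+.
    return (spark.readStream
            .schema(schema)
            .option("header", "true")  # the sample CSV starts with a header line
            .csv(path)
            .writeStream
            .foreachBatch(handle_batch)
            .start())
```

Usage would be along the lines of `start_query(spark, DataShema).awaitTermination()`. Each call to `toPandas()` collects one micro-batch to the driver, so this suits modest batch sizes.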
Edit
When I run the code with the query writing mydf instead of SrgDF, I get the following error:
pyspark.sql.utils.StreamingQueryException: 'Writing job aborted.\n=== Streaming Query ===\nIdentifier: [id = 18a15e9e-9762-4464-b6d1-cb2db8d0ac41, runId = e3da131e-00d1-4fed-82fc-65bf377c3f99]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc]: {"logOffset":0}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nFlatMapGroupsInPandas [Count#1], get_pdf(TimeStamp#0L, Count#1, Reading#2), [TimeStamp#10L, Count#11, Reading#12]\n+- Project [Count#1, TimeStamp#0L, Count#1, Reading#2]\n +- StreamingExecutionRelation FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc], [TimeStamp#0L, Count#1, Reading#2]\n'
19/05/20 18:32:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
/usr/local/lib/python3.6/dist-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
EDIT-2
Here is the code to reproduce the error:
import sys
from pyspark import SparkContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming import StreamingContext
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyarrow as pa
import glob
#####################################################################################
if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("RealTimeIMUAnalysis") \
        .getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    # reduce verbosity
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    ##############################################################################
    # using the saved files to do the Analysis
    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
                             StructField("Count", IntegerType(), True), \
                             StructField("Reading", FloatType(), True) ])
    group_columns = ['TimeStamp','Count','Reading']
    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])
    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("SurgeAcc")
    mydf = SrgDF.groupby('Count').apply(get_pdf)
    #print(mydf)
    qrySrg = mydf \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()
To run it, create a folder named SurgeAcc next to the script and put a CSV file inside it with the following format:
TimeStamp,Count,Reading
1557011317299,45148,-0.015494
1557011317299,45153,-0.015963
1557011319511,45201,-0.015494
1557011319511,45221,-0.015494
1557011315134,45092,-0.015494
1557011315135,45107,-0.014085
1557011317299,45158,-0.015963
1557011317299,45163,-0.015494
1557011317299,45168,-0.015024
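The sample file begins with a header line. Spark's CSV reader treats the first line as data unless the `header` option is set to true, so with the schema above that line would most likely arrive as a row of nulls instead of being skipped. Adding `.option("header", "true")` to the `readStream` chain is probably needed. For reference, here is a pandas-only sketch that reads the first rows of the sample with dtypes chosen to mirror DataShema. The LongType/IntegerType/FloatType to int64/int32/float32 mapping is an assumption about what the downstream pandas functions would receive for non-null data.

```python
import io

import numpy as np
import pandas as pd

# The first rows of the sample file from the question; line 1 is a header.
SAMPLE = """TimeStamp,Count,Reading
1557011317299,45148,-0.015494
1557011317299,45153,-0.015963
1557011319511,45201,-0.015494
"""

# Assumed pandas dtypes corresponding to DataShema:
# LongType -> int64, IntegerType -> int32, FloatType -> float32.
dtypes = {'TimeStamp': np.int64, 'Count': np.int32, 'Reading': np.float32}

# header=0 consumes the first line as column names, analogous to
# .option("header", "true") on the Spark side.
pdf = pd.read_csv(io.StringIO(SAMPLE), header=0, dtype=dtypes)
print(pdf.dtypes)
```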