temp = Window.partitionBy("id").orderBy("time").rowsBetween(-5, 5)
spark_df.withColumn("movingAvg",fn.avgspark_df("average")).over(temp)).show()

I'm getting this error on the last line:

'DataFrame' object is not callable

xinlin li

2 Answers


You are missing a bracket, and some of the syntax also seems wrong. I assume this is what your code was before the bracket went missing:

fn.avgspark_df("average")

Which is why you get the error: you are trying to call the DataFrame as a function. I believe you can achieve what you want with:

import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({'id': [0,0,0,0,0,1,1,1,1,1],
                   'time': [1,2,3,4,5,1,2,3,4,5],
                   'average': [0,1,2,3,4,5,6,7,8,9]})
df = spark.createDataFrame(df)

# average over the previous, current and next row within each id, ordered by time
temp = Window.partitionBy("id").orderBy("time").rowsBetween(-1, 1)
df.withColumn("movingAvg", fn.avg("average").over(temp)).show()
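If you want to sanity-check the result without a Spark session, the same centered moving average can be reproduced in plain pandas (a sketch: `rolling(3, center=True, min_periods=1)` within each `id` group mirrors Spark's `rowsBetween(-1, 1)` frame):

```python
import pandas as pd

pdf = pd.DataFrame({'id': [0,0,0,0,0,1,1,1,1,1],
                    'time': [1,2,3,4,5,1,2,3,4,5],
                    'average': [0,1,2,3,4,5,6,7,8,9]})

# Within each id, a centered window of size 3 (min_periods=1 handles the edges)
# corresponds to Spark's rowsBetween(-1, 1) over rows ordered by time.
pdf = pdf.sort_values(['id', 'time'])
pdf['movingAvg'] = (pdf.groupby('id')['average']
                       .transform(lambda s: s.rolling(3, center=True, min_periods=1).mean()))
print(pdf)
# movingAvg for id 0 is 0.5, 1.0, 2.0, 3.0, 3.5
```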
Florian
  • Thanks for your help! I can run your code successfully, but I can't do it in my code. Maybe because I use spark = SparkSession.builder.appName(" ").getOrCreate(); file = "D:\project\HistoryData.csv"; lines = pd.read_csv(file); cc = lines.values.tolist(); spark_df = spark.createDataFrame(cc, ['time','average','max','min']) to create my dataframe – xinlin li Jul 13 '18 at 01:12
  • It tells me pyspark.sql.utils.AnalysisException. – xinlin li Jul 13 '18 at 01:17
  • @xinlinli Does that happen before or after the calculation of the moving average? See e.g. https://stackoverflow.com/questions/41785342/how-to-create-a-table-as-select-in-pyspark-sql or https://stackoverflow.com/questions/42091575/pyspark-load-file-path-does-not-exist or https://stackoverflow.com/questions/43100458/pyspark-sql-utils-analysisexception-upath-does-not-exist or https://stackoverflow.com/questions/39016440/analysisexception-ucannot-resolve-name-given-input-columns-list-in-sqlco – Florian Jul 13 '18 at 06:19
  • Now I have succeeded in solving the problem, but I have met a new strange problem: https://stackoverflow.com/questions/51316635/pysparks-window-functions-fn-avg-only-output-same-data – xinlin li Jul 13 '18 at 06:28
  • @xinlinli Please consider [accepting this answer](https://meta.stackexchange.com/questions/5234/how-does-accepting-an-answer-work) if it helped you solve your issue. I will take a look at your new question. – Florian Jul 13 '18 at 06:33
  • OK, I will. Thank you! – xinlin li Jul 13 '18 at 06:36
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("Data Frame Example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

l = [("Alice", "2016-05-01", 50.00),
     ("Alice", "2016-05-03", 45.00),
     ("Alice", "2016-05-04", 55.00),
     ("Bob", "2016-05-01", 25.00),
     ("Bob", "2016-05-04", 29.00),
     ("Bob", "2016-05-06", 27.00)]

customers = spark.sparkContext.parallelize(l).toDF(["name", "date", "amountSpent"])

temp = Window.partitionBy("name").orderBy("date")

customers.withColumn("movingAvg", avg("amountSpent").over(temp)).show()
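Note that since this window has an orderBy but no explicit frame, Spark defaults to a frame from unbounded preceding to the current row, so this computes a running (cumulative) average per name rather than a centered moving average. A small pandas sketch of the equivalent, using `expanding().mean()`:

```python
import pandas as pd

df = pd.DataFrame({'name': ['Alice', 'Alice', 'Alice', 'Bob', 'Bob', 'Bob'],
                   'date': ['2016-05-01', '2016-05-03', '2016-05-04',
                            '2016-05-01', '2016-05-04', '2016-05-06'],
                   'amountSpent': [50.0, 45.0, 55.0, 25.0, 29.0, 27.0]})

# With orderBy but no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED
# PRECEDING AND CURRENT ROW, i.e. a running average per partition;
# pandas expanding().mean() within each name group reproduces that.
df = df.sort_values(['name', 'date'])
df['movingAvg'] = df.groupby('name')['amountSpent'].transform(lambda s: s.expanding().mean())
print(df)
# Alice's movingAvg is 50.0, 47.5, 50.0
```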
abiratsis