I am streaming Twitter data with Apache Spark and want to save the streamed data as a CSV file, but I have not been able to. How can I fix my code so that it writes CSV output?

I use RDD.

This is my main code:

val ssc = new StreamingContext(conf, Seconds(3600))
val stream = TwitterUtils.createStream(ssc, None, filters)

val tweets = stream.map(t => {
  Map(
    // This is for tweet
    "text" -> t.getText,
    "retweet_count" -> t.getRetweetCount,
    "favorited" -> t.isFavorited,
    "truncated" -> t.isTruncated,
    "id_str" -> t.getId,
    "in_reply_to_screen_name" -> t.getInReplyToScreenName,
    "source" -> t.getSource,
    "retweeted" -> t.isRetweetedByMe,
    "created_at" -> t.getCreatedAt,
    "in_reply_to_status_id_str" -> t.getInReplyToStatusId,
    "in_reply_to_user_id_str" -> t.getInReplyToUserId,

    // This is for tweet's user
    "listed_count" -> t.getUser.getListedCount,
    "verified" -> t.getUser.isVerified,
    "location" -> t.getUser.getLocation,
    "user_id_str" -> t.getUser.getId,
    "description" -> t.getUser.getDescription,
    "geo_enabled" -> t.getUser.isGeoEnabled,
    "user_created_at" -> t.getUser.getCreatedAt,
    "statuses_count" -> t.getUser.getStatusesCount,
    "followers_count" -> t.getUser.getFollowersCount,
    "favorites_count" -> t.getUser.getFavouritesCount,
    "protected" -> t.getUser.isProtected,
    "user_url" -> t.getUser.getURL,
    "name" -> t.getUser.getName,
    "time_zone" -> t.getUser.getTimeZone,
    "user_lang" -> t.getUser.getLang,
    "utc_offset" -> t.getUser.getUtcOffset,
    "friends_count" -> t.getUser.getFriendsCount,
    "screen_name" -> t.getUser.getScreenName
  )
})

tweets.repartition(1).saveAsTextFiles("~/streaming/tweets")

1 Answer

You need to convert the tweets, which form a DStream[Map[String, Any]] (one RDD of maps per batch), to a DataFrame before saving them as CSV. The reason is simple: an RDD of maps carries no schema, whereas CSV output needs a fixed set of columns. So you have to convert each batch RDD to a DataFrame, which has a schema.

There are several ways of doing that. One approach could be using a case class instead of putting the data into maps.

case class Tweet(text: String, retweetCount: Int, ...)

Now instead of Map(...) you instantiate the case class with proper parameters.
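As a minimal sketch of that step, assuming a shortened, hypothetical Tweet case class with only three of the fields from the question (the names Tweet, retweetCount and screenName are illustrative and not from the original code):

```scala
// Hypothetical, shortened schema: only three of the question's fields.
// The field names and types replace the Map keys and untyped values.
case class Tweet(text: String, retweetCount: Int, screenName: String)

object TweetExample {
  def main(args: Array[String]): Unit = {
    // In the streaming job this would look like:
    //   stream.map(t => Tweet(t.getText, t.getRetweetCount, t.getUser.getScreenName))
    val tweet = Tweet("hello spark", 3, "alice")

    // Every record now has the same typed fields in the same order.
    println(tweet) // prints Tweet(hello spark,3,alice)
  }
}
```

When an RDD of such a case class is converted with toDF, Spark uses the field names as the column names.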

Finally, convert the tweets to a DataFrame using Spark's implicit conversions:

import spark.implicits._
tweets.foreachRDD { rdd => // tweets is a DStream: write each batch RDD
  rdd.toDF.write.mode("append").csv(...) // saves as CSV
}
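Putting the pieces together, here is a rough sketch of how the whole job might look. It assumes the spark-streaming-twitter artifact is on the classpath (Apache Bahir for Spark 2.x) and that Twitter credentials are supplied through the usual twitter4j system properties. The Tweet case class, the filter keyword, and the output path /tmp/streaming/tweets are placeholders chosen for illustration, not part of the original code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Hypothetical, shortened schema; add the remaining fields as needed.
// Defined at top level so Spark can derive an encoder for toDF.
case class Tweet(text: String, retweetCount: Int, screenName: String)

object TweetsToCsv {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TweetsToCsv")
    val ssc = new StreamingContext(conf, Seconds(3600))
    val filters = Seq("spark") // assumed filter keyword
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Build typed records instead of Map[String, Any].
    val tweets = stream.map { t =>
      Tweet(t.getText, t.getRetweetCount, t.getUser.getScreenName)
    }

    // A DStream has no toDF; convert and write each batch RDD instead.
    tweets.foreachRDD { rdd =>
      val spark = SparkSession.builder
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      import spark.implicits._

      if (!rdd.isEmpty()) {
        rdd.toDF()
          .coalesce(1)              // one part file per batch
          .write
          .mode(SaveMode.Append)    // later batches add files to the same directory
          .option("header", "true")
          .csv("/tmp/streaming/tweets") // placeholder output directory
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The output path is a directory: each batch adds its own part file there, each with its own header row. Without an append mode, the second batch would fail because the directory already exists.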

Alternatively you can convert the Map to a dataframe using the solution given here

Avishek Bhattacharya