
I'll try to simplify the problem I am trying to solve. I have an employee data stream that is read from a JSON file and has the following schema:

StructType([
    StructField("timeStamp", TimestampType()),
    StructField("emp_id", LongType()),
    StructField("on_duty", LongType())])
# on_duty is an integer boolean -> 0, 1

Sample:

{"timeStamp": 1514765160, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1514765161, "emp_id": 12472154, "on_duty": 1}

I would like to compute two things every minute using Spark Structured Streaming: the total number of employees online, and the number who are NOT on duty.

This is per minute with respect to the event timestamp, not the system time.

Kafka producer

import json
from time import sleep

import json_lines
from kafka import KafkaProducer

# filepath and topic_name are defined elsewhere
_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                          value_serializer=lambda x: json.dumps(x).encode('utf-8'))

dataDict = {}

# schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )

with open(filepath, 'r', encoding="utf16") as f:
    for item in json_lines.reader(f):
        dataDict.update({'timeStamp': item['timestamp'],
                         'emp_id': item['emp_id'],
                         'on_duty': item['on_duty']})
        _producer.send(topic_name, value=json.loads(json.dumps(dataDict)))
        sleep(1)


# ^ Threading doesn't work BTW
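A side note on the commented-out `schedule` line above: `schedule.every(1).minutes.do(...)` expects a callable, but the code passes it the return value of `_producer.send(...)`, so the message is sent once immediately and nothing is actually scheduled. A minimal sketch of how the `schedule` package is normally used here (assuming the same `_producer`, `topic_name`, and `dataDict` as above):

import schedule  # pip install schedule
from time import sleep

def send_current_record():
    # pass the callable itself, not the result of calling it
    _producer.send(topic_name, value=dataDict)

schedule.every(1).minutes.do(send_current_record)

while True:
    schedule.run_pending()
    sleep(1)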

Spark streaming

emp_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "emp_dstream") \
  .option("startingOffsets", "latest") \
  .load() \
  .selectExpr("CAST(value AS STRING)") 

from pyspark.sql.functions import col, get_json_object

emp_data = emp_stream.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["timeStamp", "emp_id", "on_duty"]])

# this query is a filler attempt which is not the end goal of the task 
query = emp_data.groupBy(["on_duty"]).count()

emp_data.writeStream \
  .outputMode("append") \
  .format("console") \
  .start() \
  .awaitTermination()
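As an aside, the filler `query` above is never actually streamed (the `writeStream` call is on `emp_data`), and if it were, Spark would reject `outputMode("append")`, because append mode is not allowed for streaming aggregations without a watermark. A minimal sketch of streaming that aggregation instead (console sink, complete mode):

# Sketch: stream the aggregated counts rather than the raw parsed rows.
# Aggregations without a watermark need "complete" (or "update") output mode.
query.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start() \
    .awaitTermination()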

I am confused about how to proceed. Should I make changes in the Kafka producer, or while processing the stream with Spark? And how should I do that?

Would be grateful for any hints or help!


Update (as per @Srinivas's solution)

(truncated console output from my first attempt: note the epoch-1970 windows)
|[1970-01-18 04:46:00, 1970-01-18 04:47:00]|1970-01-18 04:46:05|1070         |[1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, ...]|

-------------------------------------------
Batch: 40
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window                                    |timestamp          |Online_emp    |Available_emp    |
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20            |12               |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20            |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20            |4                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:42|12            |4                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:20|4             |0                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:49|4             |0                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:44|12            |8                |
|[2017-12-31 16:02:00, 2017-12-31 16:03:00]|2017-12-31 16:02:19|8             |4                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:15|8             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:08|12            |4                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:50|8             |0                |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:27|16            |0                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:38|5             |0                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:13|4             |4                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:36|8             |4                |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:59|24            |4                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:40|10            |0                |
+------------------------------------------+-------------------+--------------+-----------------+
only showing top 20 rows

-------------------------------------------
Batch: 41
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window                                    |timestamp          |Online_emp    |Available_emp    |
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20            |12               |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20            |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20            |4                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4             |0                |

Update 2

How to get output like this:

Time                 Online_Emp  Available_Emp
2019-01-01 00:00:00  52          23
2019-01-01 00:01:00  58          19
2019-01-01 00:02:00  65          28
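For reference, one way to get exactly that shape is a small variation on the aggregation in the answer below: group by the one-minute window only (not by window and timestamp) and expose the window start as `Time`. A sketch, where `parsed` is an assumed name for the parsed stream from the answer (everything up to and including the `from_unixtime` step):

from pyspark.sql import functions as F

# `parsed` = the Kafka stream after from_json + from_unixtime (see the answer below)
per_minute = parsed \
    .groupBy(F.window(F.col("timestamp"), "1 minute")) \
    .agg(
        # count of events per minute; use F.approx_count_distinct("emp_id") instead
        # if the same employee can appear more than once within a window
        F.count("emp_id").alias("Online_Emp"),
        F.sum(F.when(F.col("on_duty") == 0, 1).otherwise(0)).alias("Available_Emp")) \
    .select(F.col("window.start").alias("Time"), "Online_Emp", "Available_Emp")

per_minute.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()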
  • I am working on a similar problem myself. The documentation has got me a little confused. Looking forward to finding out the solution. – No Holidays Jun 20 '20 at 13:53
  • You mean am I sure? Yes, these are timestamp values. Should I change the LongType in the schema to timestamp? – jokol Jun 21 '20 at 00:45
  • Now I understand: I am using timestamps in milliseconds, but your timestamps are not in milliseconds. You have to change ```.withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))``` to ```.withColumn("timestamp",F.from_unixtime(F.col("timestamp")))``` (see the sketch after this comment thread). – Srinivas Jun 21 '20 at 00:51
  • Yes, it works, but all the batches show the same info, repeatedly. It doesn't update with the Kafka producer. – jokol Jun 21 '20 at 01:02
  • I just did. Am I producing the Kafka stream incorrectly by sending it line by line and using sleep(1)? – jokol Jun 21 '20 at 01:06
  • So if I have a large JSON file (~10 GB), do I use a different way to produce the Kafka stream? – jokol Jun 21 '20 at 01:09
  • Give me a little time to get back to you, the VM is acting a little slow. – jokol Jun 21 '20 at 01:12
  • But wouldn't that output the whole file at once? – jokol Jun 21 '20 at 01:16
  • Yes, because I wanted it to reflect a real-world stream. – jokol Jun 21 '20 at 01:21
  • OK, maybe you can test your logic on a small stream of data and then apply the same logic to the big real-world stream. – Srinivas Jun 21 '20 at 01:23
  • Sorry for the delay. I have a simple query: when we process this data in Spark, are we processing ALL the data that is on the console, or batching it as it arrives? Because maybe it processes all of it, and that's why we see the same results every time. – jokol Jun 21 '20 at 01:36
  • It will process all data that is stored in Kafka. – Srinivas Jun 21 '20 at 02:30
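To make the seconds-versus-milliseconds point from the comments concrete, here is a small non-streaming sketch showing the two `from_unixtime` variants side by side; the sample values are the question's epoch seconds and the answer's epoch milliseconds:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1514765160, 1592669811475)],  # (question's epoch seconds, answer's epoch milliseconds)
    ["secs", "millis"])

df.select(
    F.from_unixtime(F.col("secs")).alias("from_seconds"),          # use this form when timeStamp is in seconds
    F.from_unixtime(F.col("millis") / 1000).alias("from_millis")   # use this form when timeStamp is in milliseconds
).show(truncate=False)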

1 Answer


Use the window function.

Sample data in Kafka

{"timeStamp": 1592669811475, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1592669811475, "emp_id": 12472154, "on_duty": 1}
{"timeStamp": 1592669811475, "emp_id": 12471980, "on_duty": 0}
{"timeStamp": 1592669811475, "emp_id": 12472181, "on_duty": 1}
{"timeStamp": 1592669691475, "emp_id": 12471982, "on_duty": 0}
{"timeStamp": 1592669691475, "emp_id": 12472183, "on_duty": 1}
{"timeStamp": 1592669691475, "emp_id": 12471984, "on_duty": 0}
{"timeStamp": 1592669571475, "emp_id": 12472185, "on_duty": 1}
{"timeStamp": 1592669571475, "emp_id": 12472186, "on_duty": 1}
{"timeStamp": 1592669571475, "emp_id": 12472187, "on_duty": 0}
{"timeStamp": 1592669571475, "emp_id": 12472188, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472185, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472186, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472187, "on_duty": 0}
{"timeStamp": 1592669631475, "emp_id": 12472188, "on_duty": 1}
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StructField, StructType, LongType, TimestampType

schema = StructType([ \
    StructField("timeStamp", LongType()), \
    StructField("emp_id", LongType()), \
    StructField("on_duty", LongType())])

df = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe","emp_dstream")\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value"), schema).alias("value"))\
    .select(F.col("value.*"))\
    .withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))\
    .groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp"))\
    .agg(F.count(F.col("timeStamp")).alias("total_employees"),
         F.collect_list(F.col("on_duty")).alias("on_duty"),
         F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty"))\
    .writeStream\
    .format("console")\
    .outputMode("complete")\
    .option("truncate", "false")\
    .start()\
    .awaitTermination()

Output

+---------------------------------------------+-------------------+---------------+------------+-----------+
|window                                       |timestamp          |total_employees|on_duty     |not_on_duty|
+---------------------------------------------+-------------------+---------------+------------+-----------+
|[2020-06-20 21:42:00.0,2020-06-20 21:43:00.0]|2020-06-20 21:42:51|4              |[1, 1, 0, 1]|1          |
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|3              |[0, 1, 0]   |2          |
|[2020-06-20 21:46:00.0,2020-06-20 21:47:00.0]|2020-06-20 21:46:51|4              |[0, 1, 0, 1]|2          |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|4              |[1, 1, 0, 1]|1          |
+---------------------------------------------+-------------------+---------------+------------+-----------+

Spark Batch

spark \
    .read \
    .schema(schema) \
    .json("/tmp/data/emp_data.json") \
    .select(F.to_json(F.struct("*")).cast("string").alias("value")) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "emp_data") \
    .save()

Spark Streaming

spark \
    .readStream \
    .schema(schema) \
    .json("/tmp/data/emp_data.json") \
    .select(F.to_json(F.struct("*")).cast("string").alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "emp_data") \
    .start()
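One practical note (my assumption about the runtime requirement, not part of the original answer): a streaming query that writes to the Kafka sink needs a checkpoint directory, so the stream above will likely need an extra option. A sketch of the same file-to-Kafka stream with a placeholder checkpoint path:

# Sketch: same stream as above, plus the checkpoint directory the Kafka sink requires.
spark \
    .readStream \
    .schema(schema) \
    .json("/tmp/data/emp_data.json") \
    .select(F.to_json(F.struct("*")).cast("string").alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "emp_data") \
    .option("checkpointLocation", "/tmp/emp_data_checkpoint") \
    .start()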

JSON data in kafka

/tmp/data> kafka-console-consumer --bootstrap-server localhost:9092 --topic emp_data
{"timeStamp":1592669811475,"emp_id":12471979,"on_duty":0}
{"timeStamp":1592669811475,"emp_id":12472154,"on_duty":1}
{"timeStamp":1592669811475,"emp_id":12471980,"on_duty":0}
{"timeStamp":1592669811475,"emp_id":12472181,"on_duty":1}
{"timeStamp":1592669691475,"emp_id":12471982,"on_duty":0}
{"timeStamp":1592669691475,"emp_id":12472183,"on_duty":1}
{"timeStamp":1592669691475,"emp_id":12471984,"on_duty":0}
{"timeStamp":1592669571475,"emp_id":12472185,"on_duty":1}
{"timeStamp":1592669571475,"emp_id":12472186,"on_duty":1}
{"timeStamp":1592669571475,"emp_id":12472187,"on_duty":0}
{"timeStamp":1592669571475,"emp_id":12472188,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472185,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472186,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472187,"on_duty":0}
{"timeStamp":1592669631475,"emp_id":12472188,"on_duty":1}
^CProcessed a total of 15 messages
  • Thank you for your answer, but my console output is completely bizarre. I'll post a part of my console output in the question. – jokol Jun 21 '20 at 00:34
  • It works with this code `datetime.fromtimestamp(item['timestamp']).strftime('%Y-%m-%d %H:%M:%S')`; the timestamps are from 2017-2019. – jokol Jun 21 '20 at 00:52
  • It helped greatly :) thank you. A small question that I wasn't able to find anywhere else: how do you split a JSON file and send it to Kafka bit by bit at an interval? There must be a way to use the Unix `split` command, right? – jokol Jun 21 '20 at 01:40
  • I meant something like this (ingestion of the taxi data into Kafka): https://www.adaltas.com/en/2019/04/18/spark-streaming-data-pipelines-with-structured-streaming/ – jokol Jun 21 '20 at 01:46
  • So, spark-producer --> kafka message producer --> spark-processing? Didn't know that was possible.. makes me wonder why we need Kafka in the first place ;P – jokol Jun 21 '20 at 01:48
  • No.. no.. what I am saying is: read the file using Spark, transform that data as per your need, and send that data to Kafka. In this case you are sending data to Kafka using Spark; you can call this a producer. – Srinivas Jun 21 '20 at 01:50
  • I see, I didn't know that. But would it be reasonable, or even possible, to open a large file, like 10 GB, and process it? Even the way I am doing it right now is incorrect for going through a large file. – jokol Jun 21 '20 at 02:07
  • Yes, Spark will process big files.. 10 GB is very little; we are using Spark to process TBs of data. – Srinivas Jun 21 '20 at 02:11
  • Just saw this.. if possible, could you share a link/code snippet/term to google for preprocessing JSON using Spark and sending it to Kafka? I can't find it.. – jokol Jun 21 '20 at 10:03
  • Sure, I will share. – Srinivas Jun 21 '20 at 10:15
  • @Srinivas So you use Spark to transform data and send it to Kafka? That's interesting. Any links/GitHub for it? I am in the same boat as jokol. – No Holidays Jun 21 '20 at 12:53
  • No.. you can directly use a DataFrame: convert the data to string type with the column name `value`, then use the Kafka format to send messages. – Srinivas Jun 21 '20 at 13:29
  • Updated the code: check the Spark batch & streaming sections for sending messages to Kafka. – Srinivas Jun 21 '20 at 14:10
  • Thanks @Srinivas! I am trying it right now. I actually had a follow-up to this which I posted here: https://stackoverflow.com/questions/62499604/processing-data-on-spark-structured-streaming-before-outputting-to-the-console. If you have the time :) – jokol Jun 21 '20 at 14:49