I'm trying to figure out how streaming with Kafka works in combination with Memgraph. I have a Memgraph running in a Docker container. I've created a module called music.py
using Visual Studio Code but I can't save it into the docker.
import mgp
import json
@mgp.transformation
def rating(messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
result_queries = []
for i in range(messages.total_messages()):
message = messages.message_at(i)
movie_dict = json.loads(message.payload().decode('utf8'))
result_queries.append(
mgp.Record(
query=("MERGE (u:User {id: $userId}) "
"MERGE (m:Album {id: $albumId, title: $title}) "
"WITH u, m "
"UNWIND $genres as genre "
"MERGE (m)-[:OF_GENRE]->(:Genre {name: genre}) "
"MERGE (u)-[r:RATED {rating: ToFloat($rating), timestamp: $timestamp}]->(m)"),
parameters={
"userId": album_dict["userId"],
"albumId": album_dict["movie"]["movieId"],
"title": album_dict["album"]["title"],
"genres": album_dict["album"]["genres"],
"rating": album_dict["rating"],
"timestamp": album_dict["timestamp"]}))
return result_queries
Should I run vi
inside docker and copy/paste the code into it or is there another way?