Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions backend/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from eventlet import greenthread
from flask import Flask, Response
from flask_cors import CORS, cross_origin
from flask_socketio import SocketIO, emit
from flask_socketio import SocketIO
from functools import wraps
from kafka import KafkaConsumer
import eventlet
import json
from kafka import KafkaConsumer
import logging
import os
import server.setup as setup
Expand Down Expand Up @@ -76,24 +76,8 @@ def get_health():


def kafkaconsumer():
consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_IP + ":" + KAFKA_PORT)
try:
while True:
msg_pack = consumer.poll()
if not msg_pack:
greenthread.sleep(1)
continue
for _, messages in msg_pack.items():
for message in messages:
message = json.loads(message.value.decode("utf8"))
log.info("Message: " + str(message))
try:
socketio.emit("consumer", {"data": message})
except Exception as error:
log.info(f"`{message}`, {repr(error)}")
continue
except KeyboardInterrupt:
pass
pass
# TODO: Await messages from the Kafka topic


@app.before_first_request
Expand Down
23 changes: 5 additions & 18 deletions backend/server/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def run(memgraph):
memgraph.drop_database()

log.info("Setting up PageRank")

memgraph.execute("CALL pagerank_online.set(100, 0.2) YIELD *")
memgraph.execute(
"""CREATE TRIGGER pagerank_trigger
Expand All @@ -36,6 +37,7 @@ def run(memgraph):
)

log.info("Setting up community detection")

memgraph.execute(
"CALL community_detection_online.set(False, False, 0.7, 4.0, 0.1, 'weight', 1.0, 100, 5) YIELD *;"
)
Expand All @@ -49,24 +51,9 @@ def run(memgraph):
)

log.info("Creating stream connections on Memgraph")
stream = MemgraphKafkaStream(
name="retweets",
topics=["retweets"],
transform="twitter.tweet",
bootstrap_servers="'kafka:9092'",
)
memgraph.create_stream(stream)
memgraph.start_stream(stream)

log.info("Creating triggers on Memgraph")
trigger = MemgraphTrigger(
name="created_trigger",
event_type=TriggerEventType.CREATE,
event_object=TriggerEventObject.ALL,
execution_phase=TriggerExecutionPhase.AFTER,
statement="CALL publisher.create(createdObjects)",
)
memgraph.create_trigger(trigger)
# TODO Create and start stream
log.info("Creating trigger on Memgraph")
# TODO Create trigger

except Exception as e:
log.info(f"Error on stream and trigger creation: {e}")
Expand Down
25 changes: 5 additions & 20 deletions memgraph/transformations/twitter.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
from email.errors import MessageError
import mgp
import json


@mgp.transformation
def tweet(messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
def tweet(
messages: mgp.Messages,
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
result_queries = []

for i in range(messages.total_messages()):
message = messages.message_at(i)
tweet_dict = json.loads(message.payload().decode('utf8'))
if tweet_dict["target_username"]:
result_queries.append(
mgp.Record(
query=("MERGE (u1:User {username: $source_username}) "
"MERGE (u2:User {username: $target_username}) "
"MERGE (u1)-[:RETWEETED]-(u2)"),
parameters={
"source_username": tweet_dict["source_username"],
"target_username": tweet_dict["target_username"]}))
else:
result_queries.append(
mgp.Record(
query=("MERGE (:User {username: $source_username})"),
parameters={
"source_username": tweet_dict["source_username"]}))
# TODO: Write a transformation module
return result_queries