Iam currently developing a mini project that integrates Jupyter and Kafka with Spark Streaming for data processing, and uses Cassandra for storage. For visualization and alert management, I am using Grafana. The project also includes a specific feature to display the temperature from various regions, which is managed randomly. However, I am encountering an issue with the connection between the producer and consumer initially. I would appreciate some help, please. I have deployed this on Docker
producer
# Installation de la bibliothèque kafka-python si nécessaire !pip install kafka-python import json import random import time from kafka import KafkaProducer import threading from IPython.display import display import ipywidgets as widgets # Configuration du producteur Kafka producer = KafkaProducer( bootstrap_servers=['kafka:9092'], # Ajustez l'adresse si nécessaire value_serializer=lambda v: json.dumps(v).encode('utf-8') ) # Fonction pour générer des données simulées def generate_sensor_data(): return { "sensor_id": random.randint(1, 100), "timestamp": int(time.time() * 1000), "temperature": round(random.uniform(20.0, 35.0), 2), "humidity": round(random.uniform(40.0, 60.0), 2), "pressure": round(random.uniform(970.0, 1030.0), 2) } # Fonction pour envoyer les données def send_sensor_data(stop_event): while not stop_event.is_set(): data = generate_sensor_data() producer.send('temperature', value=data) print(f"Data sent: {data}") time.sleep(1) stop_event = threading.Event() # Widgets pour contrôler l'envoi des données start_button = widgets.Button(description="Start Sending Data") stop_button = widgets.Button(description="Stop Sending Data") output = widgets.Output() display(start_button, stop_button, output) def start_sending_data(b): with output: global thread if not stop_event.is_set(): stop_event.clear() thread = threading.Thread(target=send_sensor_data, args=(stop_event,)) thread.start() print("Started sending data...") def stop_sending_data(b): with output: if not stop_event.is_set(): stop_event.set() thread.join() print("Stopped sending data.") start_button.on_click(start_sending_data) stop_button.on_click(stop_sending_data)
et consumateur
`from pyspark.sql import SparkSession from pyspark.sql.functions import col, from_json, to_timestamp, from_unixtime, window, avg, min, max from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType # Initialisation de SparkSession avec l'intégration de Cassandra spark = SparkSession \ .builder \ .appName("Weather Data Streaming") \ .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \ .config("spark.cassandra.connection.host", "localhost") \ .getOrCreate() # Lecture des messages en streaming depuis Kafka df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "kafka:9092") \ .option("subscribe", "temperature") \ .option("startingOffsets", "earliest") \ .load() # Schéma des données JSON reçues de Kafka schema = StructType([ StructField("sensor_id", StringType()), StructField("timestamp", LongType()), # temps en millisecondes depuis l'époque UNIX StructField("temperature", DoubleType()), StructField("humidity", DoubleType()), StructField("pressure", DoubleType()) ]) # Transformation des données brutes en DataFrame structuré weather_data = df.select( from_json(col("value").cast("string"), schema).alias("data") ).select( "data.sensor_id", to_timestamp(from_unixtime(col("data.timestamp") / 1000)).alias("timestamp"), "data.temperature", "data.humidity", "data.pressure" ) # Calcul des statistiques sur les températures weather_stats = weather_data \ .groupBy( window(col("timestamp"), "1 hour"), col("sensor_id") ) \ .agg( avg("temperature").alias("avg_temp"), min("temperature").alias("min_temp"), max("temperature").alias("max_temp") ) # Fonction pour écrire les résultats dans Cassandra def write_to_cassandra(batch_df, batch_id): batch_df.write \ .format("org.apache.spark.sql.cassandra") \ .mode("append") \ .options(table="weather", keyspace="weatherSensors") \ .save() # Configuration du Stream pour écrire dans Cassandra query = weather_stats \ .writeStream \ .outputMode("complete") \ .foreachBatch(write_to_cassandra) \ .start() query.awaitTermination()`
Top comments (0)