cl1p.net/spark
1
# 1. Install Java and download Spark
!apt-get update -qq
!apt-get install -y openjdk-11-jdk-headless -qq

# 2. Download and extract Spark 3.5.0 (Hadoop 3)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar -xzf spark-3.5.0-bin-hadoop3.tgz

# 3. Install Python packages
#    - findspark helps Colab find the Spark installation
#    - pyspark is the Python API for Spark (sometimes optional but safe)
!pip install -q findspark pyspark

2
import os
import findspark

# Set JAVA_HOME and SPARK_HOME (must match the versions installed above)
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.5.0-bin-hadoop3'

findspark.init()  # allow Python to find the Spark installation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum

# Create the Spark session
spark = SparkSession.builder.appName("COVID_Large_Analysis").getOrCreate()
print("Spark session started")

3.a
# Download the CSV used in this notebook (countries-aggregated from GitHub)
!wget -q https://raw.githubusercontent.com/datasets/covid-19/main/data/countries-aggregated.csv -O /content/covid.csv

# Show the first few lines to confirm the download (optional)
!head -n 5 /content/covid.csv

3.b
# Read the CSV with Spark and show sample rows
data_path = "/content/covid.csv"

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

print("Dataset loaded successfully")
df.show(5, truncate=False)

4
# Show the latest records for India
df.filter(col("Country") == "India").orderBy(col("Date").desc()).show(10, truncate=False)

5
# Country summary aggregations
country_summary = (
    df.groupBy("Country")
    .agg(
        _sum("Confirmed").alias("TotalConfirmed"),
        _sum("Deaths").alias("TotalDeaths"),
        _sum("Recovered").alias("TotalRecovered")
    )
    .orderBy(col("TotalConfirmed").desc())
)

print("Top 10 countries by total confirmed cases:")
country_summary.show(10, truncate=False)

6
print("Top 10 countries by deaths:")
country_summary.orderBy(col("TotalDeaths").desc()).show(10, truncate=False)

# Daily global trend (sum across countries by Date)
daily_summary = (
    df.groupBy("Date")
    .agg(
        _sum("Confirmed").alias("WorldConfirmed"),
        _sum("Deaths").alias("WorldDeaths"),
        _sum("Recovered").alias("WorldRecovered")
    )
    .orderBy("Date")
)

print("Daily global trend sample:")
daily_summary.show(10, truncate=False)

7
# Convert the top 10 rows of country_summary to pandas for plotting
top10_pd = country_summary.limit(10).toPandas()

import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.bar(top10_pd['Country'], top10_pd['TotalConfirmed'])
plt.xticks(rotation=45, ha='right')
plt.ylabel('Total Confirmed Cases')
plt.xlabel('Country')
plt.title('Top 10 Countries by Total Confirmed Cases')
plt.tight_layout()
plt.show()

8
# Write a small text file for the example
text_data = """Data science is fun.
Data processing using Spark is powerful.
Big data needs distributed computing.
Spark makes big data processing easy."""

with open("/content/input.txt", "w") as f:
    f.write(text_data)

# RDD-based word count
sc = spark.sparkContext
rdd = sc.textFile("/content/input.txt")

words = rdd.flatMap(lambda line: line.split(" "))
word_pairs = words.map(lambda w: (w.lower().strip(), 1))
word_count = word_pairs.reduceByKey(lambda a, b: a + b)

# Convert to a DataFrame and show sorted counts
df_word = word_count.toDF(["word", "count"])
df_sorted = df_word.orderBy(col("count").desc())
df_sorted.show(truncate=False)
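For comparison, the same word count can also be written with the DataFrame API instead of RDD operations. The short sketch below is not part of the original cells; it assumes the existing spark session and the /content/input.txt file created in cell 8, and uses the standard split, explode, lower and trim functions from pyspark.sql.functions.

# Optional sketch: DataFrame-based word count over the same input file
from pyspark.sql.functions import explode, split, lower, trim, col

lines_df = spark.read.text("/content/input.txt")  # one row per line, column "value"
words_df = lines_df.select(explode(split(col("value"), r"\s+")).alias("word"))

word_counts = (
    words_df
    .select(lower(trim(col("word"))).alias("word"))
    .where(col("word") != "")          # drop empty tokens
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)
word_counts.show(truncate=False)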
9
# Retail store simulated streaming (simple batched processing)
from pyspark.sql.functions import col as Fcol
import pandas as pd
import numpy as np
import time

spark_stream = SparkSession.builder.appName("RetailStreamingInMemory").getOrCreate()

def generate_batch(batch_num, rows=500):
    data = {
        "timestamp": pd.date_range("2025-11-01", periods=rows, freq="S"),
        "store": np.random.choice(["Store_A", "Store_B", "Store_C", "Store_D"], rows),
        "product": np.random.choice(["Milk", "Bread", "Butter", "Juice", "Apple"], rows),
        "quantity": np.random.randint(1, 20, rows),
        "price": np.round(np.random.uniform(10, 100, rows), 2),
    }
    dfp = pd.DataFrame(data)
    return spark_stream.createDataFrame(dfp)

# Simulate a few batches and aggregate by store & product
batches = [generate_batch(i, rows=500) for i in range(1, 4)]

print("Streaming started - processing incoming data...")

final_store_product = None

def combine(old_df, new_df, keys):
    if old_df is None:
        return new_df
    else:
        return old_df.union(new_df).groupBy(keys).agg(_sum("total_sales").alias("total_sales"))

for i, batch_df in enumerate(batches, start=1):
    enriched_df = batch_df.withColumn("total", Fcol("quantity") * Fcol("price"))
    store_product_sales = enriched_df.groupBy("store", "product").agg(_sum("total").alias("total_sales"))

    print(f"\nAggregated results for batch {i}:")
    store_product_sales.show(truncate=False)

    final_store_product = combine(final_store_product, store_product_sales, ["store", "product"])
    time.sleep(1)

print("\nFinal Aggregated Sales by Store and Product:")
final_store_product.show(truncate=False)

print("\nStreaming completed successfully!")

10
spark.stop()
print("Spark session stopped.")
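For reference, the batch loop in cell 9 only simulates streaming; a rough equivalent with Spark Structured Streaming is sketched below. This is an illustrative assumption, not part of the original notebook: it uses the built-in rate source (which emits only timestamp and value columns, so store, quantity and price are derived synthetically), writes the running aggregation to an in-memory table named retail_sales_sketch, and creates its own session so it can run independently of the cells above.

# Sketch only: store-level running sales with Structured Streaming
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark_ss = SparkSession.builder.appName("RetailStructuredStreamingSketch").getOrCreate()

# The built-in "rate" source emits (timestamp, value) rows at a fixed rate
events = (
    spark_ss.readStream
    .format("rate")
    .option("rowsPerSecond", 50)
    .load()
)

# Derive synthetic store/quantity/price columns from the monotonically increasing value
enriched = (
    events
    .withColumn("store", F.concat(F.lit("Store_"), (F.col("value") % 4).cast("string")))
    .withColumn("quantity", (F.col("value") % 19 + 1).cast("int"))
    .withColumn("price", F.lit(25.0))
    .withColumn("total", F.col("quantity") * F.col("price"))
)

# Running total of sales per store, kept in an in-memory table
sales_by_store = enriched.groupBy("store").agg(F.sum("total").alias("total_sales"))

query = (
    sales_by_store.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("retail_sales_sketch")
    .start()
)

time.sleep(10)  # let a few micro-batches arrive
spark_ss.sql("SELECT * FROM retail_sales_sketch ORDER BY total_sales DESC").show(truncate=False)

query.stop()
spark_ss.stop()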