cl1p.net/spark
PySpark DataFrame creation from a database

!pip install pyspark
!wget https://github.com/xerial/sqlite-jdbc/releases/download/3.46.0.0/sqlite-jdbc-3.46.0.0.jar

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, when

spark = SparkSession.builder \
    .appName("SQLiteDatabaseIngestion") \
    .config("spark.jars", "/content/sqlite-jdbc-3.46.0.0.jar") \
    .getOrCreate()

# Build a small SQLite database with missing values using pandas
import sqlite3
import pandas as pd

data = {
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Charlie", "David", None],
    "age": [25, None, 30, 35, None]
}
pdf = pd.DataFrame(data)

conn = sqlite3.connect("/content/sample.db")
pdf.to_sql("people", conn, if_exists="replace", index=False)
conn.close()
print("SQLite database created with missing values!")

# Read the table into a Spark DataFrame over JDBC
db_url = "jdbc:sqlite:/content/sample.db"
db_properties = {"driver": "org.sqlite.JDBC"}

df = spark.read.jdbc(url=db_url, table="people", properties=db_properties)
print("Data read from SQLite:")
df.show()

# Count missing values per column
from pyspark.sql.functions import sum as _sum
missing = df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
print("Missing values per column:")
missing.show()

# Replace nulls: mean age for "age", "Unknown" for "name"
mean_age = df.select(mean(col("age"))).collect()[0][0]
df_filled = df.withColumn("age", when(col("age").isNull(), mean_age).otherwise(col("age"))) \
    .withColumn("name", when(col("name").isNull(), "Unknown").otherwise(col("name")))
print("After replacing missing values:")
df_filled.show()

# Save the cleaned data back to the database
df_filled.write.jdbc(url=db_url, table="people_cleaned", mode="overwrite", properties=db_properties)
print("Cleaned data saved back to database!")


RDD in PySpark

from google.colab import files
uploaded = files.upload()

file_path = "employees.csv"

# sc is not defined automatically in Colab; reuse the SparkSession created above
sc = spark.sparkContext

# Load file into an RDD
rdd = sc.textFile(file_path)

# Remove header
header = rdd.first()
data_rdd = rdd.filter(lambda row: row != header)

record_count = data_rdd.count()
print(record_count)

# Employees with salary above 7000 (salary is column index 7)
salary_rdd = data_rdd.filter(lambda row: int(row.split(",")[7]) > 7000)
print(salary_rdd.collect())

# Missing step restored: map each row to (department, (salary, 1)).
# The department column index (2) is an assumption about the CSV layout; salary index 7 matches the filter above.
dept_salary_rdd = data_rdd.map(lambda row: (row.split(",")[2], (int(row.split(",")[7]), 1)))

# Reduce: sum salaries and counts per department
dept_salary_sum = dept_salary_rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
dept_salary_sum.collect()

# Compute averages
dept_avg_salary = dept_salary_sum.mapValues(lambda x: x[0] / x[1])
dept_avg_salary.collect()


DataFrame creation from a CSV file

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("EmployeeApp").getOrCreate()

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, DateType
from pyspark.sql.functions import col, avg, round, datediff, current_date, year, month, dayofmonth, when

schema = StructType([
    StructField("EmployeeId", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", FloatType(), True),
    StructField("JoinDate", DateType(), True)
])

df = spark.read.csv("/content/emp (2).csv", header=True, schema=schema)

# Average salary per department
dept_avg = df.groupBy("Department").agg(round(avg("Salary"), 2).alias("AverageSalary"))
print("Average Salary per Department")
dept_avg.show()

# Join the averages back and derive new columns
df = df.join(dept_avg, on="Department", how="left")
df = df.withColumn("Salary_Increase", round(((col("Salary") - col("AverageSalary")) / col("AverageSalary")) * 100, 2))
df = df.withColumn("YearsWithCompany", round(datediff(current_date(), col("JoinDate")) / 365, 1))

# Salary category
df = df.withColumn(
    "Salary_Category",
    when(col("Salary") < 50000, "Low")
    .when((col("Salary") >= 50000) & (col("Salary") < 100000), "Medium")
    .otherwise("High")
)

print("Final Employee Data with Derived Columns:")
df.show(truncate=False)
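
The last snippet only displays the enriched DataFrame. If the derived columns should be kept, a plain write mirrors the JDBC save in the first snippet; a minimal sketch, where the output path is only an example:

# Persist the enriched employee data; the Parquet path below is an arbitrary example
df.write.mode("overwrite").parquet("/content/employees_enriched")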
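
As a side note on the first snippet: the manual when/otherwise chain can also be expressed with DataFrame.fillna, which takes a per-column dict of replacement values. A minimal sketch, assuming df and mean_age from the SQLite example are still in scope:

# Same null handling as the when/otherwise version, using fillna with a per-column dict
df_filled = df.fillna({"age": mean_age, "name": "Unknown"})
df_filled.show()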