본문 바로가기
데이터분석 교육 (제로베이스)

[스터디 노트] 32번째 Spark Pyspark (241205), 제로베이스 데이터 분석 스쿨 내용

by davidlds 2024. 12. 5.
반응형

제로베이스 데이터 분석 스쿨 내용에 대한 기록이다.

32번째는 Spark Pyspark 강의이다.

 

저번에 이어서 spark 내용을 진행했다.

 

[스파크 기본]

[환경 설정]

  • 설치
    •  
    • !apt-get install openjdk-8-jdk-headless !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz !tar -xf spark-3.0.0-bin-hadoop3.2.tgz !pip install findspark !pip install kaggle --upgrade import os import findspark os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2" findspark.init()
  • pyspark 시작
    • from pyspark.sql import SparkSession
      
      spark = (
        SparkSession
        .builder
        .appName("pyspark_test")
        .master("local[*]")
        .getOrCreate()
      )
      
  • 캐글 api 연결
    • from google.colab import files
      
      files.upload()
      
    • 토큰 json 파일 업로드
    • 캐글 데이터셋 -> 다운로드 옆 점3개 -> copy api link
    • !mkdir -p ~/.kaggle/
      !cp kaggle.json ~/.kaggle/
      !chmod 600 ~/.kaggle/kaggle.json
      !kaggle datasets download -d wethanielaw/iowa-liquor-sales-20230401  # link here
      !unzip iowa-liquor-sales-20230401.zip
      



[spark dataframe]

  • 읽어오기
    • 너무 커서 판다스로 못읽는 경우도 pyspark로 가능하다.
    • df = spark.read.csv(
        path="Iowa_Liquor_Sales.csv", header=True, inferSchema=True
      )
      
  • 데이터 체크
    • df.show(10)
      df.printSchema()
      
  • 저장하기
    • # parquet 파일의 컬럼 이름은 공백이랑 괄호 불가능, replace 필수.
      df.write.format("parquet").save(
        path = "data_parquet",
        header=True
      )
      
  • parquet 파일 다운로드
    • download_list = os.listdir("./data_parquet")
      for file_name in download_list:
          if file_name[-3:] != 'crc':
              files.download("./data_parquet/" + _)
      
  • parquet 파일 읽어오기
    • parquet_df = spark.read.parquet("data_parquet")
      
  • 데이터 처리
    • from pyspark.sql import functions as F
      
      parquet_df.filter(F.col("City")=="MAXWELL")
      



[spark SQL]

  • 일단 데이터프레임으로 불러오고난 뒤 뷰로 등록한다.
  • SQL like 코딩을 할 수 있지만 선호하는 편은 아니다.
  • 뷰 등록
    • parquet_df.createOrReplaceTempView('parquet_sql')
      
  • 쿼리 작성
    • sqlWay = spark.sql('''
        select *
        from parquet_sql
        where City = 'MAXWELL'
        '''
      )
      



[spark dataframe 함수]

  • 데이터 보기
    • df.show(10)
  • 행 카운트
    • df.count()
  • 컬럼 정보 보기
    • df.printSchema()
  • 함수 import
    • from pyspark.sql import functions as F
  • null
    • null 확인
      • df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
    • null 데이터 세부내용 보기
      • df.filter(F.col("col1").isNull()).show()
    • null 데이터 드랍 (컬럼)
      • df = df.drop("col1")
    • 다른 값으로 채우기 (첫번째 값)
      • df = (
        df
        .withColumn(
            "CategoryName",
            F.first(
                F.col("CategoryName"), ignorenulls=True
            ).over(W.partitionBy(F.col("Category")).orderBy(F.col("Date").desc()))
            )
        )
        
    • null 데이터 제거 (행)
      • df = df.filter(F.col("col1").isNotNull())
  • 전체 값 리스트로 출력
    • df.select(F.collect_list('col1')).show()
  • 고유한 값만 출력
    • df.select(F.collect_set('col1')).show()
  • 고유한 값의 갯수
    • df.select(F.size(F.collect_set('col1'))).show()
  • 윈도우 정의하고 사용
    • over로 윈도우를 정의한다.
    • W가 윈도우로 서브쿼리 같은 것이다.
    • partitionBy는 group by와 유사하다.
    • df.withColumn("col3",
                  F.collect_set("col2")
                  .over(W.partitionBy  ("col1"))
                  ).show()
      
  • date 형태로 변환
    • df.printSchema() 으로 확인한다.
    • date 이름인데 string인 경우 변환이 필요하다.
    • 현재 포멧이 어떤식으로 써있는지 같이 입력한다.
    • df = df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
    • to_date 처럼 to_timestamp 도 있다.

 

깃허브 링크

 

깃허브 Spark 공부 부분 링크

반응형