반응형
제로베이스 데이터 분석 스쿨 내용에 대한 기록이다.
32번째는 Spark Pyspark 강의이다.
저번에 이어서 spark 내용을 진행했다.
[스파크 기본]
[환경 설정]
- 설치
!apt-get install openjdk-8-jdk-headless !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz !tar -xf spark-3.0.0-bin-hadoop3.2.tgz !pip install findspark !pip install kaggle --upgrade import os import findspark os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2" findspark.init()
- pyspark 시작
-
from pyspark.sql import SparkSession spark = ( SparkSession .builder .appName("pyspark_test") .master("local[*]") .getOrCreate() )
-
- 캐글 api 연결
-
from google.colab import files files.upload()
- 토큰 json 파일 업로드
- 캐글 데이터셋 -> 다운로드 옆 점3개 -> copy api link
-
!mkdir -p ~/.kaggle/ !cp kaggle.json ~/.kaggle/ !chmod 600 ~/.kaggle/kaggle.json !kaggle datasets download -d wethanielaw/iowa-liquor-sales-20230401 # link here !unzip iowa-liquor-sales-20230401.zip
-
[spark dataframe]
- 읽어오기
- 너무 커서 판다스로 못읽는 경우도 pyspark로 가능하다.
-
df = spark.read.csv( path="Iowa_Liquor_Sales.csv", header=True, inferSchema=True )
- 데이터 체크
-
df.show(10) df.printSchema()
-
- 저장하기
-
# parquet 파일의 컬럼 이름은 공백이랑 괄호 불가능, replace 필수. df.write.format("parquet").save( path = "data_parquet", header=True )
-
- parquet 파일 다운로드
-
download_list = os.listdir("./data_parquet") for file_name in download_list: if file_name[-3:] != 'crc': files.download("./data_parquet/" + _)
-
- parquet 파일 읽어오기
-
parquet_df = spark.read.parquet("data_parquet")
-
- 데이터 처리
-
from pyspark.sql import functions as F parquet_df.filter(F.col("City")=="MAXWELL")
-
[spark SQL]
- 일단 데이터프레임으로 불러오고난 뒤 뷰로 등록한다.
- SQL like 코딩을 할 수 있지만 선호하는 편은 아니다.
- 뷰 등록
-
parquet_df.createOrReplaceTempView('parquet_sql')
-
- 쿼리 작성
-
sqlWay = spark.sql(''' select * from parquet_sql where City = 'MAXWELL' ''' )
-
[spark dataframe 함수]
- 데이터 보기
- df.show(10)
- 행 카운트
- df.count()
- 컬럼 정보 보기
- df.printSchema()
- 함수 import
- from pyspark.sql import functions as F
- null
- null 확인
- df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
- null 데이터 세부내용 보기
- df.filter(F.col("col1").isNull()).show()
- null 데이터 드랍 (컬럼)
- df = df.drop("col1")
- 다른 값으로 채우기 (첫번째 값)
-
df = ( df .withColumn( "CategoryName", F.first( F.col("CategoryName"), ignorenulls=True ).over(W.partitionBy(F.col("Category")).orderBy(F.col("Date").desc())) ) )
-
- null 데이터 제거 (행)
- df = df.filter(F.col("col1").isNotNull())
- null 확인
- 전체 값 리스트로 출력
- df.select(F.collect_list('col1')).show()
- 고유한 값만 출력
- df.select(F.collect_set('col1')).show()
- 고유한 값의 갯수
- df.select(F.size(F.collect_set('col1'))).show()
- 윈도우 정의하고 사용
- over로 윈도우를 정의한다.
- W가 윈도우로 서브쿼리 같은 것이다.
- partitionBy는 group by와 유사하다.
-
df.withColumn("col3", F.collect_set("col2") .over(W.partitionBy ("col1")) ).show()
- date 형태로 변환
- df.printSchema() 으로 확인한다.
- date 이름인데 string인 경우 변환이 필요하다.
- 현재 포멧이 어떤식으로 써있는지 같이 입력한다.
- df = df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
- to_date 처럼 to_timestamp 도 있다.
반응형
'데이터분석 교육 (제로베이스)' 카테고리의 다른 글
[스터디 노트] 31번째 Spark Pyspark (241128), 제로베이스 데이터 분석 스쿨 내용 (1) | 2024.11.28 |
---|---|
[스터디 노트] 30번째 SQL 분석 Power BI (241108), 제로베이스 데이터 분석 스쿨 내용 (1) | 2024.11.08 |
[스터디 노트] 29번째 SQL 분석 (241106), 제로베이스 데이터 분석 스쿨 내용 (2) | 2024.11.06 |
[스터디 노트] 28번째 개인 프로젝트 (241029), 제로베이스 데이터 분석 스쿨 내용 (0) | 2024.10.29 |
[스터디 노트] 27번째 SQL 분석 Power BI (241023), 제로베이스 데이터 분석 스쿨 내용 (0) | 2024.10.23 |