Notice
Recent Posts
Recent Comments
Link
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

짱이 될거야

2022-09-21(2): pyspark에서 dataframe 활용하기 모음 본문

Today I Learned

2022-09-21(2): pyspark에서 dataframe 활용하기 모음

jeong57 2022. 9. 21. 23:16

이번에는 pyspark에서 dataframe으로 할 수 있는 것들을 다 정리해보고자 한다.

내가 프로젝트를 하면서 꼭 필요했던 것만 정리해둔 것으로 나중에 나도 기억이 나지 않을 때 참고할 예정이다.

 

[1] csv를 가져와서 dataframe 만들기

1. csv에 컬럼명이 있는 경우

먼저, csv가 저장되어 있는 경로를 변수 PATH에다가 저장해둔다(재사용 위해).

그 다음, SparkSession.builder로 spark를 만들고 거기에다 read.csv로 저장해둔 csv 파일을 읽어와서 dataframe을 만든다.

header=True: 컬럼명이 있는 csv

encoding='cp949': 폴더에 저장해둔 csv를 열어보면 한글이 깨져있는 경우가 있는데, 그것을 복구시키기 위해서 인코딩을 해준다.

import findspark
findspark.init()
from pyspark.sql import SparkSession


PATH = './data.csv'
    
spark = SparkSession.builder\
    .master('local[*]')\
    .appName('SparkSQL')\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

file_df = spark.read.csv(PATH, header=True, encoding='cp949')
file_df.show()
# file_df.write.csv("./csv")	# "csv" 파일이 생기고 그 안에 해당 csv가 저장된다.

 

2. csv에 컬럼명이 없는 경우

Sqoop을 이용해 DB에 있는 값을 Hadoop으로 옮기면 csv에 있던 컬럼명이 없어지게 된다.

하지만 spark 내에서는 컬럼명을 활용해 연산을 처리해야 하는 경우가 많고, 따라서 컬럼명만 있는 뼈대에다가 컬럼명 없이 값만 있는 데이터를 집어넣었다.

먼저, schema를 만드는데, 이때 컬럼명과 각각에 해당하는 type을 지정한다.

그리고 spark를 만들고 이전에 만들어둔 schema에 csv 값을 불러와서 dataframe을 만든다.

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType, IntegerType
import pyspark.sql.functions as f
from pyspark.sql import SparkSession


PATH = './data.csv'
# 컬럼명으로 schema 만들기
schema = StructType([
    StructField("index", LongType(), True),   # index: [0]
    StructField("createdAt", DateType(), True),
    StructField("content", StringType(), True)
])

# 컬럼명이 있는 schema에 컬럼명이 없는 csv 넣기
spark = SparkSession.builder\
    .master('local[*]')\
    .appName('SparkSQL')\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df = spark.read.csv(PATH, schema=schema)

 

 

[2] 직접 입력한 값으로 dataframe 만들기

[1]번의 경우는 csv 파일을 가지고 와서 dataframe으로 만든 것이고, 이번에는 직접 만든 값으로 dataframe을 만들어보자.

content = 'abc'
data = [Row(content)]

spark = SparkSession.builder.getOrCreate()
# toDF: dataframe으로 변환해준다, "content": 컬럼명 지정
df3 = spark.createDataFrame(data).toDF("content")

 

 

[3] dataframe의 특정 열 값 바꾸기(no "replace")

2022-09-21: pyspark dataframe에서 okt 사용하기, PicklingError

 

2022-09-21: pyspark dataframe에서 okt 사용하기, PicklingError

Python에서 형태소를 분석할 수 있는 라이브러리로 Konlpy가 있다. Konlpy 안에서도 여러 방법이 있는데, 그 중에서 특히 Okt는 형태소 분석에 사용되는 사전을 커스터마이징할 수 있고 오타도 대략적

jeong57.tistory.com

위 포스팅에도 언급한 것처럼, 구글에 나와있는 대부분의 코드는 dataframe의 특정 열 "전체"를 "같은 값"으로 바꾸는 코드이다.

나는 특정 열의 각 행의 값을 "다른 값"으로 바꾸고자 했고, map을 썼다.

map을 돌리면 한 번에 하나의 행이 나오는데, 거기서 필요한 값들만 가져와서 다른 dataframe에 넣거나 아니면 값을 바꿔서 넣으면 된다.

# dataframe -> rdd -> dataframe
df2 = df.rdd.map(lambda x:	# for example, x is a row (x=[1, '나는', 2022-09-21 00:00:00])
	(x[0], morpheme_analysis(x[1]), x[2])	# 1, ['나', '는'], 2022-09-21 00:00:00
    )
df3 = df2.toDF(["index", "content", "date"])
df3.show()

참고 replace는 어떠한 "단어"를 원하는 단어로 바꾸는 것이기 때문에 이 경우에는 적절하지 않다.

 

 

[4] pyspark.sql.functions.explode

다음과 같은 dataframe이 있다고 가정하자. 현재 'content' 열에 모여있는 데이터들을 쪼개고 싶다면 explode를 사용한다.

idx content
1 a, b, c, d, e
2 f, g, h, i
df.select(f.explode(f.col("content").alias('contents').show()
contents
a
b
c
d
e
f
g
h
i

 

 

[5] groupBy, agg, concat_ws, collect_list, alias

1. groupBy

  • 특정 열에서 같은 값들을 기준으로 묶는다.

2. agg

  • [공식문서] GroupedData.agg(*exprs)
  • Compute aggregates(집합) and returns the result as a DataFrame.

3. pyspark.sql.functions.concat_ws

  • [공식문서] pyspark.sql.functions.concat_ws(sep, *cols)
  • 여러 column들의 값을 sep(구분자)로 묶는다.

4. pyspark.sql.functions.collect_list

  • [공식문서] pyspark.sql.functions.collect_list(col)
  • 한 개의 기준에 여러 가지 값이 있을 때 해당하는 값들을 array 형식으로 묶어준다.

5. alias

  • 열 이름을 바꿔준다.

 

예시

1. 원본

index content
1 a, b, c
1 d, e
2 f, g

2. 결과

index contents
1 [a, b, c d, e]
2 [f, g]

 

# index 기준으로 값을 묶는다
df.groupBy(f.col("index"))\
    .agg(f.concat_ws(" ", f.collect_list(f.col("content")))).alias("contents")\
    .sort("index")\
    .show()

 

 

[6] pyspark.sql.functions.regexp_replace

이것을 알기 위해서는 먼저 select와 withColumn을 알아야 한다.

select는 dataframe에서 특정 열만 선택하는 것인데, df2 = df.select(f.col('index'))와 같이 다른 dataframe으로 넘길 수 없기 때문에 나는 잘 사용하지 않았다. (방법이 있는데 내가 모르는 것일 수도 있다)

withColumn은 dataframe에 새로운 행을 추가하고 싶을 때 쓰는데, 기존에 있는 행을 선택할 때 쓰기도 하는 것 같다. 후자의 경우 select와 달리 그 결과가 dataframe으로 저장된다.

 

[공식문서] pyspark.sql.functions.regexp_replace(str, pattern, replacement)

Replace all substrings of the specified string value that match regexp with rep.

 

regexp_replace의 사용법은 다음과 같다.

아래 코드는 content 열에 "{" 또는 "}" 값이 있다면 ""로 바꾸라는 의미이다.

regexp_replace에서 여러 개의 문자를 동시에 바꾸고 싶을 때는 "|"를 쓰는데, 여기서는 OR이라는 뜻으로 쓰인다.

df2 = df.withColumn("content", f.regexp_replace("content", "\\{|\\}", ""))

 

결론

구글링을 해보니까 보통 pandas를 많이 쓰고 pyspark dataframe에 대한 코드는 많이 없었다.

필요할 때마다 코드를 하나씩 긁어 모았고, 때때로 구글에 코드가 정 나오지 않을 때는 혼자서 이것저것 해보다 답이 나온 경우도 있어서 내가 적어둔 것보다 더 깔끔하게 짜는 코드가 있을 수도 있다.

하지만 적어도 나는 프로젝트를 하면서 정말 간절하게 필요했었던 부분들이고, 앞으로도 수십 번은 더 사용할 것이다. 그렇기 때문에 기록으로 남겨둔다.

Comments