먼저 멀티프로세싱이란 여러 개의 프로세스가 작업을 병렬처리하면서 협렵적으로 일을 처리하는 것!
즉, 1개의 프로세스가 돌아가는게 아닌 돌리고 싶은 코어의 개수만큼 한번에 프로세스를 실행시키는 것이다.
노예를 부리고 싶은만큼 부려 일을 빨리 처리한다는 개념정도로 적어놓고,
자세한건 밑에 게시물에 적어놓았다.
우선 직장에서 데이터 파싱을 할 일이 생겨 난생 처음 파이썬과 판다스를 사용해 다양하게
데이터를 조작해보고 있는 중에 하달된 새로운 미션.
약 5,700개의 데이터를 찾아내리는데 단순히 for, if문으로만 처리했더니 데이터가 내려지는데만 30분 이상이 소요됬기에
멀티프로세싱을 사용하여 데이터를 읽고 쓰는 이 시간을 최대한 단축 시켜야 했다.
파이썬 멀티프로세싱의 종류가 굉장히 많은 것을 알고 다양한 방법을 검색해보던 중
다들 말을 너무 어렵게 해서 나같은 감자가 읽기엔 외계어 같길래 그냥 내가 이해한대로 조금은 쉽게 적어보려고 한다.
일단 멀티프로세스를 적용하는 위치는 중요하지 않다.
예제를 보면 main메서드를 호출해서 출력하는[if __name__ == "__main__"]에 냅다 적용을 했지만,
나같은 경우는 main 메서드 안 또는 멀티프로세싱을 돌려 줄 함수를 만들어서 적용시켰다.
결국 개발자가 필요한 위치에 멀티프로세싱을 지정해야하기 때문에 판단을 잘 해야한다.
만약 읽기와 쓰기가 있다면 데이터를 읽는 일이 더 오래걸리기 때문에 읽는 작업에 멀티프로세스를 적용시키고,
이 때 데이터를 쪼개지 않고 할당하게 되면 같은 데이터를 코어 수 만큼 돌게되니 오히려 성능면에서 문제가 생길 수 있다.
각 프로세스 당 한 로직을 다 돌리지 않으려면 작업 할당에 대해서는 꼭 생각하고 멀티프로세싱을 사용해야 할 것 같다.
multiprocessing.Pool 클래스는 파이썬 표준 라이브러리인 multiprocessing 모듈에 포함된 클래스이기 때문에
따로 모듈 설치를 할 필요가 없고, 이 클래스를 사용하여 병렬 처리를 간편하게 할 수 있다.
우선 멀티프로세싱을 사용하기 위해서는 import를 해주어야 한다.
from multiprocessing import Pool
[Multi Pool]
if __name__ == "__main__":
p = Pool(4) #사용 할 코어 수 지정
main()
p.close()
p.join()
main함수를 돌리는데 멀티 프로세스를 적용하고 싶다면 먼저 프로세스 풀을 생성해서 사용 할 코어 수를 지정한 후
main함수가 호출되면 멀티프로세스 작업이 시작된다.
p.close()는 더 이상 풀에 추가 작업이 들어가지 않으니 지금 수행 중인 작업이 모두 끝나면 풀을 종료하라는 의미이며
p.join()은 모든 프로세스들이 종료가 완료될 때 까지 대기시켜주는 역할을 한다.
이 예제 코드에서는 사용되지 않았지만 terminate()라는 것도 있는데,
이건 현재 진행 중인 작업이 있더라도 즉시 풀의 프로세스들을 종료시킬 수 있으니 상황에 따라 맞춰서 써주면 된다.
[map]
result = p.map(func, iterable)
내가 제일 먼저 적용했던 건 map이었다.
multi Pool로 시도를 했지만 잘 안되는 것 같은 기분(?)이 들어서 다른걸 찾아보다가 발견한 map메서드.
주어진 함수를 순회 가능한(iterable) 객체의 각 요소에 적용하여 새로운 결과를 반환하는 함수이다.
쉽게 이야기하면 iterable 입력 리스트의 요소 하나하나를 지정한 함수로 처리하는 구조이다.
이 포스팅을 하면서 찾아보니 map은 map객체를 리턴해서 list(p.map()) 이런 형식으로 프린트 해야한다곤 하는데,
원래도 list라고 알고 있어서 이 부분은 잘 모르겠다. 알게되면 수정하거나 다른 포스팅을 올려봐야겠다.
map은 각 작업이 순차적으로 처리되도록 작동이 되는 방식으로
iterable의 각 요소에 함수를 적용하고 그 결과를 반환하는 방식으로 동작된다.
나는 쿼리를 찾는 함수를 따로 작성하고 df의 특정 컬럼에 iterable를 매칭해서 컬럼을 채워야 했기 때문에 아래와 같이 코드를 작성했다.
with Pool(4) as p :
df ['column'] = p.map(func,iterable)
쿼리를 불러오는 로직을 작성한 함수를 func 자리에서 호출해주고,
iterable은 어떤 값을 기준으로 매칭해서 컬럼을 채울건지를 df['column '] 이런 형식으로 알려줬다.
iterable로 찾아 온 요소를 적용해야 했기 때문에 result대신 특정 칼럼으로 직접 받아준 코드다.
[map_async]
map을 쓰려고 찾아보다가 알게된 함수인데 map_async라는 메서드이다.
사용방법은 map과 동일한 매개변수를 사용하기 때문에 메서드 명만 변경해주면 된다.
result = pool.map_async(func, iterable)
result.get()
대신 결과값을 가져오기 위해서는 .get() 메서드를 사용해야 값을 받아올 수 있으니 참고해두자.
map과 map_async 의 가장 큰 차이점은 비동기적으로 작동하여 작업을 프로세스 풀에 배치한 후 다른 작업을 수행한다는 점이다.
즉, 작업이 다 끝나지 않아도 메인 프로세스의 다음 줄을 실행하기 때문에 map보다 유연성이 높은 메서드라고 할 수 있다.
그렇기 때문에 결과를 가져오기 전에 다른 작업을 수행할 수 있으므로, 결과가 필요한 부분에서 적절한 동기화 작업이 필수적이다.
요약하자면, map()은 동기적으로 작동하여 모든 작업이 순차적으로 이루어지는 반면,
map_async()은 비동기적으로 작동하여 작업을 프로세스 풀에 배치한 후 다른 작업을 수행한다고 생각하면 쉽다!
그냥 map말고도 starmap()/starmap_async()이나 imap(), imap_unordered()등의 다양한 함수가 있는데
가볍게 설명하고 넘어가보자면
starmap()/starmap_async()은 인자를 두개 받을 수 있다는 점이 map(), map_async()와 같고
map은 결과값이 list지만 imap()은 결과물이 iterator이다.
기본 chunksize는 1이지만 1대신 적절한 큰 값을 쓰면 훨씬 신속한 처리가 가능하다.
결과물의 길이가 길어 list로 나타낼 때 메모리 부담이 가기 때문에 이럴 때 imap을 사용해주면 적절하다.
imap_unordered()은 순서가 보장되지 않기 때문에 순서에 상관없는 데이터를 다룰 때 사용한다.
[apply]
프로세스 풀에서 많이 쓰는 apply.
특정 함수를 행 또는 열에 적용하여 작업을 수행하는 데 사용한다.
일반적으로 데이터프레임의 각 행 또는 열에 함수를 적용하고자 할 때 사용하는데,
이를 통해 데이터프레임의 각 요소에 함수를 적용하고 결과 반환이 가능하다.
예를 들어, 데이터프레임의 열에 대해 평균 값을 계산하거나, 특정 조건을 기반으로 데이터를 필터링하거나,
사용자 정의 함수를 적용하여 데이터를 변환하는 등의 작업을 할 수 있게 되는 것이다.
df.apply(func, axis=0, raw=False, result_type=None, args=(), **kwds)
func : 적용 할 함수를 호출한다.
axis : 함수를 적용할 축을 지정하는 옵션으로 기본값은 0. 열에 대해 함수를 적용하고 값을 1로 바꿔주면 행에 대해 함수를 적용한다.
raw : 함수에 넘겨질 객체가 축의 값을 유지할 지 여부를 지정하는 옵션으로 기본값은 False.
객체가 축의 값을 보정한 후 함수에 전달된다.
result_type : 결과로 반환되는 객체의 유형을 이 안에서 지정해줄 수 있는데, 기본값은 None으로
함수의 반환 유형을 따른다.
args : 함수에 전달 할 위치 인수값을 적는다.
**kwds : 추가로 함수에 전달할 키워드 인수값을 적는다.
모든 옵션이 필수는 아니고, 기본값으로 유지 할 경우 생략도 가능하다.
말로만으로는 너무 어려운 것 같아서 예시 코드를 하나 적어봤다.
import pandas as pd
# 데이터프레임 생성
data = {'A': [1, 2, 3, 4],
'B': [5, 6, 7, 8],
'C': [9, 10, 11, 12]}
df = pd.DataFrame(data)
# 각 열에 대해 평균을 구하는 함수 정의
def mean_of_column(column):
return column.mean()
# apply() 함수를 사용하여 각 열에 함수 적용
result = df.apply(mean_of_column, axis=0)
print(result)
mean_of_column 메서드에서는 각 열의 평균을 계산한다..
생성한 데이터 프레임을 출력해보면
A B C
0 1 5 9
1 2 6 10
2 3 7 11
3 4 8 12
이런 형태로 만들어져 있을 것이고, 이걸 평균값을 계산하는 메서드에 넣고 돌리면
result는 각 a, b, c 칼럼에 평균값이 계산되어 아래처럼 출력이 될 것 이다.
A 2.5
B 6.5
C 10.5
dtype: float64
apply_async()메서드도 있는데, 이것도 map에서의 async메서드와 비슷하다.
프로세스 풀에게 작업을 하나 시킨 뒤 AsyncResult를 반환받고,
반환받은 AsyncResult에서 get()을 호출하면 작업의 반환 값이 나온다.
단, 반환받은 AsyncResult의 get()을 호출하면 그 작업이 끝나기 전까지는 메인 프로세스에서도 다음 줄로 넘어갈 수가 없다.
그 밖에도 apply_progress 등을 사용하면 tqdm처럼 진척도를 볼 수 있다던가,
굉장히 다양한 함수들이 많이 존재하고 있다.
내가 최종적으로 적용한건 apply와 map이었다.
적용한 코드를 리뷰할 겸 복습할 겸 적어본다.
def parallelize_dataframe(df, func) :
df_split = np.array_split(df, num_cores)
pool = Pool(num_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
poo.join()
return df
먼저 병렬처리 검색하면 나오는 국룰(?)코드를 기준으로 parallelize_dataframe함수를 하나 생성했다.
def parallelize_dataframe (df, func, n_cores=5) :
df_split = np.array_split(df, n_cores)
pool = multiprocessing.Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
인수 안에서 코어 수를 5개로 미리 지정했고, 코드를 하나씩 살펴보면 아래와 같다.
df_split = np.array_split(df, n_cores) #코어의 개수만큼 df 나눠주고
pool = multiprocessing.Pool(n_cores) # Pool을 코어만큼 생성해서
df = pd.concat(pool.map(func, df_split)) #나눠진 df를 func으로 넘겨 수행한 뒤 concat으로 쪼개진 데이터를 다시 합쳐준다.
이 코드를 어디서 사용하냐면 main메서드 안에서 호출한다.
main 메서드 안에서는 df_processed 변수라는 이름으로 위에 작성한 멀티프로세싱 함수를 받고 있으며
인수로는 내가 사용하고 있는 데이터프레임 엑셀원본과 processing이라는 메서드를 받는다.
processing이 parallelize_dataframe의 func에, df_excel이 df 값에 전달이 되는거다.
df_processed = parallelize_dataframe(df_excel, processing)
processing 메서드를 따라가보면 난 아래와 같이 작성했다.
def processing(df) :
df[['A', 'B', 'C', 'D', 'E', 'F']] = df.apply(lambda x: get_info(x), axis=1, result_type='expand')
return df
람다식을 이용해 데이터프레임의 각 행에 대해 get_info 함수를 인수로 전달하고 있으며
result_type='expand'를 사용하여 결과를 확장된 형태로 반환하도록 지정했다.
apply 함수를 사용해서 반환하는 결과는 각 요소를 열로 갖는 데이터프레임 형태로 반환할 수 있도록 코드를 작성해줬다.
전체 결과값은 선택한 열에 할당이 되어 내가 원하는 값이 매칭되어 데이터프레임으로 리턴이 되어 내가 원하는 값을 받아볼 수 있었다.
'💻BackEnd > 🔵Python' 카테고리의 다른 글
[Python] Python 에서 괄호 - (), [], {} (0) | 2024.07.06 |
---|