본문 바로가기

프로그래밍

spark stream 특징 정리 3회

반응형

spark stream을 개발하면서 시간 값 정의했던 부분을 정리합니다.

//로그 원천의 시간값을 사용할 경우의 고려사항
//kafka로 유입되는 데이터는 5개의 파티션으로 구분되어 있으며 Spark structured stream의 kafka consumer가 가져온 데이터는 일정한 순서로 들어오지 않음.
//Spark structured stream에서 60초로 trigger로 실행되고 있을때 kafka offset을 계산하는 기준시간은 10:07:59.895 ~ 10:08:58.910 로 유동적인 구간이 지정됨.  정확한 10:08:00.000 ~ 10:08:59.999 의 시간으로 처리되지 않음.

//대응 방법 1
readStream.option("includeTimestamp", true) 
//설정하면 kafka consumer가 데이터를 수집한 시간을 사용할 수 있음
//모든 row에 대한 수집 시간을 추가로 할당 할 수 있음

//대응 방법 2, 최종 적용
//withColumn을 사용하여 할당
.withColumn("current_timestamp", current_timestamp) // 매 1분 마다 정해진 시간 값을 가짐
.withColumn("DATE_INFO", functions.date_format(expr("CAST(current_timestamp AS TIMESTAMP)"), "yyyy/MM/dd hh:mm"))

 

코드 상세 설명

로그 원천 시간을 사용

로그 원천 데이터에도 시간 값이 있었지만 시간값이 정확하게 적혀 들어오지 않는 문제가 있어서 추가적인 시간 데이터가 필요하게 되었습니다.

readStream.option("includeTimestamp", true) 

이 코드는 spark stream에서 kafak로 부터 데이터를 읽어 왔을 때의 시점을 기준으로 새로운 칼럼에 시간 값을 추가하게 된다. 이 설정을 테스트해본 이유는 log에도 시간 값이 있는데 어떤 시간 값을 사용해야 정확한 결과를 얻을 수 있을 지에 대한 고민에서 시작했다. log의 시간은 각 클라이언트가 찍는 시간이다 보니 약간의 오차가 있을 수 었으며 다른 포스팅에서도 적었지만 워터마크와 윈도우의 기준 시간이 될 때 어떤 시간 값을 사용해야 유용할지에 대해 판단을 위해 적용해 봤습니다. 한건 한건 spark에서 읽는 시점을 기준으로 시간 값을 가지게 되는 것을 확인했습니다. 잘 동작은 했지만 제가 개발하는 조건에는 맞지 않아서 다른 방법을 확인해 보게 되었습니다.

.withColumn("current_timestamp", current_timestamp) 
.withColumn("DATE_INFO", functions.date_format(expr("CAST(current_timestamp AS TIMESTAMP)"), "yyyy/MM/dd hh:mm"))

최종적으로 사용한 방법 입니다. withColumn을 두 번 써서 현재 시간 값을 캐스팅하는 것으로 처리했습니다. 이렇게 하는 것의 장점은 1분 동안 수집된 모든 데이터가 같은 시간 값을 가진 다는 것입니다. 1분이 지나면 삭제되고 새로운 1분의 데이터가 연산의 대상이 되어야 하는 조건으로 처리하기 좋은 시간 값이었습니다. 개발에 필요한 조건산 전체적으로 나열되어 있는 시간은 log내부에 있으나 연산 조건에는 필요 없었으며 단지 spark stream의 워터마크와 윈도우 조건에 맞는 적합한 시간 조건이 필요했고 딱 맞는 조건의 시간을 구현하여 사용하게 되었습니다. 

반응형