spark stream을 개발하면서 적용한 코드 일부를 정리합니다.
.withWatermark("current_timestamp", "1 minutes") // 지정한 시간 이후의 데이터는 삭제됨.
//group by 조건에 windows를 사용할 수 있음
window($"current_timestamp", "1 minutes", "1 minutes")
// -> 매 1분마다 지난 1분 데이터가 대상, window($"timestamp", "10 minutes", "5 minutes") -> 매 5분 마다 지난 10분이 대상 데이터.
.trigger(Trigger.ProcessingTime("60 seconds")) //를 지정하면 실행한 시간에 관계 없이 정각을 기준으로 trigger됨
//10분 50초에 실행하면 30초의 동작 시간이 시난 후 바로 이어서 11분 00초 trigger가 동작하게됨.
//최초 3번 정도는 kafka로부터 1분 데이터 전체를 가져오지 못할 수 있음.
옵션 상세 설명
.withWatermark("current_timestamp", "1 minutes")
설정 은 지정한 시간이 지난 데이터는 삭제하도록 한다. spark 2.x가 되면서 1.x와의 차이점이 생겼다. 2.x가 되면서 과거 데이터를 명시적으로 삭제해야 한다. 전체적으로 window 연산의 기능을 세분화하고 조건들이 많이 달려 있으며 바뀐 개념이 처음에는 생소해서 이해하는데 시간이 오래 걸렸는데 보고 또 보고 다시 보면서 이해하고 나니 실시간 처리에 유용한 개념인 거 같다. 최근에 인터넷 방문 로그 분석을 위한 설루션을 잠시 써보고 있는데 이런 개념은 다양한 분야에서 사용되고 있었는데 나만 몰랐었나 보다. 어쨌든 이번에 개발하면서 최근 1분 데이터만 대상으로 aggregation 하는 게 목적이니 1분이 지난 데이터에 대해서는 삭제하도록 설정을 했다.
window($"current_timestamp", "1 minutes", "1 minutes")
spark stream의 2.x의 핵심이다. 1분마다 지난 1분 데이터를 대상으로 연산을 하겠다는 조건이다. 이글 상단 코드 옆에 주석을 참고하면 1분마다 지난 5분을 대상으로 하는 설정도 참고 할 수 있다. 지난 5분 데이터를 연산하려면 Watermark값을 5분 이상으로 해놔야 한다. 이 윈도우 연산을 통해 수집된 데이터의 시간 순서가 미리 들어오고 나중에 들어오는 데이터에 대해서도 모두 제 위치의 시간으로 계산을 할 수 있다. spark 문서를 보면서 꼭 이해를 해야 하는 부분이다. 개발하면서 데이터 출력 결과를 보면서 테스트해본다면 개념을 이해하고 실제 적용하는데 큰 어려움이 없을 것이다. 매번 계산마다 중복 데이터 없이 계산을 하기 위해서는 aggregation 처리를 4~5분 사이 데이터만을 처리하거나 변경되거나 추가된 값이 경우 결과를 쓸 때 업데이트하는 방법이 있을 것이고 1분마다 지난 1분, 2분마다 지난 2분처럼 새로운 데이터만을 연산하도록 조건을 걸어야 한다. stream이라는 특징이 있기에 데이터를 다룰 때 한번 더 고민해야 하는 부분이다.
.trigger(Trigger.ProcessingTime("60 seconds"))
트리거는 다음 연산을 언제할지를 정하게 된다. 이 기준이 가장 높은 기준이고 앞서 설명한 기준은 데이터를 연산할 때 시간 기준을 어떻게 잡을지에 대한 조건이다. 어느 타이밍에 미니 배치가 돌지는 트리거를 통해 설정하게 된다. spark stream의 트리거의 특징은 정각에 동작한다는 것이다 이글의 상단에 적은 주석을 부연 설명하자면 spark stream job을 실행하면 초기 작업 후 실제 동작하는데 30초 정도가 걸렸다. 10분 50초에 실행이 되었다면 실제로 트리거의 기준은 11분, 12분 13분이 될 것이다. 하지만 30초의 초기 작업이 있기 때문에 11분 20초에 job이 동작하고 11분 job이 돌았을 때 kafak에서 이전 offset부터 11분 20초까지의 데이터를 첫 번째로 가져올 것이고 12분에 11분 20초 offset부터 12분까지의 데이터를 가져오고 13분에 12분 offset부터 13분까지의 데이터를 가져오게 된다. 그래서 1분 기준 트리거인 경우 최초 1번 미니 배치 동안에 데이터는 정확한 1분 데이터가 아니다.