알고리즘: 데이터 조작 문제 모음

Data Manipulation — Level 1~5 (DS)

DS 트랙 데이터 조작 유형 문제 풀이 모음. 부서별 평균 급여, IQR 이상치 제거 + 이동평균, K-Way Merge + 이중 힙 러닝 메디안 + 이상 구간 탐지.

Code Test
Algorithm Test
저자

Kwangmin Kim

공개

2026년 03월 31일

1 문제 1: 부서별 평균 급여

1.1 문제 정보

항목 내용
출처 튜터 생성
난이도 Level 1
트랙 DS
유형 데이터 기초 조작 (Dict/Counter)

1.2 문제 설명

회사 직원 데이터가 (이름, 부서, 급여) 튜플의 리스트로 주어진다. 각 부서별 평균 급여를 계산하여, 평균 급여가 높은 순서로 정렬된 리스트를 반환한다. 같은 평균 급여인 부서가 있으면 부서명 오름차순으로 정렬한다. 평균 급여는 소수점 첫째 자리에서 반올림한 정수이다.

def solution(employees: list[tuple[str, str, int]]) -> list[tuple[str, int]]:
    pass

제한사항: 1 <= len(employees) <= 10,000, 급여는 양의 정수

입출력 예시:

employees = [
    ("Alice", "Engineering", 5000),
    ("Bob", "Engineering", 6000),
    ("Charlie", "Marketing", 4000),
    ("Diana", "Marketing", 4500),
    ("Eve", "HR", 3500),
]
# 결과: [("Engineering", 5500), ("Marketing", 4250), ("HR", 3500)]

1.3 최종 풀이

def solution(employees: list[tuple[str, str, int]]) -> list[tuple[str, int]]:
    dept_stats = {}

    for _, dept, salary in employees:
        if dept not in dept_stats:
            dept_stats[dept] = [0, 0]
        dept_stats[dept][0] += salary
        dept_stats[dept][1] += 1

    avg_salaries = []
    for dept, (total_salary, count) in dept_stats.items():
        avg = int(total_salary / count + 0.5)
        avg_salaries.append((dept, avg))

    avg_salaries.sort(key=lambda x: (-x[1], x[0]))

    return avg_salaries
  • 시간 복잡도: \(O(n + d \log d)\)\(n\) 은 직원 수, \(d\) 는 부서 수. 집계 \(O(n)\) + 정렬 \(O(d \log d)\)
  • 공간 복잡도: \(O(d)\) — 부서 수만큼 dict 저장

1.4 Tips

  • 핵심 패턴: dict로 그룹별 집계 \(\to\) 정렬. pandas의 groupby().agg().sort_values() 를 순수 Python으로 구현하는 기초 체력이다.
  • 반올림 주의: int(x + 0.5) 는 양수에서만 동작한다. 실무에서는 round() 가 안전하다 (banker’s rounding 주의).
  • Python 팁: defaultdict(lambda: [0, 0]) 을 쓰면 if not in 분기를 제거할 수 있다.

2 문제 2: 이상치 제거 후 그룹별 트렌드 분석

2.1 문제 정보

항목 내용
출처 튜터 생성
난이도 Level 3
트랙 DS
유형 시뮬레이션 + 통계 구현

2.2 문제 설명

센서 측정 데이터가 (sensor_id, timestamp, value) 튜플의 리스트로 주어진다. 다음 3단계를 수행한다:

  1. IQR 기반 이상치 제거 (센서별): Q1/Q3을 선형 보간법으로 계산하고, Q1 - 1.5*IQR ~ Q3 + 1.5*IQR 범위 밖의 값을 제거한다.
  2. 센서별 시간순 3-point 이동평균: 현재 값과 앞뒤 값의 평균. 처음/끝은 가능한 값만 사용한다.
  3. 결과 반환: {sensor_id: [이동평균값들]} 딕셔너리. 소수점 둘째 자리 반올림.

선형 보간법: 크기 \(n\) 인 정렬 배열에서 \(p\) 번째 백분위수의 위치는 \(idx = (n-1) \times p / 100\) 이다. 정수가 아니면 \(\lfloor idx \rfloor\)\(\lceil idx \rceil\) 값을 선형 보간한다.

def solution(data: list[tuple[str, int, float]]) -> dict[str, list[float]]:
    pass

입출력 예시:

data = [
    ("S1", 1, 10.0), ("S1", 2, 12.0), ("S1", 3, 11.0),
    ("S1", 4, 13.0), ("S1", 5, 100.0),  # 이상치
    ("S2", 1, 20.0), ("S2", 2, 22.0), ("S2", 3, 21.0),
]
# S1: 100.0 제거 후 이동평균 → [11.0, 11.0, 12.0, 12.0]
# S2: 이상치 없음 → [21.0, 21.0, 21.5]
# 결과: {"S1": [11.0, 11.0, 12.0, 12.0], "S2": [21.0, 21.0, 21.5]}

2.3 최종 풀이

def solution(data: list[tuple[str, int, float]]) -> dict[str, list[float]]:
    from collections import defaultdict
    import math

    sensor_groups = defaultdict(list)
    for s_id, ts, val in data:
        sensor_groups[s_id].append((ts, val))

    result = {}

    for s_id in sensor_groups:
        group = sensor_groups[s_id]
        values = sorted([v for _, v in group])
        n = len(values)

        # 1단계: IQR 기반 이상치 제거
        def get_percentile(p):
            idx = (n - 1) * p / 100
            f_idx = math.floor(idx)
            c_idx = math.ceil(idx)
            if f_idx == c_idx:
                return values[int(idx)]
            return values[f_idx] + (idx - f_idx) * (values[c_idx] - values[f_idx])

        q1 = get_percentile(25)
        q3 = get_percentile(75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        filtered = sorted(
            [item for item in group if lower_bound <= item[1] <= upper_bound]
        )
        v_filtered = [item[1] for item in filtered]
        m = len(v_filtered)

        if m == 0:
            result[s_id] = []
            continue

        # 2단계: 3-point 이동평균
        moving_avgs = []
        for i in range(m):
            if m == 1:
                avg = v_filtered[0]
            elif m == 2:
                avg = (v_filtered[0] + v_filtered[1]) / 2
            else:
                if i == 0:
                    avg = (v_filtered[0] + v_filtered[1]) / 2
                elif i == m - 1:
                    avg = (v_filtered[m - 2] + v_filtered[m - 1]) / 2
                else:
                    avg = (v_filtered[i - 1] + v_filtered[i] + v_filtered[i + 1]) / 3
            moving_avgs.append(round(float(avg), 2))

        result[s_id] = moving_avgs

    return result
  • 시간 복잡도: \(O(n \log n)\) — 정렬이 지배적
  • 공간 복잡도: \(O(n)\) — 센서별 데이터 저장

2.4 Tips

  • 핵심 패턴: 그룹화 \(\to\) 통계 필터 \(\to\) 시계열 변환의 3단계 파이프라인. pandas groupby \(\to\) clip/filter \(\to\) rolling 과 동일한 흐름이다.
  • IQR 선형 보간: (n-1) * p / 100 위치에서 floor/ceil 값을 보간한다. pandas describe() 가 내부적으로 수행하는 로직과 같다.
  • 이동평균 경계 처리: 첫/끝 원소는 사용 가능한 이웃만으로 평균을 구한다. m=1, m=2 특수 케이스를 분리한다.

3 문제 3: 외부 정렬 스트림에서 이상 구간 탐지

3.1 문제 정보

항목 내용
출처 튜터 생성
난이도 Level 5
트랙 DS
유형 K-Way Merge + Heap + Streaming Statistics

3.2 문제 설명

데이터가 메모리에 한꺼번에 올라가지 않아 K개의 정렬된 청크(chunk)로 나뉘어 있다. 이 청크들을 병합하면서 실시간으로 러닝 메디안을 계산하고, 메디안 대비 이상치가 연속으로 나타나는 이상 구간(anomaly window)을 탐지한다.

3단계 처리:

  1. K-Way Merge: K개 정렬 청크를 heapq 로 O(N log K) 병합
  2. 이중 힙 러닝 메디안: max-heap(부호 반전) + min-heap으로 매 시점 메디안 계산
  3. 이상 구간 탐지: \(|value - median| > threshold \times median\) 이면 이상치. 연속 이상치가 min_window 개 이상이면 이상 구간으로 기록
def solution(
    chunks: list[list[tuple[int, float]]],
    threshold: float,
    min_window: int,
) -> list[tuple[int, int, float]]:
    pass

입출력 예시:

chunks = [
    [(1, 10.0), (3, 12.0), (7, 11.0)],
    [(2, 11.0), (5, 50.0), (6, 55.0)],
    [(4, 13.0), (8, 12.0), (9, 10.0)],
]
threshold = 1.0
min_window = 2
# 병합: (1,10)(2,11)(3,12)(4,13)(5,50)(6,55)(7,11)(8,12)(9,10)
# ts=5,6에서 이상치 연속 → 결과: [(5, 6, 43.0)]

3.3 최종 풀이

import heapq

def solution(
    chunks: list[list[tuple[int, float]]],
    threshold: float,
    min_window: int,
) -> list[tuple[int, int, float]]:

    # 1단계: K-Way Merge
    # heap: (timestamp, value, chunk_idx, element_idx)
    merge_heap = []
    for i, chunk in enumerate(chunks):
        if chunk:
            ts, val = chunk[0]
            heapq.heappush(merge_heap, (ts, val, i, 0))

    # 2단계: 이중 힙 러닝 메디안
    left = []   # max-heap (부호 반전) --- 메디안 이하
    right = []  # min-heap --- 메디안 초과

    def add_number(num: float) -> None:
        heapq.heappush(left, -num)
        heapq.heappush(right, -heapq.heappop(left))
        if len(right) > len(left):
            heapq.heappush(left, -heapq.heappop(right))

    def get_median() -> float:
        if not left:
            return 0.0
        if len(left) > len(right):
            return float(-left[0])
        return (-left[0] + right[0]) / 2.0

    # 3단계: 병합 스트림 처리 + 이상 구간 탐지
    current_window = []  # [(timestamp, deviation), ...]
    results = []

    def flush_window():
        if len(current_window) >= min_window:
            start_ts = current_window[0][0]
            end_ts = current_window[-1][0]
            max_dev = max(d for _, d in current_window)
            results.append((start_ts, end_ts, max_dev))

    while merge_heap:
        ts, val, c_idx, e_idx = heapq.heappop(merge_heap)

        # K-Way Merge: 같은 청크의 다음 원소를 힙에 추가
        next_idx = e_idx + 1
        if next_idx < len(chunks[c_idx]):
            next_ts, next_val = chunks[c_idx][next_idx]
            heapq.heappush(merge_heap, (next_ts, next_val, c_idx, next_idx))

        # 현재 값을 먼저 추가한 뒤 메디안을 구한다
        add_number(val)
        m = get_median()

        # 이상치 판정
        deviation = abs(val - m)
        is_anomaly = (m != 0) and (deviation > threshold * m)

        # 상태 머신: 이상 구간 추적
        if is_anomaly:
            current_window.append((ts, deviation))
        else:
            flush_window()
            current_window = []

    # 스트림 종료 시 남은 구간 처리
    flush_window()

    return results
  • 시간 복잡도: \(O(N \log K + N \log N)\) — K-Way Merge \(O(N \log K)\) , 이중 힙 삽입 \(O(N \log N)\)
  • 공간 복잡도: \(O(N)\) — 양쪽 힙에 전체 원소 저장

3.4 Tips

  • K-Way Merge: heapq(ts, val, chunk_idx, elem_idx) 4-tuple을 저장하면 자연스럽게 timestamp 순 병합된다. 전체 합쳐서 정렬하면 \(O(N \log N)\) 이지만, K-Way Merge는 \(O(N \log K)\)\(K \ll N\) 일 때 효율적이다.
  • 이중 힙 러닝 메디안: left(max-heap, 부호 반전) + right(min-heap). add \(\to\) rebalance 패턴으로 len(left) >= len(right) 불변조건을 유지한다. Python은 min-heap만 지원하므로 부호 반전이 관용적 패턴이다.
  • 연산 순서가 핵심: add_number(val) \(\to\) get_median() \(\to\) 판정 순서가 올바르다. “현재 값 포함 메디안”과 “이전까지 메디안”은 결과가 완전히 달라진다.
  • 상태 머신 구간 탐지: current_window 에 연속 이상치를 쌓고, 정상이 오면 flush. 스트림 종료 시 잔여 flush를 잊으면 마지막 구간이 누락된다.

4 관련 문제

Subscribe

Enjoy this blog? Get notified of new posts by email: