티스토리 뷰

발생일: 2017.01.09

키워드: Readline, WriteStream, ReadStream, highWaterMark

문제:
한 행에 JSON 포맷의 데이터를 포함한 파일을 CSV 파일로 변환하려고 한다.
파일 사이즈가 커서 readline을 사용해 스트림으로 읽어 변환 후 다시 스트림으로 쓸 생각이다.

WriteStream에 'drain' 이벤트가 있던데, 정확히 어떤 이벤트일까?


해결책:

highWaterMark(내부 버퍼에 저장할 최대 바이트)에 지정한 값보다 큰 버퍼를 쓰려고 할 땐, write()가 false를 리턴한다.

스트림이 빠져나가는 동안 write()를 호출하면, 청크가 버퍼링되고 false가 리턴된다.
현재 버퍼링된 모든 청크가 빠져나가면 'drain' 이벤트가 발생한다.

write()가 false를 리턴했을 땐, drain 이벤트가 발생하기 전까지 write()를 호출하지 않는 것이 좋다.
스트림이 빠져나가지 않은 상태에서 write()가 계속 발생되면, node.js는 최대 메모리 에러가 발생할 때까지 쓰려는 데이터를 모두 버퍼링하기 때문이다.

이렇게 되면 불필요하게 GC가 동작할 수 있고, 심지어 메모리가 다시 반환되지 않는 문제가 발생할 가능성도 있다고 한다.


아래는 변환하는 코드의 일부이다.


function convertToCsvFile(options) {
    const deferred = Q.defer();
    const inputFilePath = options.inputFilePath;
    const outputFilePath = options.outputFilePath || (inputFilePath + '.csv');
    const headers = options.headers;
    const keyNames = options.keyNames || [];
    const filter = options.filter;
    const transform = options.transform;

    if (!inputFilePath) {
        return Q.rejecct('inputFilePath는 필수 값입니다');
    }

    const ws = fs.createWriteStream(outputFilePath, {
        flags: 'w',
        defaultEncoding: 'utf8'
    });

    ws.on('open', () => {
        if (headers) {
            ws.write(headers.join(',') + '\n');
        }

        const rl = readline.createInterface({
            input: fs.createReadStream(inputFilePath)
        });

        rl.on('line', (line) => {
            let data = JSON.parse(line);

            if (!_.isFunction(filter) || filter(data)) { // 필터가 정의되어 있다면 사용한다.
                if (_.isFunction(transform)) {
                    data = transform(data);
                }
                _writeToStream(ws, data, keyNames);
            }
        });

        rl.on('close', () => {
            ws.end();
        });
    });

    ws.on('finish', () => {
        ws.close();
    });

    ws.on('close', () => {
        deferred.resolve(outputFilePath);
    });

    return deferred.promise;
}

function _writeToStream(ws, data, keyNames) {
    const values = [];
    let shouldDrain = false;

    _.each(keyNames, (keyName) => {
        let value;

        try {
            value = _getValueByKeyName(data, keyName);
        } catch (e) {
            console.error('값을 가져오는데 실패했습니다');
            console.log('Data:', data);
            console.log('Key Name:', keyName);
            throw e;
        }

        values.push(value);
    });

    shouldDrain = ws.write(values.join(',') + '\n') === false;

    if (shouldDrain) {
        ws.once('drain', () => {
            _writeToStream(ws, data, keyNames);
        });
    }
}



참고:


반응형
댓글
공지사항