티스토리 뷰
서문
이전 글에서 fluentd를 도입하게 된 발단부터 설계까지 알아보았다.
이번 글에서는 설계에 나온 부분 중, log forwarder 부분을 도입하는 부분에 대한 과정을 기록하였다.
forwarder는 서버내에서 생성되는 로그 및 통계 데이터를 수집하여 1차 가공하고, log aggregator로 전송하는 역할을 한다.
본문
fluentd config
forwarder를 구현하기 위하여, fluentd config 파일에 관한 간단한 설명을 먼저 진행한다.
fluentd는 config 파일을 수정하여 동작을 제어한다. (nginx와 유사)
config 파일은 plugin들의 집합으로 이루어져 있으며, 여기서 plugin은 크게 Input, Output 두가지 종류로 구분되고,
이를 보조하는 Filter, Parser, Formatter 등 다양한 종류의 필터가 존재한다.
Plugin으로 config 파일을 구성하는 가장 기초적인 방법은 3단계로 구분 할 수 있다.
- <source> 태그에, input 플러그인을 사용하여 로그, 통계와 같은 데이터를 입력받는다
- <filter> 태그에, filter 플러그인을 사용하여 데이터를 가공한다
- <match> 태그에, output 플러그인으로 데이터를 전송한다
9880 포트로 http 요청을 받아, 콘솔로 출력하며 미리 지정해둔 서버로 30초 또는 10mb 마다 데이터를 전송하는 fluentd config 파일의 예시 코드를 아래에 작성하고 설명하겠다.
예시 코드 (fluentd.conf)
<source>
@type http
port 9880
</source>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type forward
<server>
host ${mainUrl}
port 24224
</server>
<server>
host ${subUrl}
port 24224
standby
</server>
<buffer>
flush_interval 30s
total_limit_size 10m
</buffer>
</store>
</match>
데이터 input을 처리하는 <source> 태그 부분에 @type으로 input 필터 중 http 필터를 사용하였다.
9880 포트로 오는 http 요청은 모두 입력받게 된다.
데이터 output을 처리하는 <match> 태그 부분에 @type으로 copy 필터를 사용하였다.
copy 필터는 match 내의 store 태그 전부에 데이터를 복사하여 뿌려주기 때문에, 데이터를 단일 output이 아닌 여러 output filter를 이용하여 처리할 수 있게된다.
<store> 태그는 copy로 뿌린 데이터를 받아 실제 output 필터를 사용하는 태그다.
stdout 필터는 콘솔에 데이터를 출력하는 역할, forward 필터는 다른 서버로 데이터를 포워딩하는 역할이고 기본적으로 tcp 통신(nginx 설정 시 주의)이다.
forward 필터 내의 <server> 태그는 포워딩할 서버 정보를 입력하고, standby 값이 입력되면 위에 선언된 <server>태그의 서버에 포워딩이 실패하는 경우를 대비한 예비 서버로 설정된다.
<buffer>태그는 데이터를 전송하기 전 버퍼에 얼마간 캐시되는지 결정하는 태그로, flush_interval 값은 시간, total_limit_size 값은 용량을 담당한다.
추가로 match 태그의 ** 부분을 설명하면, source로 받은 데이터의 태그를 필터링하는 정규식 부분인데, **은 모든 태그를 받는다.
source로 받은 데이터의 태그를 http 요청으로 예시로 들면,
fluentd의 주소가 https://example.com:9880 인 경우에 https://example.com:9880/counter.usage 로 데이터가 오게된다면 uri 부분인 counter.usage를 데이터의 태그로 갖게된다.
<source>
@type http
port 9880
</source>
<match counter.*>
. . .
</match>
<match **>
. . .
</match>
예를들어, 위 코드에 https://example.com:9880/counter.usage 와 같은 요청을 보낸다면, counter.*에서 데이터가 처리되어 아래 match ** 에는 데이터가 도달하지 못한다.
정리하면,
- <source> 태그에서 http 필터를 사용하여 9880 포트, http 통신으로 데이터를 수집한다.
- 이때, 데이터의 태그는 uri를 이용한다.
- <match> 태그에서 ** 정규식으로 데이터의 태그 상관 없이 모든 데이터를 캐치한다.
- copy 필터로 데이터를 복사하여 <store> 태그들에게 전달한다.
- 첫번째 <store> 태그에서 stdout 필터를 사용하여 콘솔에 데이터를 출력한다.
- 두번째 <store> 태그에서 forward 필터로 데이터를 포워딩한다.
- 포워딩하는 서버는 <server> 태그에 선언한다.
- <buffer> 태그로 포워딩하는 간격을 조절한다.
설명한 코드 이외에 config 파일을 작성하는 더 많은 정보는 https://docs.fluentd.org/ 에서 확인할 수 있다.
forwarder
fluentd config 작성법에 대해 간략히 알아보았으니, 이제 직접 forwarder에 사용할 config 파일을 작성해보았다.
forwarder config
<source>
@type http
port 9880
</source>
<filter counter.*>
@type split_array
split_key data
</filter>
<match counter.*>
@type rewrite_tag_filter
<rule>
key key
pattern /(.+)/
tag $1.counter
</rule>
</match>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type forward
<server>
host ${url}
port ${port}
</server>
<server>
host ${urlsub}
port ${portsub}
standby
</server>
<buffer>
flush_interval 30s
total_limit_size 10m
</buffer>
</store>
</match>
<source>와 <match> 태그 부분은 위에 설명한 코드와 거의 동일하기 때문에 추가 설명은 하지않는다.
<filter> 태그에 split_array, 새로운 <match> 태그에 rewrite_tag_filter가 추가되어 이에 대한 설명을 한다.
<filter> 태그는 정규식에 잡힌 데이터를 필터링하고 다시 다음 라우터로 보내는 역할을 한다.
<filter> 태그에 사용된 split_array 필터는 공식 필터가 아닌 커스텀 필터로, array 또는 json 내의 array 값을 각각의 노드로 분리하여 다음 라우터에 넘기는 역할을 한다.
split_array 필터를 설명하기 전에 먼저 forwarder가 받아서 처리해야 될 데이터에 대해 설명한다.
데이터
forwarder에서 입력될 raw 데이터는 json 형식으로, 통계 데이터를 배열로 갖고있다.
통계 데이터는 각각 다른 DB 테이블로 입력되야하기에, 배열을 다시 각각의 json으로 분리하고 태그를 붙여주어 <match>태그에서 잡을 수 있게 해야한다.
데이터 가공 과정:
- forwarder는 1개의 json을 입력받는다.
- <filter>태그를 이용해 json에서 배열요소를 찾고 배열의 길이만큼 n개의 json으로 분리한다.
- <filter>태그 다음의 <match>태그들에게 n개의 json을 뿌린다.
raw json 예시:
{
"customer":"test",
"data":[
{
"key":"usage",
"id":"12345-12345"
},{
"key":"call",
"date":"2022-09-29"
}
]
}
변환된 json 예시:
{
"key":"usage",
"id":"12345-12345",
"customer":"test"
}
{
"key":"call",
"date":"2022-09-29",
"customer":"test"
}
이 변환 과정에서 배열을 json으로 분리하고 기존 json 내 다른 key/value를 새로운 json에 복사 해 주는 역할을 하는 필터가 split_array 필터이다.
split_array filter
Github: https://github.com/SNakano/fluent-plugin-split-array
GitHub - SNakano/fluent-plugin-split-array: Fluentd filter plugin to split array
Fluentd filter plugin to split array. Contribute to SNakano/fluent-plugin-split-array development by creating an account on GitHub.
github.com
split_array 필터는 <filter>태그에서 붙잡은 데이터 중 배열을 각각 json으로 분리하고, 분리된 json들을 따로 따로 다시 라우터에 뿌려주는 역할을 한다.
위에 설명한 바와 같이, 공식 필터가 아닌 커스텀 필터로 사용하려면 설치를 해야한다.
Readme를 보면 알 수 있겠지만, 필터로 들어온 데이터가 배열인 경우 각 값들을 여러 데이터로 분리해주거나, Json 데이터 내 배열의 키값을 split_key 로 입력해주면 그 배열을 여러 데이터로 분리해 주는 역할을 한다.
자세한 사용법은 github를 참조하기 바란다.
그런데 위에 데이터 파트에서 설명한 데이터는 기존 json에서 배열을 여러 데이터로 분리하는 것 뿐 아니고, 분리된 데이터에 기존 데이터의 배열을 제외한 나머지 값들이 똑같이 복사되어야 한다.
기존 split_array 필터는 이 기능이 구현되어있지 않고 단순히 배열을 여러 데이터로 분리하는 역할만 하고 있어서, 직접 커스텀을 진행했다.
custom split array filter : https://github.com/SehyeonGil/fluent-plugin-split-array
GitHub - SehyeonGil/fluent-plugin-split-array: Fluentd filter plugin to split array
Fluentd filter plugin to split array. Contribute to SehyeonGil/fluent-plugin-split-array development by creating an account on GitHub.
github.com
기존 split_array필터를 fork로 가져와 커스텀했고, ruby는 처음 해봐서 신기한점이 있었다.
커스텀한 코드부분을 설명하자면, 기존 코드의 배열 분리 부분에 raw data의 배열이 아닌 나머지 key/value를 새로운 json에 복사하는 부분만 추가하였다.
기존 코드 :
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each {|time, record|
target_record = @split_key.nil? ? record : record[@split_key] || {}
split(time, target_record, new_es)
}
new_es
end
private
def split(time, record, new_es)
if record.instance_of?(Array)
record.each { |r| new_es.add(time, r) }
else
new_es.add(time, record)
end
end
커스텀 코드 :
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each {|time, record|
target_record = @split_key.nil? ? record : record[@split_key] || {}
distributed_record = {}
if !@split_key.nil?
record.each_pair do |k, v|
if k == @split_key
next
end
distributed_record[k] = v
end
end
split(time, target_record, new_es, distributed_record)
}
new_es
end
private
def split(time, record, new_es, distributed_record)
if record.instance_of?(Array)
distributed_record.each_pair do |k, v|
record.each { |r| r[k] = v}
end
record.each { |r| new_es.add(time, r) }
else
distributed_record.each_pair do |k, v|
record[k] = v
end
new_es.add(time, record)
end
end
두 코드를 비교해보면, filter_stream 함수에서 split 함수를 실행하기 전에 새로운 코드가 추가된 걸 알 수 있다.
새롭게 분리되어 생성될 json을 생성하는 과정으로, split_key와 다른 key값의 key/value를 distributed_record에 저장하는 과정이다.
split 함수에는 새로운 파라미터로 distributed_record가 넘어간다.
split 함수도 기존과 비교해보면 distributed_record를 새로 생성되는 record에 복사하는 과정이 추가되었다.
이렇게 split_array로 raw data가 분리되어 라우트에 들어가면 바로 만나게 되는 다음 필터는 rewrite_tag_filter로 기존 태그를 변경하여 새로운 태그를 붙여 라우트에 보내주는 필터이다.
rewrite_tag_filter
<match counter.*>
@type rewrite_tag_filter
<rule>
key key
pattern /(.+)/
tag $1.counter
</rule>
</match>
분리된 데이터들이 제일 처음 만나게되는 <match>태그다.
rewrite_tag_filter를 사용하는데, 이 필터는 기존에 설정된 태그를 원하는 태그로 붙여, 다시 다음 라우트에 넘기는 역할을 수행한다. (ex : counter.usage -> usage.counter)
<match>태그 정규식을 보면 알수있듯이, counter.로 시작하는 모든 태그들 데이터를 붙잡고, <rule> 태그에 설정된 key값 (위 코드에는 key값이 key로 설정)을 데이터에 찾아 그 key 값으로 태그를 변경한다.
위 코드는 데이터의 key값의 value로 태그를 변경하고 있고,
counter.test 태그가 붙은
{
"key":"usage"
. . .
}
라는 데이터가 존재하면
counter.usage 태그로 변경하여 다음 라우터에 넘긴다고 생각하면 된다.
정리
설명한 내용들을 정리하면,
- forwarder config 작성
- 9880 포트 http 요청으로 데이터 수집
- split_array 필터로 array 데이터를 여러 데이터로 분리
- 커스텀된 split_array 필터 사용하여, 분리된 데이터에는 기존 데이터 중 array 부분을 제외한 데이터 포함
- rewrite_tag_filter 필터로 분리된 데이터들을 받아, 새로운 태그를 붙임
- match 태그에서 모든 데이터들을 수집하여 콘솔에 출력하고, aggregator 서버로 전송
6가지 정도가 된다.
마무리
fluentd에서 forwarder를 구현을 위해 config 작성법에 대해 기록해보았다.
다음 글에서는 도커 컨테이너로 forwarder를 구동하기 위하여 dockerfile 작성법에 대해 기록한다.
아마 다음 글은 많이 짧은 글이 될 것이다.
'web > backEnd' 카테고리의 다른 글
fluentd 도입기 - log aggregator (0) | 2022.10.05 |
---|---|
fluentd 도입기 - log forwarder [2] (dockerfile) (0) | 2022.10.04 |
fluentd 도입기 - 소개 및 도입 결정 (부제 : 듀랑고?) (0) | 2022.09.28 |
fluentd 도입기 - 발단 및 mysql bulk insert 성능 테스트 (0) | 2022.09.27 |
도커에서 default-libmysqlclient-dev 설치 불가 이슈 (0) | 2022.09.23 |
- Total
- Today
- Yesterday
- reverse proxy
- fluent-plugin-mysql
- mms 연동
- rewrite_tag_filter
- popbill
- nginx api cache
- fluentd-plugin-split-array
- log forwarder
- fluent-plugin-s3
- log aggregator
- 대규모 시스템 설계 기초
- rewrite-tag-filter
- uuid v1
- split_array
- dockerfile
- 뉴스피드 시스템
- bigint to number
- forwarder
- tojson
- libpaper-utils
- 혼자 공부하는 컴퓨터구조 + 운영체제
- nestjs
- log
- reverse proxy cache
- fluentd
- default-libmysqlclient-dev
- nginx cache
- mms
- uuid 중복
- 팝빌
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |