使用Filebeat和Logstash集中归档游戏日志

背景说明

由于游戏项目日志目前不够规范,不太容易根据字段结构化数据,而开发又有实时查看生产和测试环境服务运行日志的需求;如果写入ES通过Kibana查看,对于非分析类的日志查看方式不够友好(当然也可以通过LogTrail插件实现类似查看普通日志文件的体验)。

方案

  • Filebeat->Logstash->Files
  • Filebeat->Redis->Logstash->Files
  • Nxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana)
  • 其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案)

注释: 由于Logstash无法处理输出到文件乱序的问题,可通过不同的文件使用不同的Logstash,单线程归档;或者直接写入ES(基于@timestamp)、通过Flink输出到文件

部署

系统环境
  • Debian8 x64
  • logstash-6.1.1
  • filebeat-6.1.1-amd64
  • Redis-3.2
Filebeat配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# /etc/filebeat/filebeat.yml
# Ships game log files to a Redis list ("filebeat") for Logstash / Plogstash
# to consume. The custom fields (ip_address, env) drive the archive layout
# /data/logs/<env>/<date>/<ip>/... on the consumer side.
filebeat.prospectors:
- type: log
  paths:
    - /home/data/log/*
    - /home/data/*.log
  scan_frequency: 20s
  encoding: utf-8
  # Start reading at the end of each file so old history is not re-shipped.
  tail_files: true
  harvester_buffer_size: 5485760
  fields:
    ip_address: 192.168.2.2
    env: qa
output.redis:
  hosts: ["192.168.1.1:6379"]
  password: "geekwolf"
  key: "filebeat"
  db: 0
  timeout: 5
  # BUGFIX: was "max_retires" (typo) — an unknown option Filebeat ignores,
  # so retries silently fell back to the default.
  max_retries: 3
  worker: 2
  bulk_max_size: 4096
Logstash配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
input {
  # Direct Filebeat input kept for reference; events arrive via Redis instead.
  # beats {
  #   port => 5044
  # }
  # Redis list filled by Filebeat's output.redis section.
  # NOTE(review): Filebeat ships to 192.168.1.1:6379 while this reads
  # 127.0.0.1:5044 — confirm host/port match your actual Redis deployment.
  redis {
    batch_count => 4096
    data_type => "list"
    key => "filebeat"
    host => "127.0.0.1"
    port => 5044
    password => "geekwolf"
    db => 0
    threads => 2
  }
}
filter {
  # Derive the bare file name from Filebeat's "source" (full path).
  ruby {
    code => 'event.set("filename",event.get("source").split("/")[-1])'
  }
}
output {
  # nohup logs land in the host's root archive dir, everything else under logs/.
  # (Path placeholder restored to %{filename}, the field set by the filter above.)
  if [filename] =~ "nohup" {
    file {
      path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/%{filename}"
      flush_interval => 3
      codec => line { format => "%{message}"}
    }
  } else {
    file {
      path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/logs/%{filename}"
      flush_interval => 3
      codec => line { format => "%{message}"}
    }
  }
  # stdout { codec => rubydebug }
}

生产日志目录

1
2
3
4
5
6
7
8
9
10
11
12
.
├── prod
│   └── 2018-01-13
│   └── 2.2.2.2
│   ├── logs
│   │   ├── rpg_slow_db_.27075
│   └── nohup_service.log
└── qa
├── 2018-01-12
│   ├── 192.168.3.1
└── 2018-01-13
├── 192.168.3.2

总结

笔者在测试Logstash单线程输出时,依然产生乱序问题(有知晓的可以留言),最终选择通过自己开发的daemon程序实现,参考Plogstash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# -*- coding: utf-8 -*-
# @Author: Geekwolf
# @Date: 2018-01-29 14:23:04
# @Last Modified by: Geekwolf
# @Last Modified time: 2018-01-31 10:55:01
#!/usr/bin/env python3
# daemon.py
import os
import sys
import time
import redis
import json
import re
import atexit
import signal
# import collections
class Base(object):
    """Shared configuration for the Plogstash daemon.

    Every tunable (pid/log file locations, Redis connection settings and
    archiving parameters) lives here so subclasses read them via ``self``.
    """

    def __init__(self, *args, **kwargs):
        # --- daemon bookkeeping -------------------------------------------
        self.pidfile = '/var/run/plogstash.pid'
        self.service_name = 'Plogstash'
        self.path = '/var/log/plogstash'
        # Ensure the log directory exists before the logfile path is used.
        os.makedirs(self.path, exist_ok=True)
        self.logfile = '%s/%s.log' % (self.path, self.service_name)
        # --- Redis source (must mirror Filebeat's output.redis section) ---
        self.redis_host = '127.0.0.1'
        self.redis_password = 'geekwolf'
        self.redis_port = 5044
        self.redis_db = 0
        self.redis_key = 'filebeat'
        # --- archiving behaviour ------------------------------------------
        self.batch_size = 5000     # events pulled from Redis per cycle
        self.expires = 5           # seconds
        self.archive_time = 1      # seconds between archive cycles
        self.base_dir = '/data/logs'
class Daemon(Base):
    """Classic Unix double-fork daemon with start/stop/restart/status control.

    The daemonized process has stdin/stdout/stderr redirected to
    ``self.logfile`` and records its PID in ``self.pidfile``.
    """

    def __init__(self, *args, **kwargs):
        super(Daemon, self).__init__(*args, **kwargs)

    def daemonize(self):
        """Detach from the controlling terminal via the double-fork recipe.

        Raises RuntimeError if either fork fails.
        """
        # First fork: detach from the parent process.
        try:
            if os.fork() > 0:
                raise SystemExit(0)  # parent exits
        except OSError:
            raise RuntimeError('fork #1 failed.')
        os.chdir('/')
        # os.umask(0) would make created files world-writable; intentionally off.
        os.setsid()
        # Second fork: relinquish session leadership (no controlling tty).
        try:
            if os.fork() > 0:
                raise SystemExit(0)
        except OSError:
            raise RuntimeError('fork #2 failed.')
        # Flush buffered output before swapping the standard descriptors.
        sys.stdout.flush()
        sys.stderr.flush()
        # Redirect stdin/stdout/stderr to the service log file.
        with open(self.logfile, 'ab', 0) as f:
            os.dup2(f.fileno(), sys.stdout.fileno())
        with open(self.logfile, 'ab', 0) as f:
            os.dup2(f.fileno(), sys.stderr.fileno())
        with open(self.logfile, 'rb', 0) as f:
            os.dup2(f.fileno(), sys.stdin.fileno())
        # Record the daemon PID (stdout already points at the logfile here).
        print(os.getpid())
        with open(self.pidfile, 'w') as f:
            print(os.getpid(), file=f)
        # Remove the PID file on normal exit or SIGTERM.
        atexit.register(lambda: os.remove(self.pidfile))

        def sigterm_handler(signo, frame):
            # Raising SystemExit lets the atexit hook run on SIGTERM.
            raise SystemExit(1)

        signal.signal(signal.SIGTERM, sigterm_handler)

    def get_now_date(self):
        """Return the local date as 'YYYY-MM-DD' (used in archive paths)."""
        return time.strftime('%Y-%m-%d', time.localtime(time.time()))

    def get_now_timestamp(self):
        """Return the current Unix timestamp as a float."""
        return time.time()

    def get_now_time(self):
        """Return the local time as 'YYYY-MM-DD HH:MM:SS' (log prefix)."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    def logging(self, msg):
        """Append a timestamped message to the service log file.

        BUGFIX: the original opened the logfile read-only and printed the
        message to stdout, never to the file; it now appends to the file.
        """
        with open(self.logfile, 'a') as f:
            print('%s %s' % (self.get_now_time(), msg), file=f)

    def append_log(self):
        """Main work loop; implemented by subclasses (see Worker)."""
        pass

    def start(self):
        """Daemonize and enter the work loop; refuse to start twice."""
        if os.path.exists(self.pidfile):
            raise RuntimeError('Already running')
        try:
            self.daemonize()
            self.append_log()
            self.status()
        except RuntimeError as e:
            print(e, file=sys.stderr)
            raise SystemExit(1)

    def stop(self):
        """Send SIGTERM to the recorded PID; clean up a stale PID file."""
        if os.path.exists(self.pidfile):
            with open(self.pidfile) as f:
                pid = int(f.read())
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # The daemon died without removing its PID file.
                os.remove(self.pidfile)
            print('Plogstash is stopped')
        else:
            print('Not running', file=sys.stderr)
            raise SystemExit(1)

    def restart(self):
        # NOTE(review): the PID file is removed by the dying daemon's atexit
        # hook, so a very fast stop()->start() could race it — confirm.
        self.stop()
        self.start()

    def status(self):
        """Print whether the service is running according to the PID file."""
        try:
            with open(self.pidfile, 'r') as f:
                pid = int(f.read().strip())
        except (OSError, ValueError):
            # Missing or unreadable/garbled PID file means "not running".
            pid = None
        if pid:
            print('%s is running as pid:%s' % (self.service_name, pid))
        else:
            print('%s is not running' % self.service_name)
class Worker(Daemon):
    """Consumes Filebeat events from Redis and appends them to archive files.

    Layout produced: /data/logs/<env>/<YYYY-MM-DD>/<ip>/nohup* for nohup
    logs, /data/logs/<env>/<YYYY-MM-DD>/<ip>/logs/<name> for everything else.
    """

    def __init__(self, *args, **kwargs):
        # BUGFIX: the original called super().__init__(self, *args, **kwargs),
        # passing self twice (self is already bound by super()).
        super(Worker, self).__init__(*args, **kwargs)

    def _redis(self):
        """Build a StrictRedis client backed by a connection pool."""
        pool = redis.ConnectionPool(host=self.redis_host,
                                    password=self.redis_password,
                                    port=self.redis_port,
                                    db=self.redis_db,
                                    socket_timeout=10000)
        return redis.StrictRedis(connection_pool=pool)

    def get_redis_data(self):
        """Peek at (without removing) up to batch_size queued events.

        Deletion is deferred to del_redis_data() so a crash mid-batch
        does not lose events.
        """
        return self._redis().lrange(self.redis_key, 0, self.batch_size - 1)

    def del_redis_data(self):
        """Drop the batch previously fetched by get_redis_data()."""
        self._redis().ltrim(self.redis_key, self.batch_size, -1)

    def append_log(self):
        """Archive loop: poll Redis every archive_time seconds and append
        each event's message to its per-env/date/ip file."""
        while True:
            time.sleep(self.archive_time)
            _data = self.get_redis_data()
            if _data:
                for _d in _data:
                    try:
                        _d = json.loads(_d.decode('utf-8'))
                        _path = '%s/%s/%s/%s' % (self.base_dir,
                                                 _d['fields']['env'],
                                                 self.get_now_date(),
                                                 _d['fields']['ip_address'])
                        os.makedirs(_path + '/logs', exist_ok=True)
                        file_name = _d['source'].split('/')[-1]
                        # nohup logs live beside logs/, not inside it
                        # (equivalent to the original re.match('nohup', ...)).
                        if file_name.startswith('nohup'):
                            file_path = '%s/%s' % (_path, file_name)
                        else:
                            file_path = '%s/logs/%s' % (_path, file_name)
                        with open(file_path, 'a') as f:
                            f.write(_d['message'] + '\n')
                    except Exception as e:
                        # Best-effort: one malformed event must not kill the
                        # daemon; record it and continue with the batch.
                        self.logging(str(e))
                # Only trim the queue after the batch has been processed.
                self.del_redis_data()
if __name__ == '__main__':
    # CLI entry point: plogstash.py [start|stop|restart|status]
    if len(sys.argv) != 2:
        print('Usage: {} [start|stop|restart|status]'.format(sys.argv[0]), file=sys.stderr)
        raise SystemExit(1)
    daemon = Worker()
    cmd = sys.argv[1]
    # Dispatch table instead of an if/elif chain.
    actions = {
        'start': daemon.start,
        'stop': daemon.stop,
        'restart': daemon.restart,
        'status': daemon.status,
    }
    handler = actions.get(cmd)
    if handler is None:
        print('Unknown command {!r}'.format(cmd), file=sys.stderr)
        raise SystemExit(1)
    if cmd == 'restart':
        print("Restart ...")
    handler()

坚持原创分享,您的支持将鼓励我继续创作