使用 Prometheus 监控 Flink
这篇文章介绍了如何利用 Apache Flink 的内置指标系统以及如何使用 Prometheus 来高效地监控流式应用程序。
为什么选择 Prometheus?
随着深入地了解 Prometheus,你会发现一些非常好的功能:
- 服务发现使配置更加容易。Prometheus 支持 consul,etcd,kubernetes 以及各家公有云厂商自动发现。对于监控目标动态发现,这点特别契合 Cloud 时代,应用动态扩缩的特点。我们无法想象,在 Cloud 时代,需要运维不断更改配置。
- 开源社区建立了数百个 exporter。基本上涵盖了所有基础设施和主流中间件。
- 工具库可从您的应用程序获取自定义指标。基本上主流开发语言都有对应的工具库。
- 它是 CNCF 旗下的 OSS,是继 Kubernetes 之后的第二个毕业项目。Kubernetes 已经与 Promethues 深度结合,并在其所有服务中公开了 Prometheus 指标。
- Pushgateway,Alermanager 等组件,基本上涵盖了一个完整的监控生命周期。
- Flink 官方已经提供了对接 Prometheus 的 jar 包,很方便就可以集成。由于本系列文章重点在 Flink on Kubernetes, 因此我们所有的操作都是基于这点展开。
部署 Prometheus
对 k8s 不熟悉的同学,可以查阅 k8s 相关文档。由于部署不是本博客的重点,所以我们直接贴出 yaml 文件:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
apiVersion: v1
kind: ServiceAccount
metadata:
name: monitor
namespace: kube-system
labels:
kubernetes.io/cluster-service: “true”
addonmanager.kubernetes.io/mode: Reconcile
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: monitor
labels:
kubernetes.io/cluster-service: “true”
addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: monitor
labels:
kubernetes.io/cluster-service: “true”
addonmanager.kubernetes.io/mode: Reconcile
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: monitor
subjects:
- kind: ServiceAccount
name: monitor
namespace: kube-system
apiVersion: v1
kind: ConfigMap
metadata:
labels:
app: monitor
name: monitor
namespace: kube-system
data:
prometheus.yml: |-
global:
scrape_interval: 10s
evaluation_interval: 10s
scrape_configs:
- job_name: kubernetes-pods
kubernetes_sd_configs:
- role: pod
relabel_configs:
- action: keep
regex: true
source_labels:
- __meta_kubernetes_pod_annotation_prometheus_io_scrape
- action: replace
regex: (.+)
source_labels:
- __meta_kubernetes_pod_annotation_prometheus_io_path
target_label: metrics_path
- action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
source_labels:
- address
- __meta_kubernetes_pod_annotation_prometheus_io_port
target_label: address
- action: labelmap
regex: _meta_kubernetes_pod_label(.+)
- action: replace
source_labels:
- __meta_kubernetes_namespace
target_label: kubernetes_namespace
- action: replace
source_labels:
- __meta_kubernetes_pod_name
target_label: kubernetes_pod_name
apiVersion: apps/v1
kind: StatefulSet
metadata:
labels:
app: monitor
name: monitor
namespace: kube-system
spec:
serviceName: monitor
selector:
matchLabels:
app: monitor
replicas: 1
template:
metadata:
labels:
app: monitor
spec:
containers:
- args:
- –config.file=/etc/prometheus/prometheus.yml
- –storage.tsdb.path=/data/prometheus
- –storage.tsdb.retention.time=10d
image: prom/prometheus:v2.19.0
imagePullPolicy: IfNotPresent
name: prometheus
ports:
- containerPort: 9090
protocol: TCP
readinessProbe:
httpGet:
path: /-/ready
port: 9090
initialDelaySeconds: 30
timeoutSeconds: 30
livenessProbe:
httpGet:
path: /-/healthy
port: 9090
initialDelaySeconds: 30
timeoutSeconds: 30
resources:
limits:
cpu: 1000m
memory: 2018Mi
requests:
cpu: 1000m
memory: 2018Mi
volumeMounts:
- mountPath: /etc/prometheus
name: config-volume
- mountPath: /data
name: monitor-persistent-storage
restartPolicy: Always
priorityClassName: system-cluster-critical
serviceAccountName: monitor
initContainers:
- name: “init-chown-data”
image: “busybox:latest”
imagePullPolicy: “IfNotPresent”
command: [“chown”, “-R”, “65534:65534”, “/data”]
volumeMounts:
- name: monitor-persistent-storage
mountPath: /data
subPath: ""
volumes:
- configMap:
defaultMode: 420
name: monitor
name: config-volume
volumeClaimTemplates:
- metadata:
name: monitor-persistent-storage
namespace: kube-system
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20Gi
storageClassName: gp2
apiVersion: v1
kind: Service
metadata:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: nlb
labels:
app: monitor
name: monitor
namespace: kube-system
spec:
ports:
- name: http
port: 9090
protocol: TCP
targetPort: 9090
selector:
app: monitor
type: LoadBalancer
|
这里我们简单说下,由于我们想利用 Prometheus 的 Kubernetes 的服务发现的方式,所以需要 RBAC 授权,授权 prometheus 实例对集群中的 pod 有一些读取权限。
为什么我们要使用自动发现的方式那?
相比配置文件的方式,自动发现更加灵活。尤其是当你使用的是 flink on native kubernetes,整个 job manager 和 task manager 是根据作业的提交自动创建的,这种动态性,显然是配置文件无法满足的。
由于我们的集群在 eks 上,所以大家在使用其他云的时候,需要略做调整。
定制镜像
这里我们基本上使用上一篇文章介绍的 demo 上,增加监控相关,所以 Dockerfile 如下:
|
1
2
3
4
|
FROM flink
COPY /plugins/metrics-prometheus/flink-metrics-prometheus-1.11.0.jar /opt/flink/lib
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/WordCount.jar $FLINK_HOME/usrlib/my-flink-job.jar
|
Flink 的 Classpath 位于 /opt/flink/lib,所以插件的 jar 包需要放到该目录下。
作业提交
由于我们的 Pod 必须增加一定的标识,从而让 Prometheus 实例可以发现。所以提交命令稍作更改,如下:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=my-first-cluster \
-Dtaskmanager.memory.process.size=2048m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=iyacontrol/flink-world-count:v0.0.2 \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.namespace=stream \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.rest-service.exposed.type=LoadBalancer \
-Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-type:nlb,service.beta.kubernetes.io/aws-load-balancer-internal:true \
-Dkubernetes.jobmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 \
-Dkubernetes.taskmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 \
-Dmetrics.reporters=prom \
-Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter \
local:///opt/flink/usrlib/my-flink-job.jar
|
- 给 jobmanager 和 taskmanager 增加了 annotations
- 增加了 metrcis 相关的配置,指定使用 prometheus reporter
关于 prometheus reporter:
参数:
-
port - 可选, Prometheus 导出器监听的端口,默认为 9249。为了能够在一台主机上运行报告程序的多个实例(例如,当一个 TaskManager 与 JobManager 并置时),建议使用这样的端口范围 9250-9260。
-
filterLabelValueCharacters - 可选, 指定是否过滤标签值字符。如果启用,则将删除所有不匹配 [a-zA-Z0-9:_] 的字符,否则将不删除任何字符。禁用此选项之前,请确保您的标签值符合 Prometheus 要求。
效果
提交任务后,我们看下实际效果。
首先查看 Prometheus 是否发现了我们的 Pod。
然后查看具体的 metrics,是否被准确抓取。
指标已经收集,后续大家就可以选择 grafana 绘图了。或是增加相应的报警规则。例如:
总结
当然除了 Prometheus 主动发现 Pod,然后定期抓取 metrcis 的方式,flink 也支持向 PushGateway 主动 push metrcis。
Flink 通过 Reporter 来向外部系统提供 metrcis。通过在 conf/flink-conf.yaml 中配置一个或多个 Reporter ,可以将 metrcis 公开给外部系统。这些 Reporter 在启动时将在每个作业和任务管理器上实例化。
所有 Reporter 都必须至少具有 class 或 factory.class 属性。可以 / 应该使用哪个属性取决于 Reporter 的实现。有关更多信息,请参见各个 Reporter 配置部分。一些 Reporter 允许指定报告间隔。
指定多个 Reporter 的示例配置:
|
1
2
3
4
5
6
7
8
9
|
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
|
启动 Flink 时,必须可以访问包含 reporter 的 jar。支持 factory.class 属性的 reporter 可以作为插件加载。否则,必须将 jar 放在 /lib 文件夹中。
你可以通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口来编写自己的 Reporter。
如果 reporter 定期发送报告,则还必须实现 Scheduled 接口。通过额外实现 MetricReporterFactory,你的 reporter 也可以作为插件加载。