使用 Prometheus 监控 Flink

这篇文章介绍了如何利用 Apache Flink 的内置指标系统以及如何使用 Prometheus 来高效地监控流式应用程序。

为什么选择 Prometheus?

随着深入地了解 Prometheus,你会发现一些非常好的功能:

  • 服务发现使配置更加容易。Prometheus 支持 consul,etcd,kubernetes 以及各家公有云厂商自动发现。对于监控目标动态发现,这点特别契合 Cloud 时代,应用动态扩缩的特点。我们无法想象,在 Cloud 时代,需要运维不断更改配置。
  • 开源社区建立了数百个 exporter。基本上涵盖了所有基础设施和主流中间件。
  • 工具库可从您的应用程序获取自定义指标。基本上主流开发语言都有对应的工具库。
  • 它是 CNCF 旗下的 OSS,是继 Kubernetes 之后的第二个毕业项目。Kubernetes 已经与 Promethues 深度结合,并在其所有服务中公开了 Prometheus 指标。
  • Pushgateway,Alermanager 等组件,基本上涵盖了一个完整的监控生命周期。
  • Flink 官方已经提供了对接 Prometheus 的 jar 包,很方便就可以集成。由于本系列文章重点在 Flink on Kubernetes, 因此我们所有的操作都是基于这点展开。

部署 Prometheus

对 k8s 不熟悉的同学,可以查阅 k8s 相关文档。由于部署不是本博客的重点,所以我们直接贴出 yaml 文件:

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

|


    apiVersion: v1

    kind: ServiceAccount

    metadata:

      name: monitor

      namespace: kube-system

      labels:

        kubernetes.io/cluster-service: “true”

        addonmanager.kubernetes.io/mode: Reconcile


    apiVersion: rbac.authorization.k8s.io/v1

    kind: ClusterRole

    metadata:

      name: monitor

      labels:

        kubernetes.io/cluster-service: “true”

        addonmanager.kubernetes.io/mode: Reconcile

    rules:

      - apiGroups:

          - ""

        resources:

          - pods

        verbs:

          - get

          - list

          - watch


    apiVersion: rbac.authorization.k8s.io/v1

    kind: ClusterRoleBinding

    metadata:

      name: monitor

      labels:

        kubernetes.io/cluster-service: “true”

        addonmanager.kubernetes.io/mode: Reconcile

    roleRef:

      apiGroup: rbac.authorization.k8s.io

      kind: ClusterRole

      name: monitor

    subjects:

    - kind: ServiceAccount

      name: monitor

      namespace: kube-system


        apiVersion: v1

        kind: ConfigMap

        metadata:

          labels:

            app: monitor

          name: monitor

          namespace: kube-system

        data:

          prometheus.yml: |-

            global:

                scrape_interval: 10s

                evaluation_interval: 10s

            scrape_configs:

              - job_name: kubernetes-pods

                kubernetes_sd_configs:

                - role: pod

                relabel_configs:

                - action: keep

                  regex: true

                  source_labels:

                  - __meta_kubernetes_pod_annotation_prometheus_io_scrape

                - action: replace

                  regex: (.+)

                  source_labels:

                  - __meta_kubernetes_pod_annotation_prometheus_io_path

                  target_label: metrics_path

                - action: replace

                  regex: ([^:]+)(?::\d+)?;(\d+)

                  replacement: $1:$2

                  source_labels:

                  - address

                  - __meta_kubernetes_pod_annotation_prometheus_io_port

                  target_label: address

                - action: labelmap

                  regex: _meta_kubernetes_pod_label(.+)

                - action: replace

                  source_labels:

                  - __meta_kubernetes_namespace

                  target_label: kubernetes_namespace

                - action: replace

                  source_labels:

                  - __meta_kubernetes_pod_name

                  target_label: kubernetes_pod_name


        apiVersion: apps/v1

        kind: StatefulSet

        metadata:

          labels:

            app: monitor

          name: monitor

          namespace: kube-system

        spec:

          serviceName: monitor

          selector:

            matchLabels:

              app: monitor

          replicas: 1

          template:

            metadata:

              labels:

                app: monitor

            spec:

              containers:

              - args:

                - –config.file=/etc/prometheus/prometheus.yml

                - –storage.tsdb.path=/data/prometheus

                - –storage.tsdb.retention.time=10d

                image: prom/prometheus:v2.19.0

                imagePullPolicy: IfNotPresent

                name: prometheus

                ports:

                - containerPort: 9090

                  protocol: TCP

                readinessProbe:

                  httpGet:

                    path: /-/ready

                    port: 9090

                  initialDelaySeconds: 30

                  timeoutSeconds: 30

                livenessProbe:

                  httpGet:

                    path: /-/healthy

                    port: 9090

                  initialDelaySeconds: 30

                  timeoutSeconds: 30

                resources:

                  limits:

                    cpu: 1000m

                    memory: 2018Mi

                  requests:

                    cpu: 1000m

                    memory: 2018Mi

                volumeMounts:

                - mountPath: /etc/prometheus

                  name: config-volume

                - mountPath: /data

                  name: monitor-persistent-storage

              restartPolicy: Always

              priorityClassName: system-cluster-critical

              serviceAccountName: monitor

              initContainers:

                - name: “init-chown-data”

                  image: “busybox:latest”

                  imagePullPolicy: “IfNotPresent”

                  command: [“chown”, “-R”, “65534:65534”, “/data”]

                  volumeMounts:

                    - name: monitor-persistent-storage

                      mountPath: /data

                      subPath: ""

              volumes:

              - configMap:

                  defaultMode: 420

                  name: monitor

                name: config-volume

          volumeClaimTemplates:

            - metadata:

                name: monitor-persistent-storage

                namespace: kube-system

              spec:

                accessModes:

                  - ReadWriteOnce

                resources:

                  requests:

                    storage: 20Gi

                storageClassName: gp2


        apiVersion: v1

        kind: Service

        metadata:

          annotations:

            service.beta.kubernetes.io/aws-load-balancer-type: nlb

          labels:

            app: monitor

          name: monitor

          namespace: kube-system

        spec:

          ports:

          - name: http

            port: 9090

            protocol: TCP

            targetPort: 9090

          selector:

            app: monitor

          type: LoadBalancer

|

这里我们简单说下,由于我们想利用 Prometheus 的 Kubernetes 的服务发现的方式,所以需要 RBAC 授权,授权 prometheus 实例对集群中的 pod 有一些读取权限。

为什么我们要使用自动发现的方式那?

相比配置文件的方式,自动发现更加灵活。尤其是当你使用的是 flink on native kubernetes,整个 job manager 和 task manager 是根据作业的提交自动创建的,这种动态性,显然是配置文件无法满足的。

由于我们的集群在 eks 上,所以大家在使用其他云的时候,需要略做调整。

定制镜像

这里我们基本上使用上一篇文章介绍的 demo 上,增加监控相关,所以 Dockerfile 如下:

|

1

2

3

4

|

FROM flink

COPY /plugins/metrics-prometheus/flink-metrics-prometheus-1.11.0.jar /opt/flink/lib

RUN mkdir -p $FLINK_HOME/usrlib

COPY ./examples/streaming/WordCount.jar $FLINK_HOME/usrlib/my-flink-job.jar

|

Flink 的 Classpath 位于 /opt/flink/lib,所以插件的 jar 包需要放到该目录下。

作业提交

由于我们的 Pod 必须增加一定的标识,从而让 Prometheus 实例可以发现。所以提交命令稍作更改,如下:

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

|

./bin/flink run-application -p 8 -t kubernetes-application \

  -Dkubernetes.cluster-id=my-first-cluster \

  -Dtaskmanager.memory.process.size=2048m \

  -Dkubernetes.taskmanager.cpu=2 \

  -Dtaskmanager.numberOfTaskSlots=4 \

  -Dkubernetes.container.image=iyacontrol/flink-world-count:v0.0.2 \

  -Dkubernetes.container.image.pull-policy=Always \

  -Dkubernetes.namespace=stream \

  -Dkubernetes.jobmanager.service-account=flink \

  -Dkubernetes.rest-service.exposed.type=LoadBalancer \

  -Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-type:nlb,service.beta.kubernetes.io/aws-load-balancer-internal:true \

  -Dkubernetes.jobmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 \

  -Dkubernetes.taskmanager.annotations=prometheus.io/scrape:true,prometheus.io/port:9249 \

  -Dmetrics.reporters=prom \

  -Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter \

  local:///opt/flink/usrlib/my-flink-job.jar

|

  • 给 jobmanager 和 taskmanager 增加了 annotations
  • 增加了 metrcis 相关的配置,指定使用 prometheus reporter

关于 prometheus reporter:

参数:

  • port - 可选, Prometheus 导出器监听的端口,默认为 9249。为了能够在一台主机上运行报告程序的多个实例(例如,当一个 TaskManager 与 JobManager 并置时),建议使用这样的端口范围 9250-9260。

  • filterLabelValueCharacters - 可选, 指定是否过滤标签值字符。如果启用,则将删除所有不匹配 [a-zA-Z0-9:_] 的字符,否则将不删除任何字符。禁用此选项之前,请确保您的标签值符合 Prometheus 要求。

效果

提交任务后,我们看下实际效果。

首先查看 Prometheus 是否发现了我们的 Pod。

然后查看具体的 metrics,是否被准确抓取。

指标已经收集,后续大家就可以选择 grafana 绘图了。或是增加相应的报警规则。例如:

总结

当然除了 Prometheus 主动发现 Pod,然后定期抓取 metrcis 的方式,flink 也支持向 PushGateway 主动 push metrcis。

Flink 通过 Reporter 来向外部系统提供 metrcis。通过在 conf/flink-conf.yaml 中配置一个或多个 Reporter ,可以将 metrcis 公开给外部系统。这些 Reporter 在启动时将在每个作业和任务管理器上实例化。

所有 Reporter 都必须至少具有 class 或 factory.class 属性。可以 / 应该使用哪个属性取决于 Reporter 的实现。有关更多信息,请参见各个 Reporter 配置部分。一些 Reporter 允许指定报告间隔。

指定多个 Reporter 的示例配置:

|

1

2

3

4

5

6

7

8

9

|

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory

metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter

metrics.reporter.my_other_reporter.host: 192.168.1.1

metrics.reporter.my_other_reporter.port: 10000

|

启动 Flink 时,必须可以访问包含 reporter 的 jar。支持 factory.class 属性的 reporter 可以作为插件加载。否则,必须将 jar 放在 /lib 文件夹中。

你可以通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口来编写自己的 Reporter。

如果 reporter 定期发送报告,则还必须实现 Scheduled 接口。通过额外实现 MetricReporterFactory,你的 reporter 也可以作为插件加载。