Using Alluxio to Preserve Spark Data Locality on Kubernetes



Continuing the discussion of Spark data locality on Kubernetes from my previous article (see "Spark data locality in k8s"), this post moves on to the actual hands-on setup.

The setup largely follows the official Alluxio documentation; the notes below record the points that need special attention.

Enable the short-circuit read feature (on by default). With short-circuit reads, if the requested data exists on the client's local worker node, the client reads it directly instead of going through the worker process, which noticeably improves read performance.

# In the user-defined config.yaml, set under the worker-related sections:
properties:
  alluxio.worker.data.server.domain.socket.address: /opt/domain
  alluxio.worker.data.server.domain.socket.as.uuid: true
domainHostPath: "/tmp/alluxio-domain"
worker:
  shortCircuit:
    enabled: true
  shortCircuitPolicy: "uuid"
  domainHostPath: "/tmp/alluxio-domain"
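As a quick sanity check (my own hedged sketch, not part of the official steps): with as.uuid enabled, the worker creates a UUID-named Unix domain socket under the hostPath directory, so short-circuit wiring can be verified on any node that runs a worker:

# On a worker node; the path is the domainHostPath configured above.
ls -l /tmp/alluxio-domain
# Expect a socket file (mode string starts with 's') named with a UUID once the worker is up.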

Configure the Alluxio worker and Alluxio FUSE pods to use the same IP as their host nodes:

# In the worker and fuse sections:
worker:
  hostNetwork: true
fuse:
  hostNetwork: true
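With hostNetwork on, each pod's IP equals its node's IP, which plain kubectl can confirm (namespace flags omitted; add yours as needed):

# The IP column of the worker/fuse pods should match the node IPs.
kubectl get pods -o wide | grep -E 'alluxio-(worker|fuse)'
kubectl get nodes -o wide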

In the Spark driver and executor, mount the same domain socket path as the Alluxio workers, and make the executor pods use the same IP as their host nodes:

# In the spark-operator SparkApplication spec:
spec:
  executor:
    # When Spark computes data locality, the NODE_LOCAL check verifies that the
    # host IP of the executor running the task is among the host IPs where the
    # data resides; if so, the locality level is NODE_LOCAL. See my earlier
    # article for the walkthrough of the Spark source code.
    hostNetwork: true
  sparkConf:
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.mount.path: "/opt/domain"
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.mount.readOnly: "true"
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.options.path: "/tmp/alluxio-domain"
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.options.type: "Directory"
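Whether this wiring actually yields NODE_LOCAL tasks shows up in the driver log, where TaskSetManager prints each task's locality level. A rough check (the driver pod name is a placeholder):

# Tally the locality levels of all scheduled tasks.
kubectl -n spark logs <driver-pod-name> \
  | grep 'TaskSetManager: Starting task' \
  | grep -oE 'PROCESS_LOCAL|NODE_LOCAL|RACK_LOCAL|ANY' \
  | sort | uniq -c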

Use a nodeSelector to pick the subset of K8s nodes on which Alluxio is deployed:

# In the Alluxio user-defined config.yaml, set a nodeSelector for master, worker, fuse, and fuse-client.
# In the Helm chart, master, worker, and fuse fall back to the top-level nodeSelector when their own
# is unset, so setting it once at the top level is enough. In addition, modify
# templates/fuse/client-daemonset.yaml to add a nodeSelector for the fuse-client.

# First, label the K8s nodes:
kubectl label node node1 alluxio=true
...

# Then set it in config.yaml:
nodeSelector: {"alluxio": "true"}

# Finally, modify templates/fuse/client-daemonset.yaml:
spec:
  template:
    spec:
      # fuse and fuse-client use the same nodeSelector, since the two serve the same purpose. 20200702
      nodeSelector:
      {{- if .Values.fuse.nodeSelector }}
{{ toYaml .Values.fuse.nodeSelector | trim | indent 8 }}
      {{- else if .Values.nodeSelector }}
{{ toYaml .Values.nodeSelector | trim | indent 8 }}
      {{- end }}
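Before installing the chart, it is worth confirming the label stuck:

# Only the nodes selected for Alluxio should be listed.
kubectl get nodes -l alluxio=true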

Modify templates/config/alluxio-conf.yaml to fix a bug that prevents Alluxio on K8s from defining more than one storage tier: add -Dalluxio.worker.tieredstore.level{{ .level }}.alias={{ .mediumtype }} to ALLUXIO_WORKER_JAVA_OPTS.

ALLUXIO_WORKER_JAVA_OPTS: |-
  -Dalluxio.worker.bind.host=0.0.0.0 {{ .Values.worker.jvmOptions }}
  {{- if eq .Values.worker.shortCircuitPolicy "uuid" }}
  -Dalluxio.worker.data.server.domain.socket.address=/opt/domain
  -Dalluxio.worker.data.server.domain.socket.as.uuid=true
  {{- end}}
  {{- if .Values.worker.resources }}
  {{- if .Values.worker.resources.requests }}
  {{- if .Values.worker.resources.requests.memory }}
  -Dalluxio.worker.memory.size={{ .Values.worker.resources.requests.memory }}
  {{- end}}
  {{- end}}
  {{- end}}
  -Dalluxio.worker.rpc.port={{ .Values.worker.resources.ports.rpc }}
  -Dalluxio.worker.web.port={{ .Values.worker.resources.ports.web }}
  {{- range $key, $val := .Values.worker.properties }}
  -D{{ $key }}={{ $val }}
  {{- end}}
  -Dalluxio.worker.hostname=${ALLUXIO_WORKER_HOSTNAME}
  {{- if .Values.tieredstore }}
  -Dalluxio.worker.tieredstore.levels={{ len .Values.tieredstore.levels }}
  {{- range .Values.tieredstore.levels }}
  -Dalluxio.worker.tieredstore.level{{ .level }}.alias={{ .mediumtype }}
  -Dalluxio.worker.tieredstore.level{{ .level }}.dirs.mediumtype={{ .mediumtype }}
  {{- if .path }}
  -Dalluxio.worker.tieredstore.level{{ .level }}.dirs.path={{ .path }}
  {{- end}}
  {{- if .quota }}
  -Dalluxio.worker.tieredstore.level{{ .level }}.dirs.quota={{ .quota }}
  {{- end}}
  {{- if .high }}
  -Dalluxio.worker.tieredstore.level{{ .level }}.watermark.high.ratio={{ .high }}
  {{- end}}
  {{- if .low }}
  -Dalluxio.worker.tieredstore.level{{ .level }}.watermark.low.ratio={{ .low }}
  {{- end}}
  {{- end}}
  {{ end }}
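For the two-level tiered store defined in the full config.yaml at the end of this post (MEM at level 0, HDD at level 1), the tieredstore part of this template should render roughly as follows (illustrative only):

# Rendered worker JVM options for the tieredstore section (sketch):
-Dalluxio.worker.tieredstore.levels=2
-Dalluxio.worker.tieredstore.level0.alias=MEM
-Dalluxio.worker.tieredstore.level0.dirs.mediumtype=MEM
-Dalluxio.worker.tieredstore.level0.dirs.path=/mnt/data/alluxio-k8s/ramdisk
-Dalluxio.worker.tieredstore.level0.dirs.quota=1G
-Dalluxio.worker.tieredstore.level0.watermark.high.ratio=0.9
-Dalluxio.worker.tieredstore.level0.watermark.low.ratio=0.7
-Dalluxio.worker.tieredstore.level1.alias=HDD
-Dalluxio.worker.tieredstore.level1.dirs.mediumtype=HDD
-Dalluxio.worker.tieredstore.level1.dirs.path=/mnt/data/alluxio-k8s/hdd-disk1
-Dalluxio.worker.tieredstore.level1.dirs.quota=10G
-Dalluxio.worker.tieredstore.level1.watermark.high.ratio=0.9
-Dalluxio.worker.tieredstore.level1.watermark.low.ratio=0.7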

Modify templates/fuse/daemonset.yaml to fix the problem that an alluxio-fuse pod cannot be restarted successfully after a failure: mount the FUSE mount point through a PV instead of the previous hostPath.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: alluxio-fuse-mount-pvc
  namespace: spark
spec:
  storageClassName: standard-alluxio
  accessModes: ["ReadWriteMany"]
  resources:
    requests:
      storage: 20Gi
---
{{ if .Values.fuse.enabled -}}
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: {{ template "alluxio.fullname" . }}-fuse
  labels:
    app: {{ template "alluxio.name" . }}
    chart: {{ template "alluxio.chart" . }}
    release: {{ .Release.Name }}
    heritage: {{ .Release.Service }}
    role: alluxio-fuse
spec:
  selector:
    matchLabels:
      app: {{ template "alluxio.name" . }}
      chart: {{ template "alluxio.chart" . }}
      release: {{ .Release.Name }}
      heritage: {{ .Release.Service }}
      role: alluxio-fuse
  template:
    metadata:
      labels:
        app: {{ template "alluxio.name" . }}
        chart: {{ template "alluxio.chart" . }}
        release: {{ .Release.Name }}
        heritage: {{ .Release.Service }}
        role: alluxio-fuse
    spec:
      hostNetwork: {{ .Values.fuse.hostNetwork }}
      dnsPolicy: {{ .Values.fuse.dnsPolicy }}
      nodeSelector:
      {{- if .Values.fuse.nodeSelector }}
{{ toYaml .Values.fuse.nodeSelector | trim | indent 8 }}
      {{- else if .Values.nodeSelector }}
{{ toYaml .Values.nodeSelector | trim | indent 8 }}
      {{- end }}
      securityContext:
        runAsUser: {{ .Values.fuse.user }}
        runAsGroup: {{ .Values.fuse.group }}
        fsGroup: {{ .Values.fuse.fsGroup }}
      affinity:
        podAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - {{ template "alluxio.name" . }}
                  - key: role
                    operator: In
                    values:
                      - alluxio-worker
              # topologyKey is mandatory for pod affinity: colocate fuse with a worker on the same node
              topologyKey: kubernetes.io/hostname
      containers:
        - name: alluxio-fuse
          image: {{ .Values.fuse.image }}:{{ .Values.fuse.imageTag }}
          imagePullPolicy: {{ .Values.fuse.imagePullPolicy }}
          {{- if .Values.fuse.resources }}
          resources:
            {{- if .Values.fuse.resources.limits }}
            limits:
              cpu: {{ .Values.fuse.resources.limits.cpu }}
              memory: {{ .Values.fuse.resources.limits.memory }}
            {{- end }}
            {{- if .Values.fuse.resources.requests }}
            requests:
              cpu: {{ .Values.fuse.resources.requests.cpu }}
              memory: {{ .Values.fuse.resources.requests.memory }}
            {{- end }}
          {{- end }}
          command: ["/entrypoint.sh"]
          {{- if .Values.fuse.args }}
          args:
{{ toYaml .Values.fuse.args | trim | indent 12 }}
          {{- end }}
          env:
            - name: ALLUXIO_CLIENT_HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            - name: ALLUXIO_CLIENT_JAVA_OPTS
              value: " -Dalluxio.user.hostname=$(ALLUXIO_CLIENT_HOSTNAME) "
          securityContext:
            privileged: true
            capabilities:
              add:
                - SYS_ADMIN
          # lifecycle:
          #   preStop:
          #     exec:
          #       command: ["/opt/alluxio/integration/fuse/bin/alluxio-fuse", "unmount", "/alluxio-fuse"]
          envFrom:
            - configMapRef:
                name: {{ template "alluxio.fullname" . }}-config
          volumeMounts:
            - name: alluxio-fuse-device
              mountPath: /dev/fuse
            - name: alluxio-fuse-mount
              mountPath: /alluxio-fuse
              mountPropagation: Bidirectional
            {{- if eq .Values.worker.shortCircuitPolicy "uuid" }}
            - name: alluxio-domain
              mountPath: /opt/domain
            {{- end }}
            {{- if eq .Values.worker.shortCircuitPolicy "local" }}
            {{- if .Values.tieredstore }}
            {{- if .Values.tieredstore.levels }}
            {{- range .Values.tieredstore.levels }}
            {{- if .mediumtype }}
            {{- if contains "," .mediumtype }}
            {{- $type := .type }}
            {{- $path := .path }}
            {{- $split := split "," .mediumtype }}
            {{- range $key, $val := $split }}
            {{- if eq $type "hostPath"}}
            - mountPath: {{ index ($path | split ",") $key }}
              name: {{ $val | lower }}-{{ $key | replace "_" "" }}
            {{- end}}
            {{- end}}
            {{- else}}
            {{- if eq .type "hostPath"}}
            - name: {{ .mediumtype | replace "," "-" | lower }}
              mountPath: {{ .path }}
            {{- else }}
            # report error and exit
            {{ .name }} with {{ .type }} is not supported in shortCircuitPolicy local
            {{- end }}
            {{- end }}
            {{- end}}
            {{- end }}
            {{- end }}
            {{- end }}
            {{- end }}
      restartPolicy: Always
      volumes:
        - name: alluxio-fuse-device
          hostPath:
            path: /dev/fuse
            type: File
        - name: alluxio-fuse-mount
          persistentVolumeClaim:
            claimName: alluxio-fuse-mount-pvc
          # hostPath:
          #   path: /alluxio-fuse
          #   type: DirectoryOrCreate
        {{- if eq .Values.worker.shortCircuitPolicy "uuid" }}
        - name: alluxio-domain
          hostPath:
            path: {{ .Values.domainHostPath }}
            type: "Directory"
        {{- end }}
        {{- if eq .Values.worker.shortCircuitPolicy "local" }}
        {{- if .Values.tieredstore }}
        {{- if .Values.tieredstore.levels }}
        {{- range .Values.tieredstore.levels }}
        {{- if .mediumtype }}
        {{- if contains "," .mediumtype }}
        {{- $split := split "," .mediumtype }}
        {{- $type := .type }}
        {{- $path := .path }}
        {{- range $key, $val := $split }}
        {{- if eq $type "hostPath"}}
        - hostPath:
            path: {{ index ($path | split ",") $key }}
            type: DirectoryOrCreate
          name: {{ $val | lower }}-{{ $key | replace "_" "" }}
        {{- else }}
        - name: {{ $val | lower }}-{{ $key | replace "_" "" }}
          emptyDir:
            medium: "Memory"
            {{- if .quota }}
            sizeLimit: {{ .quota }}
            {{- end}}
        {{- end}}
        {{- end}}
        {{- else}}
        {{- if eq .type "hostPath"}}
        - hostPath:
            path: {{ .path }}
            type: DirectoryOrCreate
          name: {{ .mediumtype | replace "," "-" | lower }}
        {{- end }}
        {{- end }}
        {{- end}}
        {{- end }}
        {{- end }}
        {{- end }}
        {{- end }}
{{- end }}
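The PVC above expects a matching PV in storage class standard-alluxio. If no provisioner backs that class, a statically created PV is needed; here is a minimal hostPath-backed sketch (the PV name and path are my assumptions, not taken from the original setup):

apiVersion: v1
kind: PersistentVolume
metadata:
  name: alluxio-fuse-mount-pv
spec:
  storageClassName: standard-alluxio
  capacity:
    storage: 20Gi
  accessModes: ["ReadWriteMany"]
  persistentVolumeReclaimPolicy: Retain
  hostPath:
    # Hypothetical host directory backing the FUSE mount point
    path: /mnt/data/alluxio-k8s/fuse-mount
    type: DirectoryOrCreate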

Finally, the complete configuration files:

The Alluxio config.yaml:

[root@master1 kubernetes]# cat config.yaml
# This should not be modified in the usual case.
fullnameOverride: alluxio

## Common ##
# Docker Image
image: alluxio/alluxio
#imageTag: 2.2.1
imageTag: 2.1.2
imagePullPolicy: IfNotPresent

# Security Context
user: 0
group: 0
fsGroup: 0

# Site properties for all the components
properties:
  # alluxio.user.metrics.collection.enabled: 'true'
  # shut down security options while reading data on workers
  alluxio.security.authorization.permission.enabled: "false"
  alluxio.user.file.metadata.sync.interval: "30min"
  # improve cold reads
  alluxio.user.ufs.block.read.location.policy: "alluxio.client.block.policy.DeterministicHashPolicy"
  alluxio.user.ufs.block.read.location.policy.deterministic.hash.shards: "3"
  # decrease cache size on workers
  alluxio.user.file.passive.cache.enabled: "false"
  # for short-circuit info collection
  alluxio.user.metrics.collection.enabled: "true"
  # Writes data only to Alluxio before returning a successful write
  alluxio.user.file.writetype.default: "ASYNC_THROUGH"
  # Does not persist the data automatically to the underlying storage, this is important because
  # only the final committed data is necessary to persist
  alluxio.user.file.persistence.initial.wait.time: "-1"
  # Hints that Alluxio should treat renaming as committing data and trigger a persist operation
  alluxio.user.file.persist.on.rename: "true"
  # Determines the number of copies in Alluxio when files are not yet persisted, increase this to
  # a larger number to ensure fault tolerance in case of Alluxio worker failures
  alluxio.user.file.replication.durable: "2"
  alluxio.master.metastore.dir: "/mnt/data/alluxio-k8s/metastore"
  # conf for the master root UFS, here S3
  alluxio.master.mount.table.root.ufs: "your s3 bucket"
  alluxio.master.mount.table.root.option.aws.accessKeyId: "your s3 akey"
  alluxio.master.mount.table.root.option.aws.secretKey: "your s3 skey"
  alluxio.master.mount.table.root.option.alluxio.underfs.s3.endpoint: "your s3 endpoint"
  alluxio.master.mount.table.root.option.alluxio.underfs.s3.disable.dns.buckets: "true"
  alluxio.master.mount.table.root.option.alluxio.underfs.s3.inherit.acl: "true"
  # Blacklists persisting files which contain the string "_temporary" anywhere in their path
  alluxio.master.persistence.blacklist: ".staging,_temporary"
  # worker props
  alluxio.worker.data.server.domain.socket.address: /opt/domain
  alluxio.worker.data.server.domain.socket.as.uuid: true

# Recommended JVM Heap options for running in Docker
# Ref: https://developers.redhat.com/blog/2017/03/14/java-inside-docker/
# These JVM options are common to all Alluxio services
# jvmOptions: "-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2 "

# Mount Persistent Volumes to all components
# mounts:
# - name: <persistentVolume claimName>
#   path: <mountPath>

# Use labels to run Alluxio on a subset of the K8s nodes
nodeSelector: {"alluxio": "true"}

## Master ##
master:
  count: 3 # Controls the number of StatefulSets. For multiMaster mode increase this to >1.
  replicas: 1 # Controls #replicas in a StatefulSet and should not be modified in the usual case.
  args: # Arguments to Docker entrypoint
    - master-only
    - --no-format
  # Properties for the master component
  properties:
    # Example: use ROCKS DB instead of Heap
  resources:
    limits:
      cpu: "1"
      memory: "1G"
    requests:
      cpu: "1"
      memory: "1G"
    ports:
      embedded: 19200
      rpc: 19998
      web: 19999
  # JVM options specific to the master container
  jvmOptions: ""
  nodeSelector: {}
  hostNetwork: false
  dnsPolicy: ClusterFirst
  metastore:
    medium: ""
    size: 1G
    # mountPath: /metastore
    mountPath: /mnt/data/alluxio-k8s/metastore

jobMaster:
  args:
    - job-master
  # Properties for the jobMaster component
  properties:
  resources:
    limits:
      cpu: "1"
      memory: "1G"
    requests:
      cpu: "1"
      memory: "1G"
    ports:
      embedded: 20003
      rpc: 20001
      web: 20002
  # JVM options specific to the jobMaster container
  jvmOptions: ""

# Alluxio supports journal type of UFS and EMBEDDED
# UFS journal with HDFS example
# journal:
#   type: "UFS"
#   folder: "hdfs://{$hostname}:{$hostport}/journal"
# EMBEDDED journal to /journal example
# journal:
#   type: "EMBEDDED"
#   folder: "/journal"
journal:
  storageClass: "standard-alluxio"
  size: 1G
  accessModes:
    - ReadWriteOnce
  type: "EMBEDDED" # "UFS" or "EMBEDDED"
  #ufsType: "local" # Ignored if type is "EMBEDDED". "local" or "HDFS"
  # NOTE: the journal path is hard-coded in initContainers; changing this folder makes the master fail to start
  folder: "/journal" # Master journal folder
  # folder: "/mnt/data/alluxio-k8s/journal" # Master journal folder
  # Configuration for journal formatting job
  format:
    runFormat: true # Change to true to format journal
    # runFormat: false # Change to true to format journal
    job:
      activeDeadlineSeconds: 30
      ttlSecondsAfterFinished: 10
      resources:
        limits:
          cpu: "1"
          memory: "1G"
        requests:
          cpu: "1"
          memory: "1G"

## Worker ##
# Tiered Storage
tieredstore:
  levels:
    - level: 0
      mediumtype: MEM
      path: /mnt/data/alluxio-k8s/ramdisk
      type: hostPath
      quota: 1G
      high: 0.9
      low: 0.7
    - level: 1
      mediumtype: HDD
      path: /mnt/data/alluxio-k8s/hdd-disk1
      type: hostPath
      quota: 10G
      high: 0.9
      low: 0.7

# The domain hostPath for uuid mode in short-circuit
#domainHostPath: "/tmp/alluxio-domain"
domainHostPath: "/mnt/data/alluxio-k8s/tmp/alluxio-domain"

worker:
  args:
    - worker-only
    - --no-format
  # Properties for the worker component
  properties:
    # evictor class
    # alluxio.worker.evictor.class: alluxio.worker.block.evictor.LRUEvictor
  resources:
    limits:
      cpu: "1"
      memory: "2G"
    requests:
      cpu: "1"
      memory: "2G"
    ports:
      rpc: 29999
      web: 30000
  # JVM options specific to the worker container
  jvmOptions: ""
  nodeSelector: {}
  # domainHostPath: "/tmp/alluxio-domain"
  domainHostPath: "/mnt/data/alluxio-k8s/tmp/alluxio-domain"
  # for data locality
  hostNetwork: true
  dnsPolicy: ClusterFirstWithHostNet
  ramdisk:
    size: 1G
  # shortCircuitPolicy can be local or uuid,
  # local means the cache directory is in the same mount namespace,
  # uuid means interact with domain socket
  shortCircuitPolicy: "uuid"

jobWorker:
  args:
    - job-worker
  # Properties for the jobWorker component
  properties:
  resources:
    limits:
      cpu: "1"
      memory: "1G"
    requests:
      cpu: "1"
      memory: "1G"
    ports:
      rpc: 30001
      data: 30002
      web: 30003
  # JVM options specific to the jobWorker container
  jvmOptions: ""

## FUSE ##
fuse:
  image: alluxio/alluxio-fuse
  # imageTag: 2.2.1
  imageTag: 2.1.2
  imagePullPolicy: IfNotPresent
  # Change both to true to deploy FUSE
  enabled: true
  clientEnabled: true
  # Properties for the FUSE component
  properties:
  # Customize the MaxDirectMemorySize
  # These options are specific to the FUSE daemon
  # jvmOptions: " -XX:MaxDirectMemorySize=2g "
  jvmOptions: " -XX:MaxDirectMemorySize=2g "
  hostNetwork: true
  dnsPolicy: ClusterFirstWithHostNet
  user: 0
  group: 0
  fsGroup: 0
  args:
    - fuse
    - --fuse-opts=allow_other
  resources:
    requests:
      cpu: "1"
      memory: "1G"
    limits:
      cpu: "1"
      memory: "1G"

## Secrets ##
# Format: (<name>:<mount path under /secrets/>):
# secrets:
#   master: # Shared by master and jobMaster containers
#     alluxio-hdfs-config: hdfsConfig
#   worker: # Shared by worker and jobWorker containers
#     alluxio-hdfs-config: hdfsConfig
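With config.yaml in place, the chart can be deployed along these lines (a sketch assuming Helm 3 and a local checkout of the Alluxio chart; adjust the chart path and namespace to your environment):

# Deploy Alluxio with the customized values, then watch the pods land on the labeled nodes.
helm install alluxio -f config.yaml ./helm-chart/alluxio --namespace spark
kubectl -n spark get pods -o wide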

An example spark-operator application template:

[root@master1 kubernetes]# cat /home/jqwu/examples/k8s-spark-alluxio-ink8s-logaggr-nodeselector.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: k8s-spark-alluxio-nodeselector-debug-01
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: "your spark image"
  imagePullPolicy: Always
  mainClass: your app main class
  mainApplicationFile: "local:///opt/spark/jars/k8s-spark-0.0.1-SNAPSHOT.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "k8s-spark-temp"
      emptyDir: {}
  driver:
    cores: 1
    coreLimit: "2"
    memory: "2g"
    labels:
      version: 2.4.4
    serviceAccount: spark
    nodeSelector:
      "alluxio": "true"
    volumeMounts:
      - name: "k8s-spark-temp"
        mountPath: "/tmp/spark-temp"
  executor:
    cores: 2
    instances: 2
    memory: "3g"
    hostNetwork: true
    labels:
      version: 2.4.4
    nodeSelector:
      "alluxio": "true"
    volumeMounts:
      - name: "k8s-spark-temp"
        mountPath: "/tmp/spark-temp"
  sparkConf:
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.mount.path: "/opt/domain"
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.mount.readOnly: "true"
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.options.path: "/mnt/data/alluxio-k8s/tmp/alluxio-domain"
    spark.kubernetes.executor.volumes.hostPath.alluxio-domain.options.type: "Directory"
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: "alluxio://alluxio-master-0:19998;alluxio-master-1:19998;alluxio-master-2:19998/spark/spark-events"
    # debug log option
    # spark.driver.extraJavaOptions: "-Dlog4j.configuration=file:///opt/spark/jars/log4j.properties"
    # spark.executor.extraJavaOptions: "-Dlog4j.configuration=file:///opt/spark/jars/log4j.properties"
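Submitting and tracking the application is then the usual spark-operator flow:

# Submit the SparkApplication and watch its state.
kubectl apply -f /home/jqwu/examples/k8s-spark-alluxio-ink8s-logaggr-nodeselector.yaml
kubectl -n spark get sparkapplications
kubectl -n spark describe sparkapplication k8s-spark-alluxio-nodeselector-debug-01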