Search

Spark Operator

# Shell variables reused by the later eksctl step.
# ACCOUNTID is resolved from the credentials currently active in the AWS CLI.
EKS_CLUSTER_NAME=skills-eks-cluster
AWS_REGION=ap-northeast-2
ACCOUNTID=$(aws sts get-caller-identity --query "Account" --output text)
Shell
복사
# Log Helm in to the ECR registry that hosts the spark-operator chart by
# piping a short-lived ECR password into `helm registry login`.
# NOTE(review): the registry account ID below is hard-coded and is not
# $ACCOUNTID; presumably it is the AWS-managed EMR registry for
# ap-northeast-2 -- confirm before changing regions.
aws ecr get-login-password \
  --region ap-northeast-2 \
  | helm registry login \
      --username AWS \
      --password-stdin 996579266876.dkr.ecr.ap-northeast-2.amazonaws.com
Shell
복사
# Install the spark-operator chart (release name: spark-operator-demo) from
# the OCI registry into the spark-operator namespace, creating the namespace
# if it does not exist yet.
helm install spark-operator-demo \
  oci://996579266876.dkr.ecr.ap-northeast-2.amazonaws.com/spark-operator \
  --set emrContainers.awsRegion=ap-northeast-2 \
  --version 7.8.0 \
  --namespace spark-operator \
  --create-namespace
Shell
복사
# Show the Helm releases in the spark-operator namespace, rendered as YAML.
helm list -n spark-operator --output yaml
Shell
복사
# Write the IAM policy document used by the Spark job execution role.
# The heredoc delimiter is quoted so the JSON is written verbatim, with no
# shell expansion applied to its contents.
cat << 'EOF' > spark-operator-job-execution-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "acm:DescribeCertificate",
        "ec2:AuthorizeSecurityGroupEgress",
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:CreateSecurityGroup",
        "ec2:DeleteSecurityGroup",
        "ec2:RevokeSecurityGroupEgress",
        "ec2:RevokeSecurityGroupIngress",
        "glue:AlterPartitions",
        "glue:BatchCreatePartition",
        "glue:CreateDatabase",
        "glue:CreateTable",
        "glue:DeletePartition",
        "glue:DeleteTable",
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:GetTable",
        "glue:GetUserDefinedFunctions",
        "glue:ListSchemas",
        "glue:UpdateTable",
        "s3:DeleteObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": "*",
      "Effect": "Allow"
    },
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*",
      "Effect": "Allow"
    }
  ]
}
EOF
Shell
복사
# Create the IAM policy from the local JSON policy document.
aws iam create-policy \
  --policy-name spark-operator-emr-job-execution-policy \
  --policy-document file://spark-operator-job-execution-policy.json
Shell
복사
# Create the Kubernetes service account in the spark-operator namespace and
# bind it to a new IAM role that carries the job execution policy.
# Expects EKS_CLUSTER_NAME, AWS_REGION and ACCOUNTID to be set in this shell;
# expansions are quoted so an empty or odd value fails visibly instead of
# silently shifting arguments.
eksctl create iamserviceaccount \
  --cluster="$EKS_CLUSTER_NAME" \
  --region "$AWS_REGION" \
  --name=spark-operator-emr-job-execution-sa \
  --attach-policy-arn="arn:aws:iam::${ACCOUNTID}:policy/spark-operator-emr-job-execution-policy" \
  --role-name=spark-operator-emr-job-execution-irsa \
  --namespace=spark-operator \
  --approve
Shell
복사
---
# Namespaced Role and RoleBinding for the Spark job service account.
# Save this manifest as emr-job-execution-rbac.yaml before applying it.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-operator-emr-job-execution-role
  namespace: spark-operator
rules:
  - apiGroups: ["", "batch", "extensions"]
    resources:
      - configmaps
      - serviceaccounts
      - events
      - pods
      - pods/exec
      - pods/log
      - pods/portforward
      - secrets
      - services
      - persistentvolumeclaims
    verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
---
# Grants the Role above to the spark-operator-emr-job-execution-sa account.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-operator-emr-job-execution-rb
  namespace: spark-operator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: spark-operator-emr-job-execution-role
subjects:
  - kind: ServiceAccount
    name: spark-operator-emr-job-execution-sa
    namespace: spark-operator
YAML
복사
# Apply the Role and RoleBinding manifest to the cluster.
kubectl apply --filename emr-job-execution-rbac.yaml
Shell
복사
# Create the S3 bucket that holds the job script and the Spark event logs.
aws s3 mb 's3://skills-data-analytics-bucket'
Shell
복사
# Create the two empty "folder" keys used later: one for Spark event logs and
# one for job scripts. These are two separate commands and must each run on
# their own line.
aws s3api put-object \
  --bucket skills-data-analytics-bucket \
  --key logs/spark-operator/
aws s3api put-object \
  --bucket skills-data-analytics-bucket \
  --key emr-eks/scripts/
Shell
복사
# Write the PySpark job script to pi.py.
# NOTE(review): the heredoc body is empty here, so this creates an empty
# pi.py -- the script contents appear to be missing; paste them between the
# two EOF lines. The delimiter is quoted so Python code containing `$` or
# backticks is written verbatim.
cat << 'EOF' > pi.py
EOF
Python
복사
# Upload the job script to the key referenced by the SparkApplication
# manifest (mainApplicationFile).
aws s3 cp \
  pi.py \
  s3://skills-data-analytics-bucket/emr-eks/scripts/pi.py
Shell
복사
---
# SparkApplication that runs pi.py from S3 on the EMR runtime image.
# Save this manifest as spark-pi.yaml before applying it.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  # EMR optimized runtime image
  image: "public.ecr.aws/emr-on-eks/spark/emr-6.10.0:latest"
  imagePullPolicy: Always
  # NOTE(review): mainClass is set although type is Python; it looks like a
  # leftover from a JVM example -- confirm whether it is needed.
  mainClass: ValueZones
  mainApplicationFile: s3://skills-data-analytics-bucket/emr-eks/scripts/pi.py
  hadoopConf:
    # EMRFS filesystem config
    fs.s3.customAWSCredentialsProvider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem
    fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3.EMRFSDelegate
    fs.s3.buffer.dir: /mnt/s3
    fs.s3.getObject.initialSocketTimeoutMilliseconds: "2000"
    mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem: "2"
    mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem: "true"
  sparkConf:
    # Logging location
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: "s3://skills-data-analytics-bucket/logs/spark-operator/"
    # Required for EMR Runtime
    spark.driver.extraClassPath: "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*"
    spark.driver.extraLibraryPath: "/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native"
    spark.executor.extraClassPath: "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*"
    spark.executor.extraLibraryPath: "/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native"
    # EMRFS committer
    spark.sql.parquet.output.committer.class: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
    spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
    spark.sql.emr.internal.extensions: com.amazonaws.emr.spark.EmrSparkSessionExtensions
    # Quoted because the values contain spaces, colons and single quotes.
    spark.executor.defaultJavaOptions: "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'"
    spark.driver.defaultJavaOptions: "-XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70"
  sparkVersion: "3.3.1"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "2g"
    serviceAccount: spark-operator-emr-job-execution-sa
  executor:
    cores: 2
    instances: 2
    memory: "2g"
    serviceAccount: spark-operator-emr-job-execution-sa
YAML
복사
# Apply the SparkApplication manifest to the cluster.
kubectl apply --filename spark-pi.yaml
Shell
복사
# Show the details of the spark-pi SparkApplication in the spark-operator
# namespace.
kubectl describe sparkapplication spark-pi -n spark-operator
Shell
복사