For checkpointing purposes, I am trying to use an Amazon S3 bucket to hold the checkpoint files:
val checkpointDir = "s3a://bucket-name/checkpoint.txt"
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.default.parallelism", "30")
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "bucket-name.s3-website.eu-central-1.amazonaws.com")

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint(checkpointDir)
But it stops with this exception:
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 9D8E8002H3BBDDC7, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Qme5E3KAr/KX0djiq9poGXPJkmr0vuXAduZujwGlvaAl+oc6vlUpq7LIh70IF3LNgoewjP+HnXA=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:232)
    at com.misterbell.shiva.StreamingApp$.main(StreamingApp.scala:89)
    at com.misterbell.shiva.StreamingApp.main(StreamingApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I don't understand why I get this error, and I can't find any example of this.
This message corresponds to something like "bad endpoint" or a wrong signature version.
As you can see here, Frankfurt (eu-central-1) is the only region that does not support Signature Version 2, and that is the region I had picked.
Of course, after all my research I still can't say exactly which signature version is being used; it's not obvious from the documentation. But V2 seems to be what works with s3a.
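(If you really have to stay on a V4-only region such as eu-central-1, one workaround I have seen suggested, but have not verified here, is to force the AWS Java SDK v1 used by hadoop-aws to sign with V4 and to point fs.s3a.endpoint at that region's REST endpoint. A rough, untested sketch:

// Untested sketch: force Signature Version 4 in the AWS Java SDK v1.
// Whether this takes effect depends on the hadoop-aws / aws-java-sdk versions on the classpath.
System.setProperty("com.amazonaws.services.s3.enableV4", "true") // driver JVM
val conf = new SparkConf()
  .setAppName("streaming-checkpoint-s3")
  .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true") // executor JVMs
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com") // Frankfurt REST endpoint

Switching to a region that still accepts V2 avoids all of this, which is what I describe below.)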
The endpoint you see in the S3 console is not the real endpoint; it is only the static website hosting endpoint.
You have to use one of these endpoints instead, like:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")
By default, however, it works against the US endpoint.
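Putting the pieces together, here is a minimal sketch of a configuration along these lines, assuming the bucket lives in eu-west-1 (the bucket name and credentials are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-checkpoint-s3")
val sc = new SparkContext(conf)

sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
// Regional REST endpoint, not the bucket's website endpoint.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")

val ssc = new StreamingContext(sc, Seconds(10))
// Spark writes its checkpoint files under this directory.
ssc.checkpoint("s3a://bucket-name/checkpoint")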