This code will not compile as-is, but it illustrates the approach for the given use case.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("App name").setMaster("yarn")
val ssc = new StreamingContext(conf, Seconds(1))
val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

dstream.foreachRDD { rdd =>
  // Write the rdd to HDFS directly
  rdd.saveAsTextFile("hdfs/location/to/save")

  // Loop through each partition in the rdd
  rdd.foreachPartition { partitionOfRecords =>
    // 1. Create the HttpClient object here
    // 2.a POST the data to the API once per partition

    // Use this if you want record-level control within the partition
    partitionOfRecords.foreach { record =>
      // 2.b POST the data to the API once per record
      record.toString
    }
  }
  // Use 2.a or 2.b to POST data as per your requirements
}

ssc.start()
ssc.awaitTermination()
Most HttpClients (used for REST calls) support a request timeout.
Example of an HTTP POST call with timeouts using Apache HttpClient:
import org.apache.http.HttpResponse
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.entity.EntityBuilder
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}

val CONNECTION_TIMEOUT_MS = 20000 // Timeout in millis (20 sec)

val requestConfig = RequestConfig.custom()
  .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
  .setConnectTimeout(CONNECTION_TIMEOUT_MS)
  .setSocketTimeout(CONNECTION_TIMEOUT_MS)
  .build()

val client: CloseableHttpClient = HttpClientBuilder.create().build()

val url = "https://selfsolve.apple.com/wcResults.do"
val post = new HttpPost(url)

// Apply the timeout config to this request
post.setConfig(requestConfig)
post.setEntity(EntityBuilder.create.setText("some text to post to API").build())

val response: HttpResponse = client.execute(post)
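Putting the two snippets together, here is a minimal sketch of how the timeout-configured client could be used inside foreachPartition. The API URL is a placeholder and the record-to-body conversion is an assumption, not part of the original answer; it also assumes the imports shown above plus org.apache.http.util.EntityUtils.

import org.apache.http.util.EntityUtils

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Build one client per partition so no non-serializable object
    // is captured in the closure shipped to the executors
    val requestConfig = RequestConfig.custom()
      .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
      .setConnectTimeout(CONNECTION_TIMEOUT_MS)
      .setSocketTimeout(CONNECTION_TIMEOUT_MS)
      .build()
    val client = HttpClientBuilder.create()
      .setDefaultRequestConfig(requestConfig)
      .build()

    try {
      partitionOfRecords.foreach { record =>
        val post = new HttpPost("https://example.com/api/endpoint") // placeholder URL
        post.setEntity(EntityBuilder.create.setText(record.toString).build())
        val response = client.execute(post)
        EntityUtils.consume(response.getEntity) // release the connection for reuse
      }
    } finally {
      client.close()
    }
  }
}

Creating the client once per partition (rather than per record) keeps connection setup overhead low while still avoiding serialization issues on the driver.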