我是R背景的Spark和Scala的新手,在对RDD进行了一些转换之后,我得到了RDD类型
Description: RDD[(String, Int)]
现在,我想在字符串RDD上应用正则表达式,并从字符串中提取子字符串,然后仅在新列中添加子字符串。
输入数据 :
BMW 1er Model,278 MINI Cooper Model,248
我正在寻找的输出:
Input | Brand | Series BMW 1er Model,278, BMW , 1er MINI Cooper Model ,248 MINI , Cooper
其中Brand和Series是来自字符串RDD的新计算的子字符串
到目前为止我所做的。
我可以使用正则表达式为String实现此功能,但是我可以将其应用于所有行。
val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r //to look for BMW or MINI
那我可以用
brandRegEx.findFirstIn("hello this mini is bmW testing")
但是,如何将它用于RDD的所有行并应用不同的正则表达式来实现上述输出。
我阅读了有关此代码段的信息,但不确定如何将其完全放在一起。
val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r def getBrand(Col4: String) : String = Col4 match { case brandRegEx(str) => case _ => "" return 'substring }
任何帮助,将不胜感激 !
谢谢
要将正则表达式应用于RDD中的每一项,您应使用RDD map
函数,该函数使用某些函数(在本例中为Partial Function来转换RDD中的每一行,以便提取到组成每个元组的两部分)行):
import org.apache.spark.{SparkContext, SparkConf} object Example extends App { val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Example")) val data = Seq( ("BMW 1er Model",278), ("MINI Cooper Model",248)) val dataRDD = sc.parallelize(data) val processedRDD = dataRDD.map{ case (inString, inInt) => val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r val brand = brandRegEx.findFirstIn(inString) //val seriesRegEx = ... //val series = seriesRegEx.findFirstIn(inString) val series = "foo" (inString, inInt, brand, series) } processedRDD.collect().foreach(println) sc.stop() }
请注意,我认为您的正则表达式存在一些问题,并且还需要一个正则表达式来查找序列。此代码输出:
(BMW 1er Model,278,BMW,foo) (MINI Cooper Model,248,NOT FOUND,foo)
但是,如果您根据需要更正了正则表达式,这就是将它们应用于每行的方法。