I am unable to read and parse the file created by streaming Twitter data through the Flume Twitter agent, using either Java or the Avro tools. My requirement is to convert the Avro data into JSON format.
With either approach I get this exception:

org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
I am running a vanilla Hadoop configuration on a pseudo-distributed (single-node) cluster; the Hadoop version is 2.7.1 and the Flume version is 1.6.0.
The Flume configuration file for the Twitter agent and the Java code that parses the Avro file are below:
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS

TwitterAgent.sources.Twitter.type=org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=xxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret=xxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken=xxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret=xxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.keywords=Modi,PMO,Narendra Modi,BJP

TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost:9000/user/ashish/Twitter_Data
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeformat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=100
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10
TwitterAgent.sinks.HDFS.hdfs.rollInterval=30

TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100
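One detail worth double-checking (an observation, not a confirmed fix for the exception): the Flume user guide spells the HDFS sink property `hdfs.writeFormat` with a capital F. A property with the wrong case is silently ignored, so the sink would fall back to its default. The documented form would be:

```properties
# Property name as documented for the Flume HDFS sink
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
```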
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class AvroReader {
    public static void main(String[] args) throws IOException {
        Path path = new Path("hdfs://localhost:9000/user/ashish/Twitter_Data/FlumeData.1449656815028");
        Configuration config = new Configuration();
        SeekableInput input = new FsInput(path, config);
        // Generic type parameters restored; the original snippet had them
        // stripped ("DatumReaderreader"), which does not compile.
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
        for (GenericRecord datum : fileReader) {
            System.out.println("value = " + datum);
        }
        fileReader.close();
    }
}
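For the stated requirement of producing JSON, a sketch of one way to do it with the Avro library is below: read each record with the schema embedded in the container file and re-encode it through a `JsonEncoder`. This assumes a local, non-corrupted copy of the Flume output (the file name is a placeholder), and it is essentially what the `avro-tools tojson` command does.

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

import java.io.File;
import java.io.IOException;

public class AvroToJson {
    public static void main(String[] args) throws IOException {
        // Placeholder path: a local copy of the Flume-produced Avro file
        File file = new File("FlumeData.avro");
        DataFileReader<GenericRecord> reader =
                new DataFileReader<>(file, new GenericDatumReader<GenericRecord>());

        // The writer's schema is embedded in the Avro container file header
        Schema schema = reader.getSchema();
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, System.out);

        while (reader.hasNext()) {
            // Re-encode each record as JSON onto stdout
            writer.write(reader.next(), encoder);
        }
        encoder.flush();
        reader.close();
    }
}
```

Note that this only works on an intact file; if the container's block metadata is corrupted (as the "Block size invalid" error suggests here), any reader will fail at the damaged block.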
The exception stack trace I get is:
2015-12-09 17:48:19,291 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
value = {"id": "674535686809120768", "user_friends_count": 1260, "user_location": "????", "user_description": "???????????????bot???????????????????????????????????????????????????", "user_statuses_count": 47762, "user_followers_count": 1153, "user_name": "??", "user_screen_name": "zazie_bot", "created_at": "2015-12-09T15:56:54Z", "text": "@ill_akane_bot ???????\u2026??????????\u2026", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 204695477, "source": "twittbot.net<\/a>", "in_reply_to_status_id": 674535430423887872, "media_url_https": null, "expanded_url": null}
Exception in thread "main" org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
    at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:275)
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
    at avro.AvroReader.main(AvroReader.java:24)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.IOException: Block size invalid or too large for this implementation: -40
    at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:266)
    ... 7 more

Note that the first record is read and printed successfully; the exception is thrown when the reader advances to the next block.
Do I also need to provide an Avro schema for the file to be read correctly, and if so, how?
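On the schema question: as far as I understand, Avro container files store the writer's schema in the file header, so `GenericDatumReader` does not need an external schema file. A quick way to confirm this is to print the embedded schema (file name is a placeholder):

```java
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class PrintEmbeddedSchema {
    public static void main(String[] args) throws IOException {
        // Placeholder: local copy of the Flume output file
        DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new File("FlumeData.avro"), new GenericDatumReader<GenericRecord>());
        // Pretty-print the schema embedded in the container header
        System.out.println(reader.getSchema().toString(true));
        reader.close();
    }
}
```

Since the first record in the trace above does decode correctly, the schema is evidently being found; the failure looks like block-level corruption rather than a missing schema.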