我有一个大的(约1.6亿行)数据帧,我已经存储到磁盘上,如下所示:
def fillStore(store, tablename): files = glob.glob('201312*.csv') names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"] for f in files: df = pd.read_csv(f, parse_dates=True, index_col=0, names=names) store.append(tablename, df, format='table', data_columns=['c_id','f_id'])
该表有一个时间索引,我将使用c_id
和f_id
除了时间(通过索引)查询.
我有另一个包含~18000个"事件"的数据帧.每个事件都包含一些(少至数百,多达数十万)个人记录.我需要为每个事件收集一些简单的统计信息并存储它们以收集一些汇总统计信息.目前我这样做:
def makeQueryString(c, f, start, stop): return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop))) def getIncidents(inc_times, store, tablename): incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id']) for ind, row in inc_times.iterrows(): incidents = incidents.append(store.select(tablename, makeQueryString(row.c_id, row.f_id, row.start, row.stop))).fillna(ind) return incidents
这一切都很好,除了每个store.select()
语句大约需要5秒钟,这意味着处理整个月的数据需要大约24-30小时的处理时间.同时,我需要的实际统计数据相对简单:
def getIncidentStats(df): incLen = (df.index[-1]-df.index[0]).total_seconds() if incLen == 0: incLen = .1 rqsts = len(df) rqstRate_s = rqsts/incLen return pd.Series({'c_id':df.c_id[0], 'f_id':df.fqdn_id[0], 'Length_sec':incLen, 'num_rqsts':rqsts, 'rqst_rate':rqstRate_s, 'avg_resp_size':df.response_len.mean(), 'std_resp_size':df.response_len.std()}) incs = getIncidents(i_times, store, tablename) inc_groups = incs.groupby('incident_id') inc_stats = inc_groups.apply(getIncidentStats)
我的问题是:我如何才能提高这项工作流程的任何部分的绩效或效率? (请注意,我实际上批量处理大部分作业,以便一次一天地获取和存储事件,因为我想限制即使发生崩溃也会丢失已处理数据的风险.为了简单起见,我将此代码留在这里因为我实际上需要处理整个月的数据.)
有没有办法处理数据,因为我从商店收到数据,这有什么好处?我会从使用store.select_as_index中受益吗?如果我收到索引,我仍然需要访问数据才能获得正确的统计信息?
其他说明/问题:我比较了在SSD和普通硬盘上存储我的HDFStore的性能,并没有注意到SSD的任何改进.这是预期的吗?
我还想到了创建查询字符串的大型连接并一次性请求它们的想法.当总查询字符串太大(~5-10个查询)时,这会导致内存错误.
编辑1如果重要,我使用表版本3.1.0和pandas版本0.13.1
编辑2以下是一些更多信息:
ptdump -av store.h5 / (RootGroup) '' /._v_attrs (AttributeSet), 4 attributes: [CLASS := 'GROUP', PYTABLES_FORMAT_VERSION := '2.0', TITLE := '', VERSION := '1.0'] /all_recs (Group) '' /all_recs._v_attrs (AttributeSet), 14 attributes: [CLASS := 'GROUP', TITLE := '', VERSION := '1.0', data_columns := ['c_id', 'f_id'], encoding := None, index_cols := [(0, 'index')], info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}}, levels := 1, nan_rep := 'nan', non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])], pandas_type := 'frame_table', pandas_version := '0.10.1', table_type := 'appendable_frame', values_cols := ['values_block_0', 'c_id', 'f_id']] /all_recs/table (Table(161738653,)) '' description := { "index": Int64Col(shape=(), dflt=0, pos=0), "values_block_0": Int64Col(shape=(3,), dflt=0, pos=1), "c_id": Int64Col(shape=(), dflt=0, pos=2), "f_id": Int64Col(shape=(), dflt=0, pos=3)} byteorder := 'little' chunkshape := (5461,) autoindex := True colindexes := { "index": Index(6, medium, shuffle, zlib(1)).is_csi=False, "f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False, "c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False} /all_recs/table._v_attrs (AttributeSet), 19 attributes: [CLASS := 'TABLE', FIELD_0_FILL := 0, FIELD_0_NAME := 'index', FIELD_1_FILL := 0, FIELD_1_NAME := 'values_block_0', FIELD_2_FILL := 0, FIELD_2_NAME := 'c_id', FIELD_3_FILL := 0, FIELD_3_NAME := 'f_id', NROWS := 161738653, TITLE := '', VERSION := '2.6', client_id_dtype := 'int64', client_id_kind := ['c_id'], fqdn_id_dtype := 'int64', fqdn_id_kind := ['f_id'], index_kind := 'datetime64', values_block_0_dtype := 'int64', values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]
以下是主表和inc_times的示例:
In [12]: df.head() Out[12]: c_id f_id resp_id resp_len \ ts 2013-12-04 08:00:00 637092486 5372764353 30 56767543 2013-12-04 08:00:01 637092486 5399580619 23 61605423 2013-12-04 08:00:04 5456242 5385485460 21 46742687 2013-12-04 08:00:04 5456242 5385485460 21 49909681 2013-12-04 08:00:04 624791800 5373236646 14 70461449 s_id ts 2013-12-04 08:00:00 1829 2013-12-04 08:00:01 1724 2013-12-04 08:00:04 1679 2013-12-04 08:00:04 1874 2013-12-04 08:00:04 1727 [5 rows x 5 columns] In [13]: inc_times.head() Out[13]: c_id f_id start stop 0 7254 196211 1385880945000000000 1385880960000000000 1 9286 196211 1387259840000000000 1387259850000000000 2 16032 196211 1387743730000000000 1387743735000000000 3 19793 196211 1386208175000000000 1386208200000000000 4 19793 196211 1386211800000000000 1386211810000000000 [5 rows x 4 columns]
关于c_id和f_id,我想从完整商店中选择的ID集与商店中的ID总数相比相对较少.换句话说,在inc_times中有一些流行的ID,我会反复查询,同时完全忽略完整表中存在的一些ID.我估计我关心的ID大约是总ID的10%,但这些是最受欢迎的ID,因此他们的记录占据了整个ID.
我有16GB的RAM.完整存储是7.4G,完整数据集(作为csv文件)仅为8.7 GB.最初我相信我能够将整个内容加载到内存中,至少对它进行一些有限的操作,但是在加载整个内容时出现内存错误.因此,将其批处理为日常文件(完整文件包含一个月的数据).