I have around 60GB of JSON files that I parse with Python and then insert into a MySQL database using the Python-MySQL Connector. Each JSON file is roughly 500MB.

I have been using an AWS r3.xlarge EC2 instance with a secondary volume to hold the 60GB of JSON data.

The target is an AWS RDS r3.xlarge MySQL instance. Both instances are in the same region and availability zone. The EC2 instance runs the following Python script to load the JSON, parse it, and insert it into the MySQL RDS. My Python:
import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os

os.chdir("./json_data")

for file in glob.glob("*.json"):
    with open(file, 'rU') as data_file:
        results = json.load(data_file)

    print('working on file:', file)

    cnx = mysql.connector.connect(user='', password='', host='')
    cursor = cnx.cursor(buffered=True)

    DB_NAME = 'DB'

    def create_database(cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    try:
        cnx.database = DB_NAME
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            create_database(cursor)
            cnx.database = DB_NAME
        else:
            print(err)
            exit(1)

    add_overall_data = ("INSERT INTO master"
                        "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
                        "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")

    add_polyline = ("INSERT INTO polyline"
                    "(Overview_polyline, request_no)"
                    "VALUES (%(Overview_polyline)s, %(request_no)s)")

    add_summary = ("INSERT INTO summary"
                   "(summary, request_no)"
                   "VALUES (%(summary)s, %(request_no)s)")

    add_warnings = ("INSERT INTO warnings"
                    "(warnings, request_no)"
                    "VALUES (%(warnings)s, %(request_no)s)")

    add_waypoint_order = ("INSERT INTO waypoint_order"
                          "(waypoint_order, request_no)"
                          "VALUES (%(waypoint_order)s, %(request_no)s)")

    add_leg_data = ("INSERT INTO leg_data"
                    "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)"
                    "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")

    error_messages = []

    for result in results:
        if result["status"] == "OK":
            for leg in result['routes'][0]['legs']:
                try:
                    params = {
                        "_sent_time_stamp": leg['_sent_time_stamp'],
                        "dt": leg['dt']['value'],
                        "ds": leg['ds']['value'],
                        "dtf": leg['dtf']['value'],
                        "O_l": leg['start_location']['lat'],
                        "O_ln": leg['start_location']['lng'],
                        "O_Ls": leg['O_Ls'],
                        "O_a": leg['start_address'],
                        "D_l": leg['end_location']['lat'],
                        "D_ln": leg['end_location']['lng'],
                        "d_a": leg['end_address']
                    }
                    cursor.execute(add_overall_data, params)

                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query, (O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
                except KeyError as e:
                    error_messages.append(e)
                    params = {
                        "_sent_time_stamp": leg['_sent_time_stamp'],
                        "dt": leg['dt']['value'],
                        "ds": leg['ds']['value'],
                        "dtf": "000",
                        "O_l": leg['start_location']['lat'],
                        "O_ln": leg['start_location']['lng'],
                        "O_Ls": leg['O_Ls'],
                        "O_a": 'unknown',
                        "D_l": leg['end_location']['lat'],
                        "D_ln": leg['end_location']['lng'],
                        "d_a": 'unknown'
                    }
                    cursor.execute(add_overall_data, params)

                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query, (O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]

            for overview_polyline in result['routes']:
                params = {
                    "request_no": request_no,
                    "Overview_polyline": overview_polyline['overview_polyline']['points']
                }
                cursor.execute(add_polyline, params)

                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query, (O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]

            for summary in result['routes']:
                params = {
                    "request_no": request_no,
                    "summary": summary['summary']
                }
                cursor.execute(add_summary, params)

                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query, (O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]

            for warnings in result['routes']:
                params = {
                    "request_no": request_no,
                    "warnings": str(warnings['warnings'])
                }
                cursor.execute(add_warnings, params)

                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query, (O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]

            for waypoint_order in result['routes']:
                params = {
                    "request_no": request_no,
                    "waypoint_order": str(waypoint_order['waypoint_order'])
                }
                cursor.execute(add_waypoint_order, params)

                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query, (O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]

            for steps in result['routes'][0]['legs'][0]['steps']:
                params = {
                    "request_no": request_no,
                    "leg_dt": steps['dt']['value'],
                    "leg_ds": steps['ds']['value'],
                    "leg_O_l": steps['start_location']['lat'],
                    "leg_O_ln": steps['start_location']['lng'],
                    "leg_D_l": steps['end_location']['lat'],
                    "leg_D_ln": steps['end_location']['lng'],
                    "leg_html_inst": steps['html_instructions'],
                    "leg_polyline": steps['polyline']['points'],
                    "leg_travel_mode": steps['travel_mode']
                }
                cursor.execute(add_leg_data, params)

    cnx.commit()
    print('error messages:', error_messages)
    cursor.close()
    cnx.close()
    print('finished' + file)
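As an aside on the shape of the script: each row currently costs its own execute() round trip. For comparison, here is the steps loop written with a single executemany() call per route (a sketch reusing add_leg_data, request_no, and cursor from above; mysql-connector can often fold such inserts into one multi-row INSERT statement):

leg_rows = []
for steps in result['routes'][0]['legs'][0]['steps']:
    leg_rows.append({
        "request_no": request_no,
        "leg_dt": steps['dt']['value'],
        "leg_ds": steps['ds']['value'],
        "leg_O_l": steps['start_location']['lat'],
        "leg_O_ln": steps['start_location']['lng'],
        "leg_D_l": steps['end_location']['lat'],
        "leg_D_ln": steps['end_location']['lng'],
        "leg_html_inst": steps['html_instructions'],
        "leg_polyline": steps['polyline']['points'],
        "leg_travel_mode": steps['travel_mode']
    })
if leg_rows:
    # typically far fewer round trips than one execute() per step
    cursor.executemany(add_leg_data, leg_rows)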
Using htop on the Linux instance I can see the following:

For the MySQL database, in MySQL Workbench I can see:

This Python script has been chugging away for several days now, but I have only inserted around 20% of the data into MySQL.

My questions - how can I identify the bottleneck? Is it the Python script? It seems to use only a small amount of memory - can I increase this? I have checked the InnoDB buffer pool size (per How to improve the InnoDB writes per second of MySQL DB) and found it to be large (the value below is roughly 11 GB):
SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
|               11674845184 |
+---------------------------+
Since the RDS and EC2 instances are in the same region and availability zone, I don't believe there is a network bottleneck. Pointers to where I should look for the biggest savings would be very welcome!
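One way to get a first read on where the time goes would be to time the parse phase and the insert phase separately, along these lines (a minimal sketch; insert_results is a hypothetical stand-in for the INSERT/SELECT loop in the script above):

import glob
import json
import time

parse_total = 0.0
insert_total = 0.0

for file in glob.glob("*.json"):
    t0 = time.time()
    with open(file, 'rU') as data_file:
        results = json.load(data_file)
    parse_total += time.time() - t0  # cumulative time spent in json.load

    t0 = time.time()
    insert_results(results)  # hypothetical: the INSERT/SELECT loop from the script
    insert_total += time.time() - t0  # cumulative time spent talking to MySQL

    print('parse so far: %.1fs, insert so far: %.1fs' % (parse_total, insert_total))

If the insert total dominates and grows from file to file, the database side (or the per-row round trips) is the place to look rather than the parsing.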
EDIT
I think I may have stumbled on the problem. To make the parsing more efficient, I write each level of the JSON separately. However, I then have to execute a query to match the nested parts of the JSON back to their higher level. This query has low overhead on a small database, but I've noticed that the insert speed has dropped dramatically as the database has grown: it has to search a larger and ever-growing database to correctly join up the JSON data.

I'm not sure how to solve this other than just waiting it out....
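If those repeated request_no lookups are indeed the culprit, two changes could avoid them (sketches under the assumption that master.request_no is an AUTO_INCREMENT primary key, which the script implies but never shows):

# mysql-connector exposes the id generated by the last INSERT, so the
# SELECT-after-INSERT could be dropped entirely:
cursor.execute(add_overall_data, params)
request_no = cursor.lastrowid  # assumes request_no is AUTO_INCREMENT

# Failing that, an index covering the lookup columns should keep the
# SELECT fast even as master grows (idx_master_lookup is a made-up name):
cursor.execute("CREATE INDEX idx_master_lookup "
               "ON master (O_l, O_ln, D_l, D_ln, _sent_time_stamp)")

Either way the repeated scans of an ever-growing master table would go away, so the insert rate should stop degrading as the table grows.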