2021-01-22

    技术2022-07-11  90

    2400万数据11个字段

    内存12G笔记本测试

    存入数据耗时: 89 秒 读取数据耗时: 148 秒

    内存16G台式机测试

    存入数据耗时: 52 秒 读取数据耗时: 42 秒 耗时与当时电脑的可用内存大小相关

    # -*- coding: utf-8 -*- """ 千万级数据管道存储 思路: 1.压缩 -> 减小内存空间大小: 1.pandas 的DataFrame被设计成可以适应内存, 让系统更好的运行起来 2.将 DataFrame 压缩成字节流 2.分块 -> 大数据切成小份存储: 通过判断数据占用内存的大小来进行切割 """ import time import redis import pandas as pd class DataFrameRedis: """DataFrame与Redis的存储与读取""" def __init__(self, redis_host: str = None, password: str = None): """ :param redis_host: IP地址 :param password: 密码 """ self.redis_host = redis_host self.password = password def df_to_redis(self, df: pd.DataFrame, key_name: str) -> None: """ :param df: pandas 的 DataFrame :param key_name: Redis键名 """ if self.password: # 建立Redis连接 rs = redis.StrictRedis(host=self.redis_host, password=self.password) else: rs = redis.StrictRedis(host=self.redis_host) storage_size = (df.memory_usage().sum() / (1024 ** 2)) + 1 # 查看数据占用的内存消耗 # 计算切割大小 if storage_size > 512: # 如果大于512MB cutting_n = int(storage_size // 512 + 2) # 切割成多少份 df_row = df.shape[0] // cutting_n # df.shape[0]:数据的行数 df_row_start = 0 # 切割的起始行数 # 循环切割入库 for i in range(cutting_n): # 循环切割的份数 df_row_end = df_row_start + df_row # 找到结束的行数 df_cutting = df.iloc[df_row_start: df_row_end, :] # 切割DataFrame df_row_start = df_row_end # 开始的值等于结束的值 df_bytes = df_cutting.to_msgpack() # DataFrame压缩成字节流 rs.set(key_name + ':' + str(i), df_bytes) # 存入Redis df_cutting = df.iloc[df_row_end:, :] # 获取最后一部分数据 df_bytes = df_cutting.to_msgpack() # DataFrame压缩成字节流 rs.set(key_name + ':' + str(i+1), df_bytes) # 存入Redis else: df_bytes = df.to_msgpack() rs.set(key_name, df_bytes) def get_redis_df(self, key_name: str) -> pd.DataFrame: """ :param key_name: 数据库名 :return: DataFrame """ if self.password: # 建立Redis连接 rs = redis.StrictRedis(host=self.redis_host, password=self.password) else: rs = redis.StrictRedis(host=self.redis_host) keys = rs.keys() # 获取所有Redis键名 redis_df_all: pd.DataFrame = pd.DataFrame(columns=[]) # 创建空DataFrame for key in keys: key = str(key, 'utf-8') # 转str utf-8 if key_name in str(key): # 如果key_name在键名里 df_bytes_from_redis = rs.get(key) # 读取数据 df_from_redis = pd.read_msgpack(df_bytes_from_redis) # 字节流转DataFrame redis_df_all = pd.concat( # 添加到变量redis_df_all中 [redis_df_all, df_from_redis], ignore_index=True) return redis_df_all def main(): """主函数""" d = DataFrameRedis('127.0.0.1') # d = DataFrameRedis('127.0.0.1', 'password') # 读取csv文件 df=pd.read_csv('data.csv', chunksize=100000) # chunksize 表示分十万行一个批次读取 start_time = time.time() df = pd.read_csv('data.csv', encoding="gbk") end__time = time.time() print('读取文件耗时耗时:', end__time - start_time, '秒') # 存入数据 start_time = time.time() d.df_to_redis(df, 'ReadRedisDataFrame') end__time = time.time() print('存入数据耗时:', end__time - start_time, '秒') # 读取数据 start_time = time.time() df_get = d.get_redis_df('ReadRedisDataFrame') end__time = time.time() print('读取数据耗时:', end__time - start_time, '秒') print(df_get) if __name__ == '__main__': main()
    Processed: 0.010, SQL: 9