股票系统gh_mirrors/st/stock数据库设计详解:MySQL与Pandas数据处理技巧

股票系统gh_mirrors/st/stock数据库设计详解:MySQL与Pandas数据处理技巧

股票系统gh_mirrors/st/stock数据库设计详解:MySQL与Pandas数据处理技巧

【免费下载链接】stock stock,股票系统。使用python进行开发。 项目地址: https://gitcode.***/gh_mirrors/st/stock

引言:股票数据处理的双重挑战

你是否曾面临过股票数据存储效率低下、Pandas数据处理与MySQL交互不畅的问题?本文将深入解析gh_mirrors/st/stock项目的数据库设计理念,展示如何通过MySQL与Pandas的高效协同,构建稳定可靠的股票数据处理系统。读完本文,你将掌握:

  • 股票系统核心数据库架构设计与表结构规范
  • MySQL与Pandas数据交互的最佳实践
  • 高并发股票数据写入的优化技巧
  • 数据缓存与性能提升的实现方案

一、数据库架构设计

1.1 整体架构概览

gh_mirrors/st/stock项目采用MySQL作为核心数据存储,结合Pandas进行数据处理与分析,通过分层设计实现数据的高效管理。系统架构如下:

1.2 数据库连接配置

项目通过环境变量实现数据库连接的灵活配置,核心配置如下:

# libs/***mon.py
MYSQL_HOST = os.environ.get('MYSQL_HOST') if (os.environ.get('MYSQL_HOST') != None) else "mysqldb"
MYSQL_USER = os.environ.get('MYSQL_USER') if (os.environ.get('MYSQL_USER') != None) else "root"
MYSQL_PWD = os.environ.get('MYSQL_PWD') if (os.environ.get('MYSQL_PWD') != None) else "mysqldb"
MYSQL_DB = os.environ.get('MYSQL_DB') if (os.environ.get('MYSQL_DB') != None) else "stock_data"
MYSQL_CONN_URL = "mysql+mysqldb://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":3306/" + MYSQL_DB + "?charset=utf8mb4"

这种配置方式的优势在于:

  • 开发环境与生产环境无缝切换
  • 数据库凭证安全管理
  • 多环境部署的灵活性

1.3 核心数据表设计

虽然项目中未直接提供.sql文件,但通过代码分析可推导出核心表结构设计。以下是三个关键表的结构说明:

1.3.1 股票基本信息表(stock_zh_ah_name)
字段名 类型 描述
code VARCHAR(10) 股票代码,主键
name VARCHAR(20) 股票名称
latest_price DECIMAL(10,2) 最新价格
quote_change DECIMAL(5,2) 涨跌幅(%)
ups_downs DECIMAL(10,2) 涨跌额
volume BIGINT 成交量(手)
turnover DECIMAL(15,2) 成交额(万元)
amplitude DECIMAL(5,2) 振幅(%)
high DECIMAL(10,2) 最高价
low DECIMAL(10,2) 最低价
open DECIMAL(10,2) 开盘价
closed DECIMAL(10,2) 昨收价
quantity_ratio DECIMAL(5,2) 量比
turnover_rate DECIMAL(5,2) 换手率(%)
pe_dynamic DECIMAL(10,2) 动态市盈率
pb DECIMAL(5,2) 市净率
date INT 日期(YYYYMMDD),主键

该表通过datecode组合作为主键,确保数据唯一性:

# jobs/18h_daily_job.py
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
1.3.2 龙虎榜个股上榜统计表(stock_sina_lhb_ggtj)
字段名 类型 描述
code VARCHAR(10) 股票代码,主键
name VARCHAR(20) 股票名称
ranking_times INT 上榜次数
sum_buy DECIMAL(15,2) 买入金额(万元)
sum_sell DECIMAL(15,2) 卖出金额(万元)
***_amount DECIMAL(15,2) 净额(万元)
buy_seat VARCHAR(255) 买入营业部
sell_seat VARCHAR(255) 卖出营业部
date INT 日期(YYYYMMDD),主键
1.3.3 大宗交易每日统计表(stock_dzjy_mrtj)
字段名 类型 描述
code VARCHAR(10) 股票代码,主键
name VARCHAR(20) 股票名称
quote_change DECIMAL(5,2) 涨跌幅(%)
close_price DECIMAL(10,2) 收盘价
average_price DECIMAL(10,2) 成交均价
overflow_rate DECIMAL(5,4) 折溢率(%)
trade_number INT 成交笔数
sum_volume BIGINT 成交数量(股)
sum_turnover DECIMAL(15,2) 成交金额(万元)
turnover_market_rate DECIMAL(7,6) 成交额/流通市值(%)
date INT 日期(YYYYMMDD),主键

1.4 数据库初始化流程

项目通过basic_job.py实现数据库的自动创建与初始化:

# jobs/basic_job.py
def create_new_database():
    with MySQLdb.connect(***mon.MYSQL_HOST, ***mon.MYSQL_USER, ***mon.MYSQL_PWD, "mysql", charset="utf8") as db:
        try:
            create_sql = " CREATE DATABASE IF NOT EXISTS %s CHARACTER SET utf8 COLLATE utf8_general_ci " % ***mon.MYSQL_DB
            print(create_sql)
            db.auto***mit(on=True)
            db.cursor().execute(create_sql)
        except Exception as e:
            print("error CREATE DATABASE :", e)

初始化检查流程:

  1. 尝试连接目标数据库
  2. 连接失败时自动创建数据库
  3. 设置字符集为UTF-8确保中文支持

二、MySQL与Pandas数据交互实现

2.1 核心交互模块设计

项目在libs/***mon.py中封装了完整的MySQL与Pandas交互接口,形成数据访问层:

2.2 数据写入实现机制

数据写入采用SQLAlchemy引擎,结合Pandas的to_sql方法实现高效数据插入:

# libs/***mon.py
def insert_other_db(to_db, data, table_name, write_index, primary_keys):
    # 定义engine
    engine_mysql = engine_to_db(to_db)
    # 使用检查检查数据库表是否有主键
    insp = inspect(engine_mysql)
    col_name_list = data.columns.tolist()
    # 如果有索引,把索引增加到varchar上面
    if write_index:
        # 插入到第一个位置
        col_name_list.insert(0, data.index.name)
    print(col_name_list)
    data.to_sql(name=table_name, con=engine_mysql, schema=to_db, if_exists='append',
                dtype={col_name: NVARCHAR(length=255) for col_name in col_name_list}, index=write_index)
    
    # 自动添加主键约束
    if insp.get_pk_constraint(table_name)['constrained_columns'] == []:
        with engine_mysql.connect() as con:
            try:
                con.execute('ALTER TABLE `%s` ADD PRIMARY KEY (%s);' % (table_name, primary_keys))
            except Exception as e:
                print("################## ADD PRIMARY KEY ERROR :", e)

写入流程特点:

  • 自动处理DataFrame列类型映射
  • 支持索引写入与主键约束自动创建
  • 统一设置字符串字段长度避免截断
  • 采用append模式支持增量数据更新

2.3 数据写入前处理

在数据写入前,项目对AKShare返回的原始数据进行标准化处理:

# jobs/18h_daily_job.py
data = ak.stock_zh_a_spot_em()
data.columns = ['index', 'code', 'name', 'latest_price', 'quote_change', 'ups_downs', 'volume', 'turnover',
                'amplitude', 'high', 'low', 'open', 'closed', 'quantity_ratio', 'turnover_rate', 'pe_dynamic',
                'pb']

# 数据过滤
data = data.loc[data["code"].apply(stock_a)].loc[data["name"].apply(stock_a_filter_st)].loc[
    data["latest_price"].apply(stock_a_filter_price)]
print(data)
data['date'] = datetime_int  # 修改时间成为int类型

# 删除老数据
del_sql = " DELETE FROM `stock_zh_ah_name` where `date` = '%s' " % datetime_int
***mon.insert(del_sql)

data.set_index('code', inplace=True)
data.drop('index', axis=1, inplace=True)
print(data)
# 写入数据库
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")

数据处理流程:

  1. 标准化列名便于后续处理
  2. 过滤非目标股票数据(仅保留A股)
  3. 过滤ST股票和无价格数据
  4. 添加日期标识字段
  5. 删除指定日期的历史数据避免重复
  6. 设置股票代码为索引并写入数据库

2.4 数据查询操作实现

查询操作封装了基础的SQL执行与结果返回:

# libs/***mon.py
def select(sql, params=()):
    with conn() as db:
        print("select sql:" + sql)
        try:
            db.execute(sql, params)
        except Exception as e:
            print("error :", e)
        result = db.fetchall()
        return result
        
def select_count(sql, params=()):
    with conn() as db:
        print("select sql:" + sql)
        try:
            db.execute(sql, params)
        except Exception as e:
            print("error :", e)
        result = db.fetchall()
        # 只有一个数组中的第一个数据
        if len(result) == 1:
            return int(result[0][0])
        else:
            return 0

三、数据处理优化策略

3.1 数据缓存机制

为提高重复数据访问性能,项目实现了基于文件系统的缓存系统:

# libs/***mon.py
bash_stock_tmp = "/data/cache/hist_data_cache/%s/%s/"

def get_hist_data_cache(code, date_start, date_end):
    cache_dir = bash_stock_tmp % (date_end[0:7], date_end)
    # 如果没有文件夹创建一个。月文件夹和日文件夹。方便删除。
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)
    cache_file = cache_dir + "%s^%s.gzip.pickle" % (date_end, code)
    # 如果缓存存在就直接返回缓存数据。压缩方式。
    if os.path.isfile(cache_file):
        print("######### read from cache #########", cache_file)
        return pd.read_pickle(cache_file, ***pression="gzip")
    else:
        print("######### get data, write cache #########", code, date_start, date_end)
        stock = ak.stock_zh_a_hist(symbol= code, start_date=date_start, end_date=date_end, adjust="")
        stock.columns = ['date', 'open', 'close', 'high', 'low', 'volume', 'amount', 'amplitude', 'quote_change',
                         'ups_downs', 'turnover']
        if stock is None:
            return None
        stock = stock.sort_index(0)  # 将数据按照日期排序下
        print(stock)
        stock.to_pickle(cache_file, ***pression="gzip")
        return stock

缓存系统特点:

  • 按年月层级组织缓存目录
  • 使用gzip压缩节省存储空间
  • 基于日期和股票代码的缓存键设计
  • 自动缓存失效机制(按日期目录)

3.2 数据重跑与去重策略

为确保数据准确性,项目实现了数据重跑机制,通过先删除后插入的方式保证数据一致性:

# jobs/18h_daily_job.py
# 删除老数据
del_sql = " DELETE FROM `stock_zh_ah_name` where `date` = '%s' " % datetime_int
***mon.insert(del_sql)

# 插入新数据
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")

这种策略的优势:

  • 保证每日数据唯一性
  • 支持历史数据重新生成
  • 避免重复数据积累

3.3 数据类型处理优化

针对股票数据特点,项目对数值类型进行了特殊处理:

# jobs/18h_daily_job.py
# 数据保留指定小数位数
stock_dzjy_mrtj["average_price"] = stock_dzjy_mrtj["average_price"].round(2)
stock_dzjy_mrtj["overflow_rate"] = stock_dzjy_mrtj["overflow_rate"].round(4)
stock_dzjy_mrtj["turnover_market_rate"] = stock_dzjy_mrtj["turnover_market_rate"].round(6)

数据类型优化:

  • 价格类数据保留2位小数
  • 比率类数据保留4-6位小数
  • 避免浮点精度问题影响分析结果

四、典型应用场景实现

4.1 股票数据每日更新

项目通过定时任务实现股票数据的每日更新,核心逻辑在18h_daily_job.py中实现:

# jobs/18h_daily_job.py
def stat_all(tmp_datetime):
    datetime_str = (tmp_datetime).strftime("%Y-%m-%d")
    datetime_int = (tmp_datetime).strftime("%Y%m%d")
    print("datetime_str:", datetime_str)
    print("datetime_int:", datetime_int)
    
    # 股票列表数据采集与处理
    try:
        data = ak.stock_zh_a_spot_em()
        # 数据处理与清洗
        # ...
        ***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
    except Exception as e:
        print("error :", e)
    
    # 龙虎榜数据采集与处理
    try:
        stock_sina_lhb_ggtj = ak.stock_sina_lhb_ggtj(recent_day="5")
        # 数据处理与清洗
        # ...
        ***mon.insert_db(stock_sina_lhb_ggtj, "stock_sina_lhb_ggtj", True, "`date`,`code`")
    except Exception as e:
        print("error :", e)
    
    # 大宗交易数据采集与处理
    try:
        stock_dzjy_mrtj = ak.stock_dzjy_mrtj(start_date=datetime_str, end_date=datetime_str)
        # 数据处理与清洗
        # ...
        ***mon.insert_db(stock_dzjy_mrtj, "stock_dzjy_mrtj", True, "`date`,`code`")
    except Exception as e:
        print("error :", e)

每日更新流程:

  1. 获取指定日期的股票数据
  2. 多数据源并行处理(股票列表、龙虎榜、大宗交易)
  3. 数据清洗与标准化
  4. 数据写入数据库
  5. 异常捕获与错误处理

4.2 股票数据过滤实现

针对股票数据的特殊性,项目实现了多维度数据过滤:

# jobs/18h_daily_job.py
# 过滤A股股票
def stock_a(code):
    if code.startswith('600') or code.startswith('6006') or code.startswith('601') or code.startswith('000') or code.startswith('001') or code.startswith('002'):
        return True
    else:
        return False

# 过滤ST股票
def stock_a_filter_st(name):
    if name.find("ST") == -1:
        return True
    else:
        return False

# 过滤无价格数据
def stock_a_filter_price(latest_price):
    # float 在 pandas 里面判断空值
    if np.isnan(latest_price):
        return False
    else:
        return True

# 应用过滤条件
data = data.loc[data["code"].apply(stock_a)].loc[data["name"].apply(stock_a_filter_st)].loc[
    data["latest_price"].apply(stock_a_filter_price)]

过滤链的优势:

  • 清晰分离不同过滤规则
  • 便于单独测试和调整
  • 支持链式组合使用
  • 提高代码可读性和维护性

五、性能优化与最佳实践

5.1 数据库性能优化措施

项目采用多种策略优化数据库性能:

  1. 合理的索引设计

    • 所有表均设置主键约束
    • 日期+股票代码的复合主键设计
    • 自动主键创建确保数据唯一性
  2. 批量操作优化

    • 使用Pandas的to_sql批量插入
    • 避免单条记录循环插入
    • 批量删除历史数据
  3. 连接池管理

    • 使用SQLAlchemy引擎管理连接
    • 避免频繁创建和销毁连接

5.2 Pandas数据处理最佳实践

  1. 数据类型优化

    # 明确指定数据类型减少内存占用
    data['date'] = data['date'].astype('int32')
    data['volume'] = data['volume'].astype('int64')
    
  2. 索引优化

    # 设置股票代码为索引加速查询
    data.set_index('code', inplace=True)
    
  3. 链式操作替代中间变量

    # 直接链式处理减少内存占用
    data = data.loc[data["code"].apply(stock_a)].drop('index', axis=1)
    

5.3 错误处理与容错机制

项目实现了完善的错误处理机制:

# libs/***mon.py
def run_with_args(run_fun):
    try:
        # 业务逻辑执行
        run_fun(tmp_datetime_show)
    except Exception as e:
        print("error :", e)
        traceback.print_exc()

错误处理策略:

  • 异常捕获与堆栈打印
  • 错误继续执行机制
  • 关键操作日志记录
  • 数据处理失败提示

六、总结与展望

gh_mirrors/st/stock项目通过精心设计的数据库架构和高效的数据处理流程,实现了股票市场数据的可靠存储与分析。项目的核心优势在于:

  1. 模块化设计:数据访问层与业务逻辑分离,便于维护和扩展
  2. 自动化机制:数据库自动创建、表结构自动生成、主键自动设置
  3. 性能优化:数据缓存、批量操作、索引优化等多重优化策略
  4. 健壮性保障:完善的错误处理、数据去重和重跑机制

未来优化方向:

  1. 实现数据库表结构的版本控制
  2. 添加数据完整性校验机制
  3. 引入时序数据库优化历史数据存储
  4. 实现数据同步的增量更新机制

通过本文介绍的数据库设计与数据处理技巧,你可以构建高效、可靠的股票数据处理系统,为量化分析和投资决策提供坚实的数据基础。

如果你觉得本文对你有帮助,请点赞、收藏并关注项目更新,下期我们将深入探讨股票技术指标的计算与应用实现。

【免费下载链接】stock stock,股票系统。使用python进行开发。 项目地址: https://gitcode.***/gh_mirrors/st/stock

转载请说明出处内容投诉
CSS教程网 » 股票系统gh_mirrors/st/stock数据库设计详解:MySQL与Pandas数据处理技巧

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买