#!/usr/bin/env python3 """ 检查新股/新债并添加到滴答清单 """ import akshare as ak import requests import json from datetime import datetime # 滴答清单配置 DIDA_TOKEN = "dp_9a8e7eccb01b44559e061dc58a669037" DIDA_API = "https://api.dida365.com/open/v1" def get_bond_ipo(): """获取可转债数据""" try: df = ak.bond_cb_jsl() bonds = [] for _, row in df.iterrows(): bonds.append({ "code": str(row['代码']), "name": str(row['转债名称']), "price": row['现价'], "premium": row['转股溢价率'] }) return bonds except Exception as e: print(f"获取可转债失败: {e}") return [] def get_stock_ipo(): """获取A股新股数据""" try: df = ak.stock_new_ipo_cninfo() ipo_list = [] for _, row in df.iterrows(): ipo_list.append({ "code": str(row['证劵代码']), "name": str(row['证券简称']), "date": str(row['申购日期']), "price": row['发行价'] }) return ipo_list except Exception as e: print(f"获取新股失败: {e}") return [] def add_to_dida365(title, due_date=None, tags=None): """添加到滴答清单""" url = f"{DIDA_API}/project/tasks" headers = { "Authorization": f"Bearer {DIDA_TOKEN}", "Content-Type": "application/json" } # 构建任务数据 task = { "title": title, "status": 2 # 2 = 待办 } if due_date: task["dueDate"] = { "startDate": due_date, "endDate": due_date, "isAllDay": True } try: response = requests.post(url, headers=headers, json=task, timeout=10) if response.status_code in [200, 201]: print(f"✅ 添加成功: {title}") return True else: print(f"❌ 添加失败 ({response.status_code}): {response.text[:100]}") return False except Exception as e: print(f"❌ 请求失败: {e}") return False def main(): print(f"\n{'='*60}") print("📋 检查新股/新债并添加到滴答清单") print(f"{'='*60}\n") today = datetime.now().strftime("%Y-%m-%d") added_count = 0 # 检查可转债 print("🔍 检查可转债...") bonds = get_bond_ipo() if bonds: print(f" 找到 {len(bonds)} 只可转债") for bond in bonds[:5]: title = f"可转债打新: {bond['name']} ({bond['code']})" if add_to_dida365(title, tags=["可转债"]): added_count += 1 else: print(" 暂无可转债数据") print() # 检查A股新股 print("🔍 检查A股新股...") ipo_list = get_stock_ipo() if ipo_list: print(f" 找到 {len(ipo_list)} 只新股") for ipo in ipo_list[:5]: if ipo['date'] and ipo['date'] >= today: title = f"A股打新: {ipo['name']} ({ipo['code']})" due_date = ipo['date'] if add_to_dida365(title, due_date=due_date, tags=["A股"]): added_count += 1 else: print(" 暂无新股数据") print(f"\n{'='*60}") print(f"✅ 完成!共添加 {added_count} 个任务到滴答清单") print(f"{'='*60}\n") if __name__ == "__main__": main()