Files
server-configs/check_ipo_and_add_to_dida.py
2026-02-13 22:24:27 +08:00

124 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
检查新股/新债并添加到滴答清单
"""
import akshare as ak
import requests
import json
from datetime import datetime
# 滴答清单配置
DIDA_TOKEN = "dp_9a8e7eccb01b44559e061dc58a669037"
DIDA_API = "https://api.dida365.com/open/v1"
def get_bond_ipo():
"""获取可转债数据"""
try:
df = ak.bond_cb_jsl()
bonds = []
for _, row in df.iterrows():
bonds.append({
"code": str(row['代码']),
"name": str(row['转债名称']),
"price": row['现价'],
"premium": row['转股溢价率']
})
return bonds
except Exception as e:
print(f"获取可转债失败: {e}")
return []
def get_stock_ipo():
"""获取A股新股数据"""
try:
df = ak.stock_new_ipo_cninfo()
ipo_list = []
for _, row in df.iterrows():
ipo_list.append({
"code": str(row['证劵代码']),
"name": str(row['证券简称']),
"date": str(row['申购日期']),
"price": row['发行价']
})
return ipo_list
except Exception as e:
print(f"获取新股失败: {e}")
return []
def add_to_dida365(title, due_date=None, tags=None):
"""添加到滴答清单"""
url = f"{DIDA_API}/project/tasks"
headers = {
"Authorization": f"Bearer {DIDA_TOKEN}",
"Content-Type": "application/json"
}
# 构建任务数据
task = {
"title": title,
"status": 2 # 2 = 待办
}
if due_date:
task["dueDate"] = {
"startDate": due_date,
"endDate": due_date,
"isAllDay": True
}
try:
response = requests.post(url, headers=headers, json=task, timeout=10)
if response.status_code in [200, 201]:
print(f"✅ 添加成功: {title}")
return True
else:
print(f"❌ 添加失败 ({response.status_code}): {response.text[:100]}")
return False
except Exception as e:
print(f"❌ 请求失败: {e}")
return False
def main():
print(f"\n{'='*60}")
print("📋 检查新股/新债并添加到滴答清单")
print(f"{'='*60}\n")
today = datetime.now().strftime("%Y-%m-%d")
added_count = 0
# 检查可转债
print("🔍 检查可转债...")
bonds = get_bond_ipo()
if bonds:
print(f" 找到 {len(bonds)} 只可转债")
for bond in bonds[:5]:
title = f"可转债打新: {bond['name']} ({bond['code']})"
if add_to_dida365(title, tags=["可转债"]):
added_count += 1
else:
print(" 暂无可转债数据")
print()
# 检查A股新股
print("🔍 检查A股新股...")
ipo_list = get_stock_ipo()
if ipo_list:
print(f" 找到 {len(ipo_list)} 只新股")
for ipo in ipo_list[:5]:
if ipo['date'] and ipo['date'] >= today:
title = f"A股打新: {ipo['name']} ({ipo['code']})"
due_date = ipo['date']
if add_to_dida365(title, due_date=due_date, tags=["A股"]):
added_count += 1
else:
print(" 暂无新股数据")
print(f"\n{'='*60}")
print(f"✅ 完成!共添加 {added_count} 个任务到滴答清单")
print(f"{'='*60}\n")
if __name__ == "__main__":
main()