// 1. 指标计算:
// - 计算多种技术指标,如移动平均线(包含5日、20日、60日均线)、相对强弱指标(RSI)、布林带(Bollinger Bands)、MACD等,
// 并能针对不同时间级别(5分钟、15分钟、30分钟、1小时、4小时)分别进行计算,以提供多维度的市场分析视角。
// 2. 交易信号判断:
// - 根据多种指标的组合情况以及K线组合形态等判断买入(黄色向上箭头)和卖出(绿色向下箭头)信号,
// 例如结合均线、布林线、MACD、RSI、箱体轨道、趋势轨道、黄金分割等指标的特定条件来确定交易时机。
// 3. 图形绘制:
// - 绘制箱体,通过寻找一定时间段内的价格高低点确定箱体范围,并绘制上下轨线,辅助判断价格波动区间。
// - 绘制趋势线,示例中以连接特定低点的方式绘制上升趋势线(可根据实际需求进一步完善趋势线绘制逻辑),帮助分析市场趋势。
// - 绘制黄金分割线,依据近期价格的高低点确定黄金分割范围,并绘制出不同比例(如0.236、0.382、0.5、0.618、1等)的黄金分割线,
// 用于预测价格可能的支撑和阻力位。
// 4. 警报提示:
// - 当出现买入或卖出箭头信号且与前一根K线信号状态发生变化时,会发出相应的警报提示,提醒用户关注交易机会。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
import matplotlib.ticker as mticker
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# 设置样式
plt.style.use('seaborn-darkgrid')
sns.set_palette("deep")
# 生成模拟金融数据
def generate_market_data(start_date, periods=500, freq='5T'):
np.random.seed(42)
date_range = pd.date_range(start=start_date, periods=periods, freq=freq)
base_price = 100
prices = [base_price]
volatility = 0.008
for _ in range(1, periods):
change = np.random.normal(0, volatility)
if np.random.rand() < 0.2: # 20% 的概率出现大幅波动
change *= 3
new_price = prices[-1] * (1 + change)
prices.append(new_price)
df = pd.DataFrame(index=date_range)
df['close'] = prices
df['open'] = df['close'].shift(1) * (1 + np.random.normal(0, volatility/2, periods))
df['high'] = df[['open', 'close']].max(axis=1) * (1 + abs(np.random.normal(0, volatility/3, periods)))
df['low'] = df[['open', 'close']].min(axis=1) * (1 - abs(np.random.normal(0, volatility/3, periods)))
df.iloc[0, 0] = df.iloc[0, 1] # 修正第一行的open值
return df.dropna()
# 技术指标计算
def calculate_technical_indicators(df, timeframes=['5T', '15T', '30T', '1H', '4H']):
# 计算简单移动平均线
df['MA5'] = df['close'].rolling(window=5).mean()
df['MA20'] = df['close'].rolling(window=20).mean()
df['MA60'] = df['close'].rolling(window=60).mean()
# 计算RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['RSI'] = 100 - (100 / (1 + rs))
# 计算布林带
df['SMA20'] = df['close'].rolling(window=20).mean()
df['stddev'] = df['close'].rolling(window=20).std()
df['UpperBand'] = df['SMA20'] + (df['stddev'] * 2)
df['LowerBand'] = df['SMA20'] - (df['stddev'] * 2)
# 计算MACD
exp12 = df['close'].ewm(span=12, adjust=False).mean()
exp26 = df['close'].ewm(span=26, adjust=False).mean()
df['MACD'] = exp12 - exp26
df['Signal'] = df['MACD'].ewm(span=9, adjust=False).mean()
df['Histogram'] = df['MACD'] - df['Signal']
# 多时间级别指标
for tf in timeframes:
resampled = df.resample(tf).agg({'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last'})
resampled[f'MA20_{tf}'] = resampled['close'].rolling(window=20).mean()
resampled[f'RSI_{tf}'] = 100 - (100 / (1 + (resampled['close'].diff().where(lambda x: x>0, 0).rolling(14).mean() /
resampled['close'].diff().where(lambda x: x<0, 0).abs().rolling(14).mean())))
df = df.join(resampled[[f'MA20_{tf}', f'RSI_{tf}']], how='left')
return df.dropna()
# 交易信号判断
def generate_trading_signals(df):
df['Signal'] = 0 # 0: 无信号, 1: 买入, -1: 卖出
# 均线交叉信号
df.loc[(df['MA5'] > df['MA20']) & (df['MA5'].shift(1) <= df['MA20'].shift(1)), 'Signal'] = 1
df.loc[(df['MA5'] < df['MA20']) & (df['MA5'].shift(1) >= df['MA20'].shift(1)), 'Signal'] = -1
# RSI超买超卖信号
df.loc[(df['RSI'] < 30) & (df['RSI'].shift(1) >= 30), 'Signal'] = 1
df.loc[(df['RSI'] > 70) & (df['RSI'].shift(1) <= 70), 'Signal'] = -1
# MACD交叉信号
df.loc[(df['MACD'] > df['Signal']) & (df['MACD'].shift(1) <= df['Signal'].shift(1)), 'Signal'] = 1
df.loc[(df['MACD'] < df['Signal']) & (df['MACD'].shift(1) >= df['Signal'].shift(1)), 'Signal'] = -1
# 布林带突破信号
df.loc[(df['close'] < df['LowerBand']) & (df['close'].shift(1) >= df['LowerBand'].shift(1)), 'Signal'] = 1
df.loc[(df['close'] > df['UpperBand']) & (df['close'].shift(1) <= df['UpperBand'].shift(1)), 'Signal'] = -1
# 避免连续信号
df.loc[df['Signal'] == df['Signal'].shift(1), 'Signal'] = 0
return df
# 绘制技术图表
def plot_technical_analysis(df):
fig = plt.figure(figsize=(16, 14))
gs = fig.add_gridspec(6, 1)
# K线图区域
ax1 = fig.add_subplot(gs[0:4, 0])
# 计算箱体范围
box_start = df.index[-100]
box_df = df.loc[box_start:]
box_high = box_df['high'].max()
box_low = box_df['low'].min()
box_range = box_high - box_low
# 绘制箱体
rect = Rectangle((mdates.date2num(box_start), box_high - box_low, box_low,
width=len(box_df)*0.8, alpha=0.1, color='blue')
ax1.add_patch(rect)
# 绘制K线
for idx, row in df[-200:].iterrows():
color = 'green' if row['close'] >= row['open'] else 'red'
ax1.plot([idx, idx], [row['low'], row['high']], color=color, linewidth=1)
ax1.plot([idx, idx], [row['open'], row['close']], color=color, linewidth=4)
# 绘制移动平均线
ax1.plot(df.index[-200:], df['MA5'][-200:], label='MA5', color='orange', alpha=0.8)
ax1.plot(df.index[-200:], df['MA20'][-200:], label='MA20', color='blue', alpha=0.8)
ax1.plot(df.index[-200:], df['MA60'][-200:], label='MA60', color='purple', alpha=0.8)
# 绘制布林带
ax1.plot(df.index[-200:], df['UpperBand'][-200:], label='Upper Band', color='red', linestyle='--', alpha=0.7)
ax1.plot(df.index[-200:], df['SMA20'][-200:], label='SMA20', color='green', alpha=0.7)
ax1.plot(df.index[-200:], df['LowerBand'][-200:], label='Lower Band', color='blue', linestyle='--', alpha=0.7)
ax1.fill_between(df.index[-200:], df['UpperBand'][-200:], df['LowerBand'][-200:], color='gray', alpha=0.1)
# 绘制趋势线
low_points = df['low'][-100:].nsmallest(3).index
if len(low_points) >= 2:
x = mdates.date2num(low_points.sort_values()[:2])
y = df.loc[low_points.sort_values()[:2], 'low']
coeffs = np.polyfit(x, y, 1)
trend_x = np.linspace(min(x), max(df.index[-100:].map(mdates.date2num)), 100)
trend_y = np.polyval(coeffs, trend_x)
ax1.plot(trend_x, trend_y, 'c--', linewidth=2, label='Trend Line')
# 绘制黄金分割线
swing_high = df['high'][-100:].max()
swing_low = df['low'][-100:].min()
levels = [0, 0.236, 0.382, 0.5, 0.618, 1]
for level in levels:
price_level = swing_low + (swing_high - swing_low) * level
ax1.axhline(y=price_level, color='gold' if level in [0.382, 0.618] else 'goldenrod',
linestyle='--', alpha=0.7)
ax1.text(df.index[-1], price_level, f'{level*100:.1f}%',
verticalalignment='bottom', fontsize=8, color='goldenrod')
# 绘制交易信号
buy_signals = df[df['Signal'] == 1][-200:]
sell_signals = df[df['Signal'] == -1][-200:]
ax1.scatter(buy_signals.index, buy_signals['low']*0.98,
marker='^', color='gold', s=150, label='Buy Signal', zorder=10)
ax1.scatter(sell_signals.index, sell_signals['high']*1.02,
marker='v', color='lime', s=150, label='Sell Signal', zorder=10)
ax1.set_title('Technical Analysis Dashboard', fontsize=16)
ax1.set_ylabel('Price', fontsize=12)
ax1.legend(loc='upper left')
ax1.grid(True, linestyle='--', alpha=0.7)
# 格式化x轴日期
ax1.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))
# MACD区域
ax2 = fig.add_subplot(gs[4, 0], sharex=ax1)
ax2.plot(df.index[-200:], df['MACD'][-200:], label='MACD', color='blue')
ax2.plot(df.index[-200:], df['Signal'][-200:], label='Signal', color='red')
# 绘制MACD柱状图
colors = ['green' if h >= 0 else 'red' for h in df['Histogram'][-200:]]
ax2.bar(df.index[-200:], df['Histogram'][-200:], width=0.8, color=colors, alpha=0.6)
ax2.axhline(0, color='gray', linestyle='--', alpha=0.7)
ax2.set_ylabel('MACD', fontsize=12)
ax2.legend(loc='upper left')
ax2.grid(True, linestyle='--', alpha=0.7)
# RSI区域
ax3 = fig.add_subplot(gs[5, 0], sharex=ax1)
ax3.plot(df.index[-200:], df['RSI'][-200:], label='RSI', color='purple')
ax3.axhline(70, color='red', linestyle='--', alpha=0.7)
ax3.axhline(30, color='green', linestyle='--', alpha=0.7)
ax3.fill_between(df.index[-200:], 70, 30, color='gray', alpha=0.1)
ax3.set_ylabel('RSI', fontsize=12)
ax3.set_xlabel('Date', fontsize=12)
ax3.set_ylim(0, 100)
ax3.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
return fig
# 警报系统
def alert_system(df):
last_signal = df['Signal'].iloc[-1]
prev_signal = df['Signal'].iloc[-2]
if last_signal != prev_signal:
if last_signal == 1:
print(f"\n\033[93mALERT: BUY SIGNAL at {df.index[-1]}\033[0m")
print("Recommended Action: Consider entering a long position")
elif last_signal == -1:
print(f"\n\033[92mALERT: SELL SIGNAL at {df.index[-1]}\033[0m")
print("Recommended Action: Consider exiting long positions or shorting")
else:
print(f"\nNo new trading signals at {df.index[-1]}")
# 多时间框架分析
def plot_multi_timeframe_analysis(df):
fig, axes = plt.subplots(5, 1, figsize=(14, 12))
timeframes = ['5T', '15T', '30T', '1H', '4H']
colors = plt.cm.viridis(np.linspace(0, 1, len(timeframes)))
for i, tf in enumerate(timeframes):
resampled = df.resample(tf).agg({'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last'})
resampled['MA20'] = resampled['close'].rolling(window=20).mean()
axes[i].plot(resampled.index, resampled['close'], label=f'{tf} Close', color=colors[i], alpha=0.7)
axes[i].plot(resampled.index, resampled['MA20'], label=f'{tf} MA20', color='red', linestyle='--')
axes[i].set_title(f'{tf} Timeframe Analysis', fontsize=10)
axes[i].legend(loc='upper left', fontsize=8)
axes[i].grid(True, linestyle='--', alpha=0.5)
plt.tight_layout()
return fig
# 主函数
def main():
# 生成模拟数据
start_date = datetime.now() - timedelta(days=30)
df = generate_market_data(start_date, periods=1000, freq='5T')
# 计算技术指标
df = calculate_technical_indicators(df)
# 生成交易信号
df = generate_trading_signals(df)
# 警报系统
alert_system(df)
# 绘制主图表
main_fig = plot_technical_analysis(df)
# 绘制多时间框架分析
multi_fig = plot_multi_timeframe_analysis(df)
plt.show()
if __name__ == "__main__":
main()
|