Got this idea from Matthew Berman on YouTube.
I do not want to pay for GPT, Anthropic, etc. This is a free way to get close to GPT-4o results.
You can apply this to anything, not just trading...

Take this idea and fill your boots!

Introduction:
This is an advanced MT5 (MetaTrader 5) trading bot that uses AI-powered market analysis to make trading decisions in the forex market. The bot leverages large language models (LLMs) to analyze market data, predict future price movements, and execute trades automatically. It's designed to work with multiple currency pairs and timeframes, adapting its strategy based on real-time market conditions and its own performance.
Instructions to use:

Ensure you have MetaTrader 5 installed and set up with a demo or live account.
Install the required Python libraries: MetaTrader5, pandas, loguru, tenacity, requests, openai and python-dotenv (everything named in the import statements of the two scripts).
Set up your MT5 account credentials in the login_to_mt5() function.
Customize the SYMBOLS and TIMEFRAMES constants if you want to focus on specific currency pairs or timeframes.
Ensure you have access to the specified AI models (like "llama3-70b-8192") and set up the necessary API keys or endpoints.
Run the script using Python, optionally specifying command-line arguments for the model, reference models, temperature, and max tokens. For example:
python script_name.py --model llama3-70b-8192 --temperature 0.1 --max-tokens 2048
The bot will run continuously, performing analysis cycles every 5 minutes. Monitor the logs for insights and trading activities.
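
A note on --reference-models: as posted, the script accepts the argument, but every LLM call passes an empty references list, so only the main model is consulted. Below is a minimal sketch of how the reference models could be asked first and their answers handed to the main model. It assumes a generate function that takes the same keyword arguments as generate_with_references in utils.py; collect_references and fake_generate are hypothetical names used only for illustration.

```python
def collect_references(generate_fn, reference_models, messages,
                       temperature=0.1, max_tokens=2048):
    """Ask each reference model the same question and keep the non-empty answers."""
    references = []
    for ref_model in reference_models:
        answer = generate_fn(
            model=ref_model,
            messages=messages,
            references=[],
            temperature=temperature,
            max_tokens=max_tokens,
        )
        if answer:  # skip models that failed and returned None or ""
            references.append(answer)
    return references


def fake_generate(model, messages, references, temperature, max_tokens):
    """Offline stand-in for generate_with_references (hypothetical)."""
    if model == "broken-model":
        return None
    return f"{model} saw {len(references)} reference(s)"


question = [{"role": "user", "content": "Is EURUSD trending?"}]
refs = collect_references(fake_generate, ["model-a", "broken-model", "model-b"], question)
final = fake_generate("main-model", question, refs, 0.1, 2048)
print(refs)
print(final)
```

In the real bot, fake_generate would be replaced by generate_with_references, and the collected list would be passed where the script currently passes []. Each reference model adds one API call per step, so the rate limits matter even more.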

Summary of Strategy and Logic:

Data Collection: The bot collects real-time and historical price data for specified currency pairs and timeframes from MT5.
Initial Analysis: Using an AI model, the bot analyses the market data, identifying trends, support/resistance levels, and generating initial trading recommendations.
Price Prediction: The bot predicts the next candle's price movement and trend.
Performance Evaluation: It compares previous predictions to actual outcomes to assess and improve its predictive accuracy.
Final Analysis: Considering all previous steps, the bot generates a final trading decision, including whether to buy, sell, hold, or close positions.
Risk Management: The bot calculates position sizes based on account equity and predefined risk parameters. It also sets stop-loss and take-profit levels for each trade.
Trade Execution: Based on the final analysis, the bot automatically executes trades through the MT5 platform.
Adaptive Strategy: The bot can adjust its risk parameters and trading strategy based on its performance and market conditions.
Continuous Learning: By storing predictions and trade outcomes, the bot aims to improve its decision-making over time.
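
The Risk Management step above can be sketched as plain arithmetic. This is a minimal sketch, not the script's exact function: it assumes the tick value is quoted per one lot in the account currency, and, unlike the script, it returns 0 instead of forcing the minimum volume when the risk budget is too small. All parameter names are illustrative. With equity of 10,000 and 3% risk the budget is 300; a stop 0.0050 away is 500 ticks of 0.00001; at a tick value of 1.0 per lot that is 500 per lot, so 300 / 500 works out to 0.6 lots.

```python
import math


def position_size(equity, risk_fraction, entry_price, stop_loss,
                  tick_size, tick_value, volume_step, volume_min, volume_max):
    """Return the lot size that risks about `risk_fraction` of equity.

    Assumption: tick_value is the value of one tick for ONE lot,
    expressed in the account currency.
    """
    risk_amount = equity * risk_fraction
    ticks_at_risk = abs(entry_price - stop_loss) / tick_size
    if ticks_at_risk == 0:
        return 0.0
    raw_lots = risk_amount / (ticks_at_risk * tick_value)
    # Round down to a whole number of volume steps; the small epsilon
    # absorbs floating-point noise such as 59.999999999 steps.
    steps = math.floor(raw_lots / volume_step + 1e-9)
    lots = steps * volume_step
    if lots < volume_min:
        return 0.0  # too small to trade without exceeding the risk budget
    return round(min(lots, volume_max), 8)


lots = position_size(10_000, 0.03, 1.1000, 1.0950,
                     tick_size=0.00001, tick_value=1.0,
                     volume_step=0.01, volume_min=0.01, volume_max=100.0)
tiny = position_size(10, 0.03, 1.1000, 1.0950,
                     tick_size=0.00001, tick_value=1.0,
                     volume_step=0.01, volume_min=0.01, volume_max=100.0)
print(lots, tiny)
```

Returning 0 for an undersized account is a deliberate choice in this sketch: rounding up to the broker minimum would risk more than the configured percentage.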

The bot incorporates several advanced features, including rate limiting for API calls, circuit breakers to handle errors, and caching of analysis results to optimize performance. It also provides detailed logging for monitoring and debugging purposes.
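
The caching idea can be illustrated in isolation. The sketch below is a small time-based cache in the spirit of the script's analysis_cache and CACHE_EXPIRY; the TTLCache class and its injectable clock are assumptions made so the expiry logic can be shown without waiting five minutes, and are not part of the script.

```python
import time


class TTLCache:
    """Tiny time-based cache: entries expire `ttl_seconds` after being stored."""

    def __init__(self, ttl_seconds, clock=time.time):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so the expiry logic can be tested
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at >= self.ttl_seconds:
            del self._store[key]  # expired: drop it and report a miss
            return None
        return value

    def put(self, key, value):
        self._store[key] = (value, self.clock())


# Usage with a fake clock so the example does not have to wait 5 minutes.
now = [0.0]
cache = TTLCache(ttl_seconds=300, clock=lambda: now[0])
cache.put("EURUSD_M5", {"trend": "bullish"})
fresh = cache.get("EURUSD_M5")   # read immediately: still valid
now[0] = 301.0
stale = cache.get("EURUSD_M5")   # read after 301 seconds: expired
print(fresh, stale)
```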

Rate Limiting: This is built in. At times the bot will need to retry passing the data through an LLM, but the call will go through eventually. There is enough slack for this: 600 seconds (10 minutes) lie between the 5-minute data-gathering timeframe and the 15-minute timeframe for executing trades.
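
The retry behaviour can be sketched without any third-party library. This is a simplified illustration, not the script's tenacity-based code: RateLimited, retry_delay and call_with_retry are hypothetical names, and the "Please try again in Ns" wording is assumed from the pattern the script searches for in the error message.

```python
import re
import time


class RateLimited(Exception):
    """Raised by the (hypothetical) LLM call when the provider says slow down."""


def retry_delay(error_message, default=1.0):
    """Read a 'Please try again in 7.5s' hint; fall back to `default` seconds."""
    match = re.search(r"Please try again in (\d+\.?\d*)s", error_message)
    return float(match.group(1)) if match else default


def call_with_retry(call, max_attempts=5, sleep=time.sleep):
    """Call `call()` until it succeeds or `max_attempts` is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except RateLimited as exc:
            if attempt == max_attempts:
                raise  # give up and let the caller decide what to do
            sleep(retry_delay(str(exc)) + 1)  # one-second safety margin


# Demo: the first two calls are rate limited, the third succeeds.
attempts = {"n": 0}


def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimited("rate_limit_exceeded: Please try again in 2.5s")
    return "analysis"


waits = []
result = call_with_retry(flaky, sleep=waits.append)  # record waits instead of sleeping
print(result, waits)
```

Passing the sleep function in as a parameter keeps the demo instant; in the bot it would simply be time.sleep.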

Enhancements: think prompt chaining, where you only need to store historical data and account data, and the MoE (LLM) makes the trading decisions through a number of prompts. This limits the need for additional libraries and long scripts.
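
A minimal sketch of what prompt chaining could look like, assuming an llm callable that takes a prompt string and returns a JSON string. run_chain, the step templates and fake_llm are all hypothetical and stand in for real model calls.

```python
import json


def run_chain(llm, steps, context):
    """Run prompts in order; each step sees the JSON results of earlier steps."""
    results = {}
    for name, template in steps:
        prompt = template.format(
            context=json.dumps(context),
            previous=json.dumps(results),
        )
        results[name] = json.loads(llm(prompt))
    return results


steps = [
    ("analysis", "Analyze this market data: {context}. Reply in JSON."),
    ("prediction", "Given {previous}, predict the next candle. Reply in JSON."),
    ("decision", "Given {previous}, choose buy, sell or hold. Reply in JSON."),
]


def fake_llm(prompt):
    """Offline stand-in for a real model call; returns canned JSON."""
    if prompt.startswith("Analyze"):
        return '{"trend": "bullish"}'
    if "choose buy" in prompt:
        return '{"action": "buy"}'
    return '{"close": 1.1012}'


outcome = run_chain(fake_llm, steps, {"symbol": "EURUSD", "close": 1.1})
print(outcome["decision"])
```

The only state that has to be kept between steps is the results dictionary, which is the point of the enhancement: the chain itself carries the reasoning, so little extra code is needed.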

Required:
'trading bot'.py (script below)
utils.py (script below)
.env (file): you need to add your own API keys:
OPENAI_API_KEY="sk-proj-SOME RANDOM NUMBER"
GROQ_API_KEY="gsk_SOME RANDOM NUMBER"
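
A small sanity check for the keys can save a confusing first run. The sketch below assumes the keys have already been loaded into the environment (utils.py does this with load_dotenv()); missing_keys and REQUIRED_KEYS are hypothetical names. Only GROQ_API_KEY is listed as required because the default generate function in utils.py reads that key.

```python
import os

# Add "OPENAI_API_KEY" here if generate_openai is used.
REQUIRED_KEYS = ("GROQ_API_KEY",)


def missing_keys(env=None, required=REQUIRED_KEYS):
    """Return the names of required keys that are absent or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]


# Demo with an explicit dictionary instead of the real environment.
problems = missing_keys({"GROQ_API_KEY": "  ", "OPENAI_API_KEY": "dummy"})
print(problems)
```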

##'trading bot'.py##

import os
import pandas as pd
from collections import defaultdict, deque
from functools import wraps
from loguru import logger
import logging
import time
import re
import queue
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
import MetaTrader5 as mt5
from datetime import datetime, timedelta
import argparse
import json

from utils import generate_with_references

# Constants
SYMBOLS = ['EURUSD', 'GBPUSD', 'AUDUSD', 'GBPJPY', 'NZDJPY', 'USDJPY', 'EURGBP']
TIMEFRAMES = [mt5.TIMEFRAME_M5, mt5.TIMEFRAME_M15, mt5.TIMEFRAME_H1]
HISTORY_HOURS = 4
MAX_CANDLES = HISTORY_HOURS * 12 # 4 hours * 12 (5-minute candles per hour)
MAX_RISK_PER_TRADE = 0.03 # 3% of equity
MAX_DRAWDOWN = 0.15 # 15% max drawdown
DEFAULT_STOP_LOSS_PERCENT = 0.01 # 1% from entry price
DEFAULT_TAKE_PROFIT_PERCENT = 0.02 # 2% from entry price

# Model-specific parameters
DEFAULT_MODEL = "llama3-70b-8192"
DEFAULT_REFERENCE_MODELS = [
"llama3-8b-8192",
"llama3-70b-8192",
"mixtral-8x7b-32768",
"gemma-7b-it",
]
DEFAULT_TEMPERATURE = 0.1
DEFAULT_MAX_TOKENS = 2048

# Model-specific rate limiting parameters
MODEL_RATE_LIMITS = {
"llama3-70b-8192": 6000,
"llama3-8b-8192": 6000,
"mixtral-8x7b-32768": 5000,
"gemma-7b-it": 6000,
}

DELAY_BETWEEN_CALLS = 10

# Retry parameters
MAX_RETRIES = 5
BASE_WAIT = 1
MAX_WAIT = 60

# Global variables for storing historical data
candle_history = {symbol: {tf: deque(maxlen=MAX_CANDLES) for tf in TIMEFRAMES} for symbol in SYMBOLS}
prediction_history = {symbol: {tf: deque(maxlen=2000) for tf in TIMEFRAMES} for symbol in SYMBOLS}
trade_history = {symbol: deque(maxlen=2000) for symbol in SYMBOLS}

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger.add("trading_bot.log", rotation="500 MB")

# Cache global variables
analysis_cache = {}
CACHE_EXPIRY = 300 # 5 minutes in seconds

class RateLimitException(Exception):
pass

class ServiceUnavailableException(Exception):
pass

class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.last_failure_time = None
self.is_open = False

def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
if self.is_open:
if time.time() - self.last_failure_time > self.reset_timeout:
self.is_open = False
self.failure_count = 0
else:
raise ServiceUnavailableException("Circuit is open")

try:
result = func(*args, **kwargs)
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.is_open = True
raise e

return wrapper

@CircuitBreaker(failure_threshold=5, reset_timeout=300)
def generate_with_rate_limit(model, messages, references, temperature, max_tokens):
try:
output = generate_with_references(
model=model,
messages=messages,
references=references,
temperature=temperature,
max_tokens=max_tokens,
)
update_token_usage(model, max_tokens)
return output
except Exception as e:
error_message = str(e)
if "rate_limit_exceeded" in error_message:
retry_after = max(1, extract_retry_time(error_message) + 1)
logger.info(f"Rate limit exceeded for {model}. Retrying after {retry_after} seconds.")
time.sleep(retry_after)
raise RateLimitException(error_message)
elif "service_unavailable" in error_message:
logger.error(f"Service unavailable for {model}: {error_message}")
raise ServiceUnavailableException(error_message)
else:
logger.error(f"Unexpected error for {model}: {error_message}")
raise

# Token usage tracking
token_usage = defaultdict(lambda: {"tokens": 0, "last_reset": time.time()})

def extract_retry_time(error_message):
"""Extract the retry time from the error message."""
match = re.search(r"Please try again in (\d+\.?\d*)s", error_message)
if match:
return float(match.group(1))
return BASE_WAIT

def update_token_usage(model, tokens):
"""Update token usage for a specific model."""
current_time = time.time()
if current_time - token_usage[model]["last_reset"] >= 60:
token_usage[model] = {"tokens": 0, "last_reset": current_time}
token_usage[model]["tokens"] += tokens

def check_rate_limit(model, tokens):
"""Check if the request would exceed the rate limit."""
current_usage = token_usage[model]["tokens"]
limit = MODEL_RATE_LIMITS.get(model, 5000) # Default to 5000 if model not found
return (current_usage + tokens) > limit

@retry(
wait=wait_exponential(multiplier=1, min=5, max=120),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_exception_type(RateLimitException)
)
def generate_with_rate_limit_retry(model, messages, references, temperature, max_tokens):
estimated_tokens = max_tokens # This is a rough estimate, adjust as needed

if check_rate_limit(model, estimated_tokens):
wait_time = 60 - (time.time() - token_usage[model]["last_reset"])
logger.info(f"Rate limit for {model} would be exceeded. Waiting {wait_time:.2f} seconds.")
time.sleep(max(0, wait_time))

# If messages is a string, wrap it in a list with a single dictionary
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]

return generate_with_rate_limit(model, messages, references, temperature, max_tokens)

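def login_to_mt5():
    """Initialize the MT5 terminal connection and log in to the trading account."""
    if not mt5.initialize():
        logger.error("MT5 initialization failed")
        return False

    # Placeholders: replace with your own account number (an integer),
    # password and broker server name.
    ACCOUNT_NUMBER = 0
    authorized = mt5.login(ACCOUNT_NUMBER, password='PASSWORD', server='SERVER')
    if not authorized:
        logger.error("MT5 login failed")
        return False

    return True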

def get_account_info():
account_info = mt5.account_info()
if account_info is None:
logger.error(f"Failed to get account info. Error code: {mt5.last_error()}")
return None

info = {
'balance': account_info.balance,
'equity': account_info.equity,
'margin': account_info.margin,
'free_margin': account_info.margin_free,
'margin_level': account_info.margin_level,
'leverage': account_info.leverage,
'currency': account_info.currency,
'profit': account_info.profit,
'credit': account_info.credit
}

logger.info("Account Information:")
for key, value in info.items():
logger.info(f" {key.capitalize()}: {value}")

return info

def get_open_positions():
positions = mt5.positions_get()
if positions is None:
logger.error(f"Failed to get open positions. Error code: {mt5.last_error()}")
return {}

positions_info = {}
for position in positions:
symbol_info = mt5.symbol_info(position.symbol)
if symbol_info is None:
logger.error(f"Failed to get symbol info for {position.symbol}")
continue

positions_info[position.symbol] = {
'ticket': position.ticket,
'type': 'buy' if position.type == mt5.POSITION_TYPE_BUY else 'sell',
'volume': position.volume,
'open_price': position.price_open,
'current_price': position.price_current,
'sl': position.sl,
'tp': position.tp,
'profit': position.profit,
'swap': position.swap,
'comment': position.comment,
'magic': position.magic,
'symbol': position.symbol,
}

return positions_info

def get_candle_data(symbol, timeframe, num_candles):
rates = mt5.copy_rates_from_pos(symbol, timeframe, 0, num_candles)
if rates is not None:
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
return df
return None

def update_candle_history(symbol, timeframe, new_candle):
candle_history[symbol][timeframe].append(new_candle)

def store_prediction(symbol, timeframe, prediction):
prediction_history[symbol][timeframe].append({
'timestamp': datetime.now(),
'prediction': prediction
})

def store_trade_outcome(symbol, trade_info):
trade_history[symbol].append({
'timestamp': datetime.now(),
'trade_info': trade_info
})

def get_historical_data(symbol, timeframe):
return {
'candles': list(candle_history[symbol][timeframe]),
'predictions': list(prediction_history[symbol][timeframe]),
'trades': list(trade_history[symbol])
}

def get_cached_or_new_analysis(symbol, timeframe, symbol_data, account_info, open_positions, historical_data, model, temperature, max_tokens):
cache_key = f"{symbol}_{timeframe}"
current_time = time.time()

if cache_key in analysis_cache:
cached_analysis, explanation, timestamp = analysis_cache[cache_key]
if current_time - timestamp < CACHE_EXPIRY:
logger.info(f"Using cached analysis for {symbol} on {timeframe} timeframe")
return cached_analysis, explanation

# If not in cache or expired, perform new analysis
analysis, explanation = analyze_market_data(symbol_data, account_info, open_positions, historical_data, model, temperature, max_tokens)
if analysis:
analysis_cache[cache_key] = (analysis, explanation, current_time)
return analysis, explanation

def calculate_position_size(account_info, entry_price, stop_loss, symbol):
    """Size a position so that a stop-out loses about MAX_RISK_PER_TRADE of equity."""
    risk_amount = account_info['equity'] * MAX_RISK_PER_TRADE
    price_difference = abs(entry_price - stop_loss)

    if price_difference == 0:
        logger.error(f"Invalid stop loss for {symbol}. Entry and stop loss prices are the same.")
        return 0

    symbol_info = mt5.symbol_info(symbol)
    if symbol_info is None:
        logger.error(f"Failed to get symbol info for {symbol}")
        return 0

    tick_size = symbol_info.trade_tick_size
    tick_value = symbol_info.trade_tick_value

    if tick_size == 0 or tick_value == 0:
        logger.error(f"Invalid tick size or value for {symbol}")
        return 0

    ticks_at_risk = price_difference / tick_size

    # trade_tick_value is the value of one tick for a 1-lot position, so the
    # loss per lot at the stop is ticks_at_risk * tick_value. No scaling by
    # volume_min / volume_step is needed.
    position_size = risk_amount / (ticks_at_risk * tick_value)

    # Round down to the nearest allowed trade volume step
    position_size = (position_size // symbol_info.volume_step) * symbol_info.volume_step

    # Ensure the position size is within allowed limits
    position_size = max(symbol_info.volume_min, min(position_size, symbol_info.volume_max))

    return position_size

def analyze_market_data(symbol_data, account_info, open_positions, historical_data, model, temperature, max_tokens):
"""Perform initial analysis on market data for a single symbol and timeframe."""
prompt_str = f"""You are an expert forex trading analyst. Analyze the following market data and provide your analysis in a valid JSON format:

Symbol: {symbol_data['symbol']}
Timeframe: {symbol_data['timeframe']}
Latest Candle: {symbol_data['latest_candle'].to_dict()}
Historical Candles: {[candle.to_dict() for candle in historical_data['candles'][-20:]]}

Account Info: {json.dumps(account_info, indent=2)}
Open Positions: {json.dumps(open_positions, indent=2)}

Provide a JSON object with the following structure:
{{
"trend": {{
"short_term": "bullish|bearish|ranging",
"long_term": "bullish|bearish|ranging"
}},
"support_resistance": {{
"support": [list of support levels],
"resistance": [list of resistance levels]
}},
"risk_reward_ratio": float,
"recommendation": {{
"action": "buy|sell|hold|close",
"entry_price": float or null,
"stop_loss": float or null,
"take_profit": float or null
}},
"confidence": int (0-100)
}}

After the JSON object, provide a brief explanation of your analysis.
"""

try:
output = generate_with_rate_limit_retry(model, prompt_str, [], temperature, max_tokens)
logger.info(f"Processed initial analysis for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")

json_data, explanation = extract_json_and_explanation(output)
if json_data:
logger.debug(f"Analysis explanation for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {explanation}")
return json_data, explanation
else:
logger.error(f"Failed to extract valid JSON from LLM output for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")
return None, None
except Exception as e:
logger.error(f"Error in analyze_market_data for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {str(e)}")
return None, None

def extract_valid_json(text):
"""Attempt to extract valid JSON from a string."""
try:
start = text.index('{')
end = text.rindex('}') + 1
return text[start:end]
except ValueError:
return None

# This function helps diagnose issues with the LLM output
def log_llm_output(output, explanation, symbol, timeframe, step):
logger.debug(f"LLM output for {symbol} on {timeframe} timeframe ({step}):")
logger.debug(json.dumps(output, indent=2))
logger.debug(f"Explanation: {explanation}")

def extract_json_from_llm_output(output):
    """Extract JSON from LLM output that may contain explanatory text."""
    try:
        # Find the start and end of the JSON object
        start = output.find('{')
        end = output.rfind('}')
        # find/rfind return -1 when the brace is missing, so test the raw
        # indices before adding 1 to the end position.
        if start != -1 and end > start:
            return json.loads(output[start:end + 1])
        return None
    except json.JSONDecodeError:
        return None

def predict_next_candle(symbol_data, initial_analysis, model, temperature, max_tokens):
"""Predict the next candle data and trend based on initial analysis."""
prompt_str = f"""You are an expert forex trading analyst. Based on the following data and initial analysis, predict the next candle and trend:

Symbol: {symbol_data['symbol']}
Timeframe: {symbol_data['timeframe']}
Latest Candle: {symbol_data['latest_candle'].to_dict()}
Initial Analysis: {json.dumps(initial_analysis, indent=2)}

Provide a JSON object with the following structure:
{{
"next_candle": {{
"open": float,
"high": float,
"low": float,
"close": float
}},
"trend": "bullish|bearish|ranging",
"confidence": int (0-100)
}}

After the JSON object, provide a brief explanation of your prediction.
"""

try:
output = generate_with_rate_limit_retry(model, prompt_str, [], temperature, max_tokens)
logger.info(f"Generated next candle prediction for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")

json_data, explanation = extract_json_and_explanation(output)
if json_data:
logger.debug(f"Prediction explanation for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {explanation}")
return json_data, explanation
else:
logger.error(f"Failed to extract valid JSON from LLM output for prediction of {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")
return None, None
except Exception as e:
logger.error(f"Error in predict_next_candle for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {str(e)}")
return None, None

def compare_prediction_to_actual(symbol_data, prediction, actual_candle, model, temperature, max_tokens):
    """Compare the predicted candle to the actual candle and analyze the accuracy.

    Returns a (comparison, explanation) tuple, or (None, None) on failure,
    which is the shape main() unpacks.
    """
    prompt_str = f"""You are an expert forex trading analyst. Compare the following prediction to the actual candle data:

Symbol: {symbol_data['symbol']}
Timeframe: {symbol_data['timeframe']}
Prediction: {json.dumps(prediction, indent=2)}
Actual Candle: {actual_candle.to_dict()}

Analyze:
1. The accuracy of the price prediction (open, high, low, close).
2. The accuracy of the trend prediction.
3. Identify any patterns or biases in the prediction.
4. Suggest improvements for future predictions.
5. Provide an overall accuracy score (0-100%).

Present your analysis in a clear, structured JSON format.
"""

    try:
        output = generate_with_rate_limit_retry(model, prompt_str, [], temperature, max_tokens)
        logger.info(f"Completed prediction comparison for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")
        # The model may wrap the JSON in extra text, so reuse the shared
        # extractor instead of calling json.loads on the raw output.
        json_data, explanation = extract_json_and_explanation(output)
        if json_data is None:
            logger.error(f"Error parsing JSON output for prediction comparison of {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")
        return json_data, explanation
    except Exception as e:
        logger.error(f"Error in compare_prediction_to_actual for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {str(e)}")
        return None, None

def generate_final_analysis(symbol_data, initial_analysis, prediction, comparison, trade_history, model, temperature, max_tokens):
"""Generate a final analysis and trading decision based on all previous analyses."""
prompt_str = f"""You are an expert forex trading analyst. Based on all previous analyses, generate a final trading decision:

Symbol: {symbol_data['symbol']}
Timeframe: {symbol_data['timeframe']}
Initial Analysis: {json.dumps(initial_analysis, indent=2)}
Prediction: {json.dumps(prediction, indent=2)}
Prediction Comparison: {json.dumps(comparison, indent=2)}
Recent Trade History: {json.dumps(trade_history[-5:], indent=2, default=str)}

Generate a final analysis and decision in the following JSON format:
{{
"final_decision": {{
"action": "buy|sell|hold|close",
"entry_price": float or null,
"stop_loss": float or null,
"take_profit": float or null
}},
"confidence": int (0-100),
"reasoning": string,
"risk_assessment": string,
"strategy_adjustments": {{
"max_risk_per_trade": float or null,
"default_stop_loss_percent": float or null,
"default_take_profit_percent": float or null
}}
}}

After the JSON, provide a brief explanation of your final analysis and decision.
"""

try:
output = generate_with_rate_limit_retry(model, prompt_str, [], temperature, max_tokens)
logger.info(f"Generated final analysis for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")

json_data, explanation = extract_json_and_explanation(output)
if json_data:
logger.debug(f"Final analysis explanation for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {explanation}")
return json_data, explanation
else:
logger.error(f"Failed to extract valid JSON from LLM output for final analysis of {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe")
return None, None
except Exception as e:
logger.error(f"Error in generate_final_analysis for {symbol_data['symbol']} on {symbol_data['timeframe']} timeframe: {str(e)}")
return None, None

def execute_trade(symbol, decision, account_info):
"""Execute a trade based on the final decision."""
logger.info(f"Executing trade for {symbol}:")
logger.info(f"Decision: {decision}")

action = decision.get('action')
if not action:
logger.error(f"No action specified for {symbol}")
return

logger.info(f" Action: {action}")

try:
if action in ['buy', 'sell']:
entry_price = mt5.symbol_info_tick(symbol).ask if action == 'buy' else mt5.symbol_info_tick(symbol).bid
stop_loss = decision.get('stop_loss')
take_profit = decision.get('take_profit')

if not stop_loss or not take_profit:
logger.error(f"Missing stop loss or take profit for {symbol}")
return

volume = calculate_position_size(account_info, entry_price, stop_loss, symbol)

request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": volume,
"type": mt5.ORDER_TYPE_BUY if action == 'buy' else mt5.ORDER_TYPE_SELL,
"price": entry_price,
"sl": stop_loss,
"tp": take_profit,
"deviation": 20,
"magic": 234000,
"comment": "python script open",
"type_time": mt5.ORDER_TIME_GTC,
"type_filling": mt5.ORDER_FILLING_IOC,
}
result = mt5.order_send(request)
logger.info(f" {action.capitalize()} order sent for {symbol}: {result}")

if result.retcode == mt5.TRADE_RETCODE_DONE:
trade_info = {
'action': action,
'entry_price': result.price,
'stop_loss': stop_loss,
'take_profit': take_profit,
'volume': result.volume,
'ticket': result.order
}
store_trade_outcome(symbol, trade_info)
else:
logger.error(f"Order failed. Error code: {result.retcode}")

elif action == 'modify':
position = mt5.positions_get(symbol=symbol)
if position:
request = {
"action": mt5.TRADE_ACTION_SLTP,
"symbol": symbol,
"sl": decision.get('stop_loss'),
"tp": decision.get('take_profit'),
"position": position[0].ticket,
}
result = mt5.order_send(request)
logger.info(f" Modify order sent for {symbol}: {result}")
else:
logger.warning(f"No open position found for {symbol}. Skipping modification.")

elif action == 'close':
position = mt5.positions_get(symbol=symbol)
if position:
request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": position[0].volume,
"type": mt5.ORDER_TYPE_BUY if position[0].type == 1 else mt5.ORDER_TYPE_SELL,
"position": position[0].ticket,
"price": mt5.symbol_info_tick(symbol).bid if position[0].type == 0 else mt5.symbol_info_tick(symbol).ask,
"deviation": 20,
"magic": 234000,
"comment": "python script close",
"type_time": mt5.ORDER_TIME_GTC,
"type_filling": mt5.ORDER_FILLING_IOC,
}
result = mt5.order_send(request)
logger.info(f" Close order sent for {symbol}: {result}")

if result.retcode == mt5.TRADE_RETCODE_DONE:
trade_info = {
'action': 'close',
'close_price': result.price,
'volume': result.volume,
'ticket': position[0].ticket
}
store_trade_outcome(symbol, trade_info)
else:
logger.error(f"Close order failed. Error code: {result.retcode}")
else:
logger.warning(f"No open position found for {symbol}. Skipping close.")

elif action == 'hold':
logger.info(f" Holding position for {symbol}")

except Exception as e:
logger.error(f"Error executing trade for {symbol}: {str(e)}")

def apply_strategy_adjustments(adjustments):
    """Apply LLM-suggested risk parameters, ignoring missing or null values."""
    global MAX_RISK_PER_TRADE, DEFAULT_STOP_LOSS_PERCENT, DEFAULT_TAKE_PROFIT_PERCENT

    if not adjustments:
        return

    # The prompt allows "float or null", so skip keys whose value is None;
    # otherwise a null would overwrite the constant and break position sizing.
    max_risk = adjustments.get('max_risk_per_trade')
    if max_risk is not None:
        MAX_RISK_PER_TRADE = max_risk
        logger.info(f"Adjusted max risk per trade to: {MAX_RISK_PER_TRADE}")

    stop_loss_percent = adjustments.get('default_stop_loss_percent')
    if stop_loss_percent is not None:
        DEFAULT_STOP_LOSS_PERCENT = stop_loss_percent
        logger.info(f"Adjusted default stop loss percent to: {DEFAULT_STOP_LOSS_PERCENT}")

    take_profit_percent = adjustments.get('default_take_profit_percent')
    if take_profit_percent is not None:
        DEFAULT_TAKE_PROFIT_PERCENT = take_profit_percent
        logger.info(f"Adjusted default take profit percent to: {DEFAULT_TAKE_PROFIT_PERCENT}")

def extract_json_and_explanation(output):
"""Extract JSON and explanation from LLM output."""
try:
# Find the JSON object
json_start = output.index('{')
json_end = output.rindex('}') + 1
json_str = output[json_start:json_end]
json_data = json.loads(json_str)

# Extract explanation (everything after the JSON object)
explanation = output[json_end:].strip()

return json_data, explanation
except (ValueError, json.JSONDecodeError) as e:
logger.error(f"Failed to extract JSON and explanation: {str(e)}")
return None, None

def main(model=DEFAULT_MODEL, reference_models=DEFAULT_REFERENCE_MODELS, temperature=DEFAULT_TEMPERATURE, max_tokens=DEFAULT_MAX_TOKENS):
logger.info("Starting Advanced MT5 Trading Bot")

if not login_to_mt5():
return

logger.info("Successfully connected to MT5 account.")

while True:
try:
logger.info("--- Starting new analysis cycle ---")

account_info = get_account_info()
if account_info is None:
logger.error("Failed to get account details. Retrying in 60 seconds.")
time.sleep(60)  # avoid a tight retry loop while MT5 is unavailable
continue

open_positions = get_open_positions()
logger.info("Current Open Positions:")
for symbol, position in open_positions.items():
logger.info(f" {symbol}: Type: {position['type']}, Volume: {position['volume']}, Profit: {position['profit']}")

for symbol in SYMBOLS:
for timeframe in TIMEFRAMES:
logger.info(f"Analyzing {symbol} on {timeframe} timeframe...")

# Step 1: Get and store candle data
latest_candles = get_candle_data(symbol, timeframe, MAX_CANDLES)
if latest_candles is not None and not latest_candles.empty:
for _, candle in latest_candles.iterrows():
update_candle_history(symbol, timeframe, candle)

symbol_data = {
'symbol': symbol,
'timeframe': timeframe,
'latest_candle': latest_candles.iloc[-1],
'historical_candles': list(candle_history[symbol][timeframe])
}

# Step 2: Initial Analysis (now with caching and explanations)
initial_analysis, analysis_explanation = get_cached_or_new_analysis(
symbol, timeframe, symbol_data, account_info, open_positions,
get_historical_data(symbol, timeframe), model, temperature, max_tokens
)
if initial_analysis is None:
logger.warning(f"Skipping further analysis for {symbol} on {timeframe} timeframe due to analysis failure")
continue

log_llm_output(initial_analysis, analysis_explanation, symbol, timeframe, "Initial Analysis")

# Step 3: Predict next candle
prediction, prediction_explanation = predict_next_candle(symbol_data, initial_analysis, model, temperature, max_tokens)
if prediction is None:
logger.warning(f"Skipping further analysis for {symbol} on {timeframe} timeframe due to prediction failure")
continue
store_prediction(symbol, timeframe, prediction)

log_llm_output(prediction, prediction_explanation, symbol, timeframe, "Prediction")

# Step 4: Compare prediction to actual (using previous prediction)
previous_prediction = prediction_history[symbol][timeframe][-2] if len(prediction_history[symbol][timeframe]) > 1 else None
if previous_prediction:
comparison, comparison_explanation = compare_prediction_to_actual(
symbol_data, previous_prediction['prediction'], symbol_data['latest_candle'],
model, temperature, max_tokens
)
log_llm_output(comparison, comparison_explanation, symbol, timeframe, "Comparison")
else:
comparison = None
comparison_explanation = "No previous prediction available for comparison."

# Step 5: Generate final analysis
final_analysis, final_explanation = generate_final_analysis(
symbol_data, initial_analysis, prediction, comparison,
list(trade_history[symbol]), model, temperature, max_tokens
)
if final_analysis is None:
logger.warning(f"Skipping trade execution for {symbol} on {timeframe} timeframe due to final analysis failure")
continue

log_llm_output(final_analysis, final_explanation, symbol, timeframe, "Final Analysis")

# Step 6: Execute trade based on final analysis
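# The prompt nests action, stop_loss and take_profit under "final_decision"
execute_trade(symbol, final_analysis.get('final_decision') or {}, account_info)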

# Apply any strategy adjustments
if 'strategy_adjustments' in final_analysis:
apply_strategy_adjustments(final_analysis['strategy_adjustments'])

time.sleep(DELAY_BETWEEN_CALLS)

logger.info("Completed analysis cycle. Waiting for next cycle...")
time.sleep(300) # Wait for 5 minutes before the next cycle

except Exception as e:
logger.error(f"An error occurred: {e}")
logger.info("Waiting for 60 seconds before retrying...")
time.sleep(60)

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Advanced MT5 Trading Bot")
parser.add_argument("--model", default=DEFAULT_MODEL, help="Main model to use")
parser.add_argument("--reference-models", nargs='+', default=DEFAULT_REFERENCE_MODELS, help="Reference models to use")
parser.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE, help="Temperature for model inference")
parser.add_argument("--max-tokens", type=int, default=DEFAULT_MAX_TOKENS, help="Max tokens for model inference")
args = parser.parse_args()

try:
logger.info("Initializing MT5 Trading Bot")
logger.info(f"Using main model: {args.model}")
logger.info(f"Reference models: {args.reference_models}")
logger.info(f"Temperature: {args.temperature}")
logger.info(f"Max tokens: {args.max_tokens}")

main(args.model, args.reference_models, args.temperature, args.max_tokens)
except KeyboardInterrupt:
logger.info("Bot stopped by user.")
except Exception as e:
logger.error(f"An unexpected error occurred: {str(e)}")
finally:
logger.info("Shutting down MT5 connection...")
mt5.shutdown()
logger.info("MT5 connection closed. Bot terminated.")

##'trading bot'.py##
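
A note on JSON extraction: the script above slices the reply from the first '{' to the last '}', which breaks whenever the explanation that follows the JSON also contains a brace. A minimal alternative sketch uses the standard library's json.JSONDecoder.raw_decode, which parses one JSON value starting at a given index and reports where it ended. first_json_object is a hypothetical helper, not part of the script.

```python
import json


def first_json_object(text):
    """Return (parsed_object, trailing_text) for the first JSON object in text."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, end = decoder.raw_decode(text, start)
            return obj, text[end:].strip()
        except json.JSONDecodeError:
            start = text.find("{", start + 1)  # try the next opening brace
    return None, None


reply = 'Here you go: {"action": "hold", "confidence": 55} Note: set {sl} later.'
data, explanation = first_json_object(reply)
print(data)
print(explanation)
```

The helper returns the same (json, explanation) pair shape as extract_json_and_explanation, so it could be swapped in without changing the callers.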

##utils.py##

import os
import json
import time
import requests
import openai
import copy
from loguru import logger
from dotenv import load_dotenv
load_dotenv()

DEBUG = int(os.environ.get("DEBUG", "0"))


def generate_together(
model,
messages,
max_tokens=2048,
temperature=0.1,
streaming=False,
):

output = None

for sleep_time in [1, 2, 4, 8, 16, 32]:

try:

endpoint = "https://api.groq.com/openai/v1/chat/completions"

if DEBUG:
logger.debug(
f"Sending messages ({len(messages)}) (last message: `{messages[-1]['content'][:20]}...`) to `{model}`."
)

res = requests.post(
endpoint,
json={
"model": model,
"max_tokens": max_tokens,
"temperature": (temperature if temperature > 1e-4 else 0),
"messages": messages,
},
headers={
"Authorization": f"Bearer {os.environ.get('GROQ_API_KEY')}",
},
)
if "error" in res.json():
logger.error(res.json())
if res.json()["error"]["type"] == "invalid_request_error":
logger.info("Input + output is longer than max_position_id.")
return None

output = res.json()["choices"][0]["message"]["content"]

break

except Exception as e:
logger.error(e)
if DEBUG:
logger.debug(f"Msgs: `{messages}`")

logger.info(f"Retry in {sleep_time}s..")
time.sleep(sleep_time)

if output is None:

return output

output = output.strip()

if DEBUG:
logger.debug(f"Output: `{output[:20]}...`.")

return output


def generate_together_stream(
model,
messages,
max_tokens=2048,
temperature=0.7,
):
endpoint = "https://api.groq.com/openai/v1/"
client = openai.OpenAI(
api_key=os.environ.get("GROQ_API_KEY"), base_url=endpoint
)
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature if temperature > 1e-4 else 0,
max_tokens=max_tokens,
stream=True, # this time, we set stream=True
)

return response


def generate_openai(
    model,
    messages,
    max_tokens=2048,
    temperature=0.1,
):
    """Call the OpenAI chat completions API, retrying with exponential backoff."""
    client = openai.OpenAI(
        api_key=os.environ.get("OPENAI_API_KEY"),
    )

    output = None  # stays None if every attempt fails

    for sleep_time in [1, 2, 4, 8, 16, 32]:
        try:

            if DEBUG:
                logger.debug(
                    f"Sending messages ({len(messages)}) (last message: `{messages[-1]['content'][:20]}`) to `{model}`."
                )

            completion = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            output = completion.choices[0].message.content
            break

        except Exception as e:
            logger.error(e)
            logger.info(f"Retry in {sleep_time}s..")
            time.sleep(sleep_time)

    # Without this guard, a run of failed attempts would hit .strip() on an
    # unassigned variable.
    if output is None:
        return None

    return output.strip()


def inject_references_to_messages(
messages,
references,
):

messages = copy.deepcopy(messages)

system = f"""You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.

Responses from models:"""

for i, reference in enumerate(references):

system += f"\n{i+1}. {reference}"

if messages[0]["role"] == "system":

messages[0]["content"] += "\n\n" + system

else:

messages = [{"role": "system", "content": system}] + messages

return messages


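def generate_with_references(
    model,
    messages,
    references=None,
    max_tokens=2048,
    temperature=0.1,
    generate_fn=generate_together,
):
    """Generate a reply, optionally injecting reference answers as a system prompt."""
    # None replaces the mutable default list; an empty or missing list means
    # the messages are sent unchanged.
    if references:

        messages = inject_references_to_messages(messages, references)

    return generate_fn(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )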
"Or die trying...." Sonny Crockett, Miami Vice 1980's