bot_token = 'your_bot_api_key'
url = f"https://api.telegram.org/bot{bot_token}/getUpdates"

response = requests.get(url)
data = response.json()

if data['ok']:
if len(data['result']) > 0:
for update in data['result']:
chat_id = update['message']['chat']['id']
print(f"Chat ID: {chat_id}")
else:
print("No new updates found.")
else:
print("Failed to get updates. Error:", data)

使用金融建模準備(Financial Modeling Prep,簡稱FMP)獲取實時加密貨幣更新

要獲取實時加密貨幣更新并將它們發送到Telegram,我們需要將金融建模準備(FMP)API與Python腳本集成。為了清晰起見,我們將這個過程分為幾個小標題。

  1. 安裝所需的庫
    首先,確保您已安裝必要的Python庫:
pip install websocket-client requests
  1. 設置WebSocket客戶端
    我們將創建一個類來處理WebSocket連接,獲取實時數據,并將更新發送到Telegram。FMPWebSocketClient。
class FMPWebSocketClient:
def __init__(self, api_key, symbol, telegram_token, chat_id):
self.api_key = api_key
self.symbol = symbol
self.telegram_token = telegram_token
self.chat_id = chat_id
self.ws_url = "wss://crypto.financialmodelingprep.com"
self.data = []

api_key:您的FMP API密鑰。
symbol:您想要跟蹤的加密貨幣符號(例如,SOLUSD)。
telegram_token:您的Telegram機器人API令牌。
chat_id:更新將被發送到的聊天ID。

  1. 獲取實時數據
    我們將創建方法來處理WebSocket事件,處理傳入的消息,并發送通知。
 def on_message(self, ws, message):
data = json.loads(message)
if 'price' in data and 'volume' in data:
price = float(data['price'])
volume = float(data['volume'])
timestamp = datetime.utcnow()

data_point = {"price": price, "volume": volume, "timestamp": timestamp}
self.data.append(data_point)

# Keep only the data for the last 5 minutes
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
self.data = [x for x in self.data if x['timestamp'] >= five_minutes_ago]

# Simplified condition to send a message for every new data point
message = f"Symbol: {self.symbol}\nPrice: {price}\nVolume: {volume}\nTime: {timestamp}"
self.send_telegram_message(message)

這個方法處理傳入的WebSocket消息,并將數據點添加到列表中。

  1. 發送通知到Telegram
    我們將創建一個方法,將消息發送到Telegram聊天中。
def send_telegram_message(self, message):
url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
payload = {
"chat_id": self.chat_id,
"text": message
}
response = requests.post(url, data=payload)
print("Response from Telegram API:", response.json())

這個方法構建負載并使用機器人API發送消息到Telegram。 它打印出來自Telegram API的響應以供驗證。

5. 啟動WebSocket客戶端
最后,我們將創建一個方法來啟動WebSocket客戶端并持續獲取數據。

  def on_error(self, ws, error):
print("WebSocket Error:", error)

def on_close(self, ws, close_status_code, close_msg):
print("WebSocket Closed:", close_msg)

def on_open(self, ws):
login_message = json.dumps({
"event": "login",
"data": {"apiKey": self.api_key}
})
ws.send(login_message)
subscribe_message = json.dumps({
"event": "subscribe",
"data": {"symbol": self.symbol}
})
ws.send(subscribe_message)
print("Subscribed to:", self.symbol)

on_open:向WebSocket服務器發送登錄和訂閱消息。
啟動客戶端:

 def start(self):
self.ws = websocket.WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.run_forever()

初始化并啟動WebSocket客戶端以連接到FMP WebSocket端點。

  1. 運行完整流程
    要運行完整流程,使用您的憑證實例化FMPWebSocketClient類并啟動客戶端。
import requests
import time
from datetime import datetime, timedelta

class FMPWebSocketClient:
def __init__(self, api_key, symbol, telegram_token, chat_id):
self.api_key = api_key
self.symbol = symbol
self.telegram_token = telegram_token
self.chat_id = chat_id
self.ws_url = "wss://crypto.financialmodelingprep.com"
self.data = []

def on_message(self, ws, message):
data = json.loads(message)
if 'price' in data and 'volume' in data:
price = float(data['price'])
volume = float(data['volume'])
timestamp = datetime.utcnow()

data_point = {"price": price, "volume": volume, "timestamp": timestamp}
self.data.append(data_point)

# Keep only the data for the last 5 minutes
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
self.data = [x for x in self.data if x['timestamp'] >= five_minutes_ago]

# Simplified condition to send a message for every new data point
message = f"Symbol: {self.symbol}\nPrice: {price}\nVolume: {volume}\nTime: {timestamp}"
self.send_telegram_message(message)

def send_telegram_message(self, message):
url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
payload = {"chat_id": self.chat_id, "text": message}
response = requests.post(url, data=payload)
print("Response from Telegram API:", response.json())

def on_error(self, ws, error):
print("WebSocket Error:", error)

def on_close(self, ws, close_status_code, close_msg):
print("WebSocket Closed:", close_msg)

def on_open(self, ws):
login_message = json.dumps({
"event": "login",
"data": {"apiKey": self.api_key}
})
ws.send(login_message)
subscribe_message = json.dumps({
"event": "subscribe",
"data": {"symbol": self.symbol}
})
ws.send(subscribe_message)
print("Subscribed to:", self.symbol)

def start(self):
self.ws = websocket.WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.run_forever()

# Replace with your actual API key, token, and chat ID
api_key = "your_fmp_api_key"
symbol = "SOLUSD" # Use "ADAUSD" for Cardano or any other supported cryptocurrency symbol
telegram_token = "your_telegram_bot_token"
chat_id = "your_chat_id"

client = FMPWebSocketClient(api_key, symbol, telegram_token, chat_id)
client.start()

結論

通過將?Financial Modeling Prep (FMP) API?與 Telegram 機器人集成,您可以毫不費力地隨時了解實時加密貨幣數據。本指南向您展示了如何設置 Telegram 機器人、從?FMP?獲取實時數據以及向您的聊天發送更新。

我們首先設置 Telegram 機器人,確保它已準備好接收消息。接下來,我們在 Python 中創建了一個 WebSocket 客戶端,用于從?FMP?獲取實時數據。客戶端處理這些數據并根據預定義的條件向 Telegram 發送通知。

該系統允許您接收有關重大市場變化的即時更新,幫助您做出明智的交易決策。無論您是跟蹤 Solana、Cardano 還是任何其他加密貨幣,此設置都能確保您不會錯過重要的市場走勢。

隨意自定義條件并擴展系統以監控多種加密貨幣。這種實時數據與即時消息的集成可以顯著增強您的交易策略和市場意識。

如何找到更多同類API?

冪簡集成是國內領先的API集成管理平臺,專注于為開發者提供全面、高效、易用的API集成解決方案。冪簡API平臺可以通過以下兩種方式找到所需API:通過關鍵詞搜索API、或者從API Hub分類頁進入尋找。

原文鏈接:https://medium.datadriveninvestor.com/how-to-get-instant-cypto-updates-on-telegram-using-fmps-api-1608303501dc

上一篇:

如何記錄 AWS Lambda Go 函數的 API 調用

下一篇:

將 GraphQL 單體遷移至 Apollo Federation
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費