1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| import logging import requests import json from typing import Dict, Any
TOKEN = "" BASE_URL = f"https://api.telegram.org/bot{TOKEN}"
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) logger = logging.getLogger(__name__)
def send_message(chat_id: int, text: str) -> Dict[str, Any]: url = f"{BASE_URL}/sendMessage" payload = { "chat_id": chat_id, "text": text } response = requests.post(url, json=payload) return response.json()
def send_invoice(chat_id: int) -> Dict[str, Any]: """ 发送支付发票 """ url = f"{BASE_URL}/sendInvoice" payload = { "chat_id": chat_id, "title": "商品名称", "description": "商品描述", "payload": "Custom-Payload", "provider_token": "", "currency": "XTR", "prices": [{"label": "商品名称", "amount": 500}], "start_parameter": "start_parameter" } response = requests.post(url, json=payload) return response.json()
def answer_pre_checkout_query(pre_checkout_query_id: str, ok: bool, error_message: str = None) -> Dict[str, Any]: """ 响应预结账查询 """ url = f"{BASE_URL}/answerPreCheckoutQuery" payload = { "pre_checkout_query_id": pre_checkout_query_id, "ok": ok } if error_message and not ok: payload["error_message"] = error_message
response = requests.post(url, json=payload) return response.json()
def handle_update(update: Dict[str, Any]): """ 处理更新消息 """ try: if "message" in update: message = update["message"] chat_id = message["chat"]["id"]
if "text" in message: text = message["text"]
if text == "/start": send_message(chat_id, "你好!选择 /buy 命令进行购买。") elif text == "/buy": send_invoice(chat_id)
if "successful_payment" in message: payment = message["successful_payment"] charge_id = payment["telegram_payment_charge_id"] send_message(chat_id, f"支付成功!您的ID:{charge_id}")
elif "pre_checkout_query" in update: query = update["pre_checkout_query"] if query["invoice_payload"] == "Custom-Payload": answer_pre_checkout_query(query["id"], True) else: answer_pre_checkout_query(query["id"], False, "出现错误...")
except Exception as e: logger.error(f"处理更新时出错: {str(e)}")
def main(): """ 主函数 - 使用长轮询获取更新 """ offset = None
while True: try: params = {"timeout": 30} if offset: params["offset"] = offset
response = requests.get(f"{BASE_URL}/getUpdates", params=params) updates = response.json()
if updates.get("ok") and updates.get("result"): for update in updates["result"]: handle_update(update) offset = update["update_id"] + 1
except Exception as e: logger.error(f"主循环出错: {str(e)}") continue
if __name__ == "__main__": main()
|