用Python开发数字货币交易机器人(附源码),


 

众所周知,币圈一天,人间一年。我们进行数字货币交易时,在交易所 APP 或者网站 盯盘并手动下单非常耗时,当币价波动非常大的时候往往会错失良机。这时我们可以创建一个简单的 telegram 交易机器人,来帮助我们进行做空和做多交易。

该机器人可以实现以下功能:

  •  做空交易 - 以指定的价格卖出持有货币并在价格下跌时回购
  •  做多交易 - 指定的价格购买货币并在价格上涨时卖出
  •  列出交易订单
  •  显示可用余额

设置 Telegram 机器人

首先需要一个 Telegram 账号,如果没有的话请自己注册一个。然后与BotFather进行对话,通过输入/newbot来新建一个telegram机器人,根据指示一步步创建并记住你的token。

获取交易所的 API keys

查找你的交易所API文档,看看如何获取对订单和账户余额的访问权限和步骤,记住你的密码和API keys。本例中我们以bitfinex为例,Bitmex交易所是目前市面上交易量最大的比特币期货交易所,交易量和交易深度非常大。

安装依赖包

我们这边用的是Python 3.6版本,同时我们还需要利用CCXT框架获取Bitmex交易所数据,CCXT是一个JavaScript / Python / PHP 开发库,用于数字货币的交易,支持众多的比特币/以太币/山寨币交易市场和交易所API。

CCXT库用于连接数字货币交易所并在世界范围内进行交易和支付处理。使用 ccxt可以快速访问数字货币市场数据,可以用于存储、分析、可视化、指标开发、 量化交易、策略回溯测试、交易机器人程序以及相关的软件工程。

然后我们将使用python-telegram-bot与Telegram进行通讯,对聊天消息做出反应并进行交易。

只需要用下面方法安装以上两个依赖包:

  1. pip install python-telegram-bot ccxt 

我们需要交易机器人实现的基本类功能:

1、获取交易所概况,允许创建订单,列出订单详情并获取余额。这将是以 ccxt 实现的装饰器。

2、交易执行者,因为我们希望自动执行做空和做多交易。

3、即时响应的telegram 机器人。

编写机器人

项目结构如下:

  1. main.py  
  2. \config  
  3. \core  
  4. \model   
  5. \util 

我们将从一个简单的模型开始。因为多空交易两者有很多共同点,可以在\ model中创建一个基类TradeDetails:

  1. import abc  
  2. class TradeDetails(metaclass=abc.ABCMeta):  
  3.     def __init__(self, start_price: float, symbol: str, amount: float, currency: str = "USD"):  
  4.         self.start_price = start_price  
  5.         self.symbol = symbol.upper()  
  6.         self.amount = amount  
  7.         self.currency = currency  
  8.     @property  
  9.     def exchange_symbol(self):  
  10.         return f"{self.symbol.upper()}/{self.currency}"  
  11.     @property  
  12.     @abc.abstractmethod  
  13.     def exit_price(self):  
  14.         pass 
  15.     def __str__(self) -> str:  
  16.         return f"order for {self.amount} {self.exchange_symbol} with enter price: {self.start_price:.5}, " \  
  17.                f"exit_price: {self.exit_price:.5}" 

具体的为:

  •  LongTrade 
  1. from fasttrade.model.trade import TradeDetails  
  2. class LongTrade(TradeDetails):  
  3.     def __init__(self, start_price: float, symbol: str, amount: float, percent_change: float = 0.5,  
  4.                  currency: str = "USD") -> None:  
  5.         super().__init__(start_price, symbol, amount, currency)  
  6.         self.end_price = start_price * (1 + percent_change / 100)  
  7.     @property  
  8.     def exit_price(self):  
  9.         return self.end_price  
  10.     def __str__(self) -> str:  
  11.         return "Long " + super().__str__() 
  •  ShortTrade 
  1. from fasttrade.model.trade import TradeDetails  
  2. class ShortTrade(TradeDetails):  
  3.     def __init__(self, start_price: float, symbol: str, amount: float, percent_change: float = 0.5,  
  4.                  currency: str = "USD") -> None:  
  5.         super().__init__(start_price, symbol, amount, currency)  
  6.         self.end_price = start_price * (1 - percent_change / 100)  
  7.     @property  
  8.     def exit_price(self):  
  9.         return self.end_price  
  10.     def __str__(self) -> str:  
  11.         return "Short " + super().__str__() 

接下来是获取交易所数据:

  1. from ccxt import Exchange, OrderNotFound  
  2. class CryptoExchange:  
  3.     def __init__(self, exchange: Exchange):  
  4.         self.exchange = exchange  
  5.         self.exchange.load_markets()  
  6.     @property  
  7.     def free_balance(self):  
  8.         balance = self.exchange.fetch_free_balance()  
  9.         # surprisingly there are balances with 0, so we need to filter these out  
  10.         return {k: v for k, v in balance.items() if v > 0}  
  11.     def fetch_open_orders(self, symbol: str = None):  
  12.         return self.exchange.fetch_open_orders(symbolsymbol=symbol)  
  13.     def fetch_order(self, order_id: int):  
  14.         return self.exchange.fetch_order(order_id)  
  15.     def cancel_order(self, order_id: int):  
  16.         try:  
  17.             self.exchange.cancel_order(order_id)  
  18.         except OrderNotFound:  
  19.             # treat as success  
  20.             pass 
  21.     def create_sell_order(self, symbol: str, amount: float, price: float):  
  22.         return self.exchange.create_order(symbolsymbol=symbol, type="limit", side="sell", amountamount=amount, priceprice=price)  
  23.     def create_buy_order(self, symbol: str, amount: float, price: float):  
  24.         return self.exchange.create_order(symbolsymbol=symbol, type="limit", side="buy", amountamount=amount, priceprice=price) 

然后,我们将执行交易程序。程序将接受交易所数据和超时情况以检查订单是否完成。当做空时,我们以设定的价格卖出,当价格下降到一定水平时回购。我们使用asyncio协程进行编码,以使等待不会阻塞:

  1. import asyncio  
  2. import logging  
  3. from ccxt import ExchangeError  
  4. from model.longtrade import LongTrade  
  5. from model.shorttrade import ShortTrade  
  6. class TradeExecutor:  
  7.     def __init__(self, exchange, check_timeout: int = 15):  
  8.         self.check_timeout = check_timeout  
  9.         self.exchange = exchange  
  10.     async def execute_trade(self, trade):  
  11.         if isinstance(trade, ShortTrade):  
  12.             await self.execute_short_trade(trade)  
  13.         elif isinstance(trade, LongTrade):  
  14.             await self.execute_long_trade(trade)  
  15.     async def execute_short_trade(self, trade: ShortTrade):  
  16.         sell_price = trade.start_price  
  17.         buy_price = trade.exit_price  
  18.         symbol = trade.exchange_symbol  
  19.         amount = trade.amount  
  20.         order = self.exchange.create_sell_order(symbol, amount, sell_price)  
  21.         logging.info(f'Opened sell order: {amount} of {symbol}. Target sell {sell_price}, buy price {buy_price}')  
  22.         await self._wait_order_complete(order['id'])  
  23.         # post buy order  
  24.         order = self.exchange.create_buy_order(symbol, amount, buy_price)  
  25.         await self._wait_order_complete(order['id'])  
  26.         logging.info(f'Completed short trade: {amount} of {symbol}. Sold at {sell_price} and bought at {buy_price}')  
  27.     async def execute_long_trade(self, trade: LongTrade):  
  28.         buy_price = trade.start_price 
  29.          sell_price = trade.exit_price  
  30.         symbol = trade.exchange_symbol  
  31.         amount = trade.amount  
  32.         order = self.exchange.create_buy_order(symbol, amount, buy_price)  
  33.         logging.info(f'Opened long trade: {amount} of {symbol}. Target buy {buy_price}, sell price {sell_price}')  
  34.         await self._wait_order_complete(order.id)  
  35.         # post sell order  
  36.         order = self.exchange.create_sell_order(symbol, amount, sell_price)  
  37.         await self._wait_order_complete(order.id)  
  38.         logging.info(f'Completed long trade: {amount} of {symbol}. Bought at {buy_price} and sold at {sell_price}')  
  39.     async def _wait_order_complete(self, order_id): 
  40.         status = 'open'  
  41.         while status is 'open':  
  42.             await asyncio.sleep(self.check_timeout)  
  43.             order = self.exchange.fetch_order(order_id)  
  44.             status = order['status']  
  45.         logging.info(f'Finished order {order_id} with {status} status')  
  46.         # do not proceed further if we canceled order  
  47.         if status == 'canceled':  
  48.             raise ExchangeError('Trade has been canceled') 

ccxt使用REST API进行数据传输。它不如某些交易所支持的WebSockets快,但是对于这个简单的机器人来说,速度或许差别。

  1. async def _wait_order_complete(self, order_id):  
  2.         status = 'open'  
  3.         order = None  
  4.         while status is 'open':  
  5.             await asyncio.sleep(self.check_timeout)  
  6.             order = self.exchange.fetch_order(order_id)  
  7.             status = order['status']  
  8.         logging.info(f'Finished order {order_id} with {status} status') 
  9.         # do not proceed further if we canceled order  
  10.         if status == 'canceled':  
  11.             raise ExchangeError('Trade has been canceled')  
  12.         return order 

接下来将创建Telegram机器人,这是最有难度的部分,我们将使其拥有以下指令:

1、列出/取消有效订单

2、显示可用余额

3、建立做多或做空交易

我们还需要对机器人做一些安全限制,使其仅对你的消息做出响应,而其他人则无法使用你的帐户进行交易。

主要是进行做多和做空交易的部分:

1、选择做空或者做多

2、输入数字货币品种

3、输入交易数量

4、所占百分比

5、每个价格

6、显示确认信息

7、显示最终交易信息

我们来创建telegrambot.py并添加以下常量:

  1. SELECTION = "selection"  
  2. SHORT_TRADE = "short_trade"  
  3. LONG_TRADE = "long_trade"  
  4. OPEN_ORDERS = "open_orders"  
  5. FREE_BALANCE = "free_balance"  
  6. CANCEL_ORD = "cancel_order"  
  7. PROCESS_ORD_CANCEL = "process_ord_cancel"  
  8. COIN_NAME = "coin_select"  
  9. PERCENT_CHANGE = "percent_select"  
  10. AMOUNT = "amount"  
  11. PRICE = "price"  
  12. PROCESS_TRADE = "process_trade"  
  13. CONFIRM = "confirm"  
  14. CANCEL = "cancel"  
  15. END_CONVERSATION = ConversationHandler.END 

我们可以通过扩展BaseFilter来实现对user_id的限制。这样机器人必须接受被允许用户的token、id才能执行操作。

  1. class TelegramBot:  
  2.     class PrivateUserFiler(BaseFilter):  
  3.         def __init__(self, user_id):  
  4.             self.user_id = int(user_id)  
  5.         def filter(self, message):  
  6.             return message.from_user.id == self.user_id  
  7.     def __init__(self, token: str, allowed_user_id, trade_executor: TradeExecutor):  
  8.         self.updater = Updater(tokentoken=token)  
  9.         selfself.dispatcher = self.updater.dispatcher  
  10.         self.trade_executor = trade_executor  
  11.         selfself.exchange = self.trade_executor.exchange  
  12.         selfself.private_filter = self.PrivateUserFiler(allowed_user_id)  
  13.         self._prepare() 

在_prepare()函数中,我们将创建所有处理函数并将其附加到调度程序。我们开始与机器人聊天时希望显示的基本选项:

  1. def _prepare(self):  
  2.       # Create our handlers  
  3.       def show_help(bot, update):  
  4.           update.effective_message.reply_text('Type /trade to show options ')  
  5.       def show_options(bot, update):  
  6.           button_list = [  
  7.               [InlineKeyboardButton("Short trade", callback_data=SHORT_TRADE),
  8.                 InlineKeyboardButton("Long trade", callback_data=LONG_TRADE), ],  
  9.               [InlineKeyboardButton("Open orders", callback_data=OPEN_ORDERS),  
  10.                InlineKeyboardButton("Available balance", callback_data=FREE_BALANCE)],  
  11.           ]  
  12.           update.message.reply_text("Trade options:", reply_markup=InlineKeyboardMarkup(button_list))  
  13.           return TRADE_SELECT 

InlineKeyboardButton允许我们将文本选项显示为键盘。这比键入所有命令更为直观。callback_data允许在按下按钮时传递其他数据。show_options返回下一个继续进行对话的处理函数的名称。其他处理函数将使用类似的方法。然后我们执行用户选择的处理程序。在这里,我们主要从一个问题转到另一个问题:   

  1. def process_trade_selection(bot, update, user_data):  
  2.          query = update.callback_query  
  3.          selection = query.data 
  4.          if selection == OPEN_ORDERS:  
  5.              orders = self.exchange.fetch_open_orders()  
  6.              if len(orders) == 0:  
  7.                  bot.edit_message_text(text="You don't have open orders",  
  8.                                        chat_id=query.message.chat_id,  
  9.                                        message_id=query.message.message_id)  
  10.                  return END_CONVERSATION  
  11.              # show the option to cancel active orders  
  12.              keyboard = [ 
  13.                   [InlineKeyboardButton("Ok", callback_data=CONFIRM),  
  14.                   InlineKeyboardButton("Cancel order", callback_data=CANCEL)]  
  15.              ]  
  16.              bot.edit_message_text(text=formatter.format_open_orders(orders),  
  17.                                    chat_id=query.message.chat_id,  
  18.                                    message_id=query.message.message_id,  
  19.                                    reply_markup=InlineKeyboardMarkup(keyboard))  
  20.              # attach opened orders, so that we can cancel by index  
  21.              user_data[OPEN_ORDERS] = orders  
  22.              return CANCEL_ORD  
  23.          elif selection == FREE_BALANCE:  
  24.              balance = self.exchange.free_balance  
  25.              msg = "You don't have any available balance" if len(balance) == 0 \  
  26.                  else f"Your available balance:\n{formatter.format_balance(balance)}"  
  27.              bot.edit_message_text(text=msg,  
  28.                                    chat_id=query.message.chat_id,  
  29.                                    message_id=query.message.message_id)  
  30.              return END_CONVERSATION  
  31.          user_data[TRADE_SELECT] = selection  
  32.          bot.edit_message_text(text=f'Enter coin name for {selection}',  
  33.                                chat_id=query.message.chat_id,  
  34.                                message_id=query.message.message_id)  
  35.          return COIN_NAME 
  36.      def cancel_order(bot, update):  
  37.          query = update.callback_query  
  38.          if query.data == CANCEL:  
  39.              query.message.reply_text('Enter order index to cancel: ')  
  40.              return PROCESS_ORD_CANCEL  
  41.          show_help(bot, update)  
  42.          return END_CONVERSATION  
  43.      def process_order_cancel(bot, update, user_data):  
  44.          idx = int(update.message.text)  
  45.          order = user_data[OPEN_ORDERS][idx]  
  46.          self.exchange.cancel_order(order['id'])  
  47.          update.message.reply_text(f'Canceled order: {formatter.format_order(order)}')  
  48.          return END_CONVERSATION  
  49.      def process_coin_name(bot, update, user_data):  
  50.          user_data[COIN_NAME] = update.message.text.upper()  
  51.          update.message.reply_text(f'What amount of {user_data[COIN_NAME]}')  
  52.          return AMOUNT  
  53.      def process_amount(bot, update, user_data): 
  54.           user_data[AMOUNT] = float(update.message.text)  
  55.          update.message.reply_text(f'What % change for {user_data[AMOUNT]} {user_data[COIN_NAME]}')  
  56.          return PERCENT_CHANGE  
  57.      def process_percent(bot, update, user_data):  
  58.          user_data[PERCENT_CHANGE] = float(update.message.text)  
  59.          update.message.reply_text(f'What price for 1 unit of {user_data[COIN_NAME]}')  
  60.          return PRICE  
  61.      def process_price(bot, update, user_data):  
  62.          user_data[PRICE] = float(update.message.text)  
  63.          keyboard = [  
  64.              [InlineKeyboardButton("Confirm", callback_data=CONFIRM),  
  65.               InlineKeyboardButton("Cancel", callback_data=CANCEL)]  
  66.          ]  
  67.          update.message.reply_text(f"Confirm the trade: '{TelegramBot.build_trade(user_data)}'",  
  68.                                    reply_markup=InlineKeyboardMarkup(keyboard))  
  69.          return PROCESS_TRADE 

最后,我们构建会话处理程序,设置错误处理程序,并将所有处理程序添加到调度程序中。     

  1. def process_trade(bot, update, user_data):  
  2.            query = update.callback_query 
  3.            if query.data == CONFIRM: 
  4.                 trade = TelegramBot.build_trade(user_data)  
  5.                self._execute_trade(trade)  
  6.                update.callback_query.message.reply_text(f'Scheduled: {trade}')  
  7.            else:  
  8.                show_help(bot, update)  
  9.            return END_CONVERSATION  
  10.        def handle_error(bot, update, error):  
  11.            logging.warning('Update "%s" caused error "%s"', update, error)  
  12.            update.message.reply_text(f'Unexpected error:\n{error}')  
  13.        # configure our handlers  
  14.        def build_conversation_handler():  
  15.            entry_handler = CommandHandler('trade', filters=self.private_filter, callback=show_options)  
  16.            conversation_handler = ConversationHandler( 
  17.                 entry_points=[entry_handler],  
  18.                fallbacks=[entry_handler],  
  19.                states={  
  20.                    TRADE_SELECT: [CallbackQueryHandler(process_trade_selection, pass_user_data=True)],  
  21.                    CANCEL_ORD: [CallbackQueryHandler(cancel_order)],  
  22.                    PROCESS_ORD_CANCEL: [MessageHandler(filters=Filters.text, callback=process_order_cancel, pass_user_data=True)],  
  23.                    COIN_NAME: [MessageHandler(filters=Filters.text, callback=process_coin_name, pass_user_data=True)],  
  24.                    AMOUNT: [MessageHandler(Filters.text, callback=process_amount, pass_user_data=True)], 
  25.                     PERCENT_CHANGE: [MessageHandler(Filters.text, callback=process_percent, pass_user_data=True)],  
  26.                    PRICE: [MessageHandler(Filters.text, callback=process_price, pass_user_data=True)],  
  27.                    PROCESS_TRADE: [CallbackQueryHandler(process_trade, pass_user_data=True)],  
  28.                },  
  29.            ) 
  30.             return conversation_handler  
  31.        self.dispatcher.add_handler(CommandHandler('start', filters=self.private_filter, callback=show_help))  
  32.        self.dispatcher.add_handler(build_conversation_handler())  
  33.        self.dispatcher.add_error_handler(handle_error) 

传递用户数据时允许我们向处理程序提供其他user_data参数。这样可以确保机器人从一个处理程序传递到另一个处理程序时,保持所有答复的对话状态。我们需要run_async装饰器在后台执行交易,而又不会阻止机器人对新消息进行响应:

  1. def start_bot(self):  
  2.       self.updater.start_polling()  
  3.   @run_async  
  4.   def _execute_trade(self, trade):  
  5.       loop = asyncio.new_event_loop()  
  6.       task = loop.create_task(self.trade_executor.execute_trade(trade))  
  7.       loop.run_until_complete(task)  
  8.   @staticmethod  
  9.   def build_trade(user_data):  
  10.       current_trade = user_data[TRADE_SELECT]  
  11.       price = user_data[PRICE]  
  12.       coin_name = user_data[COIN_NAME]  
  13.       amount = user_data[AMOUNT]  
  14.       percent_change = user_data[PERCENT_CHANGE]  
  15.       if current_trade == LONG_TRADE:  
  16.           return LongTrade(price, coin_name, amount, percent_change)  
  17.       elif current_trade == SHORT_TRADE:  
  18.           return ShortTrade(price, coin_name, amount, percent_change)  
  19.       else:  
  20.           raise NotImplementedError 

这是用于订单和余额显示的格式化程序:

  1. TITLES = ['idx', 'type', 'remaining', 'symbol', 'price']  
  2. SPACING = [4, 6, 8, 10, 8] 
  3. def format_open_orders(orders) -> str:  
  4.     def join_line(ln): 
  5.         return ' | '.join(str(item).center(SPACING[i]) for i, item in enumerate(ln))  
  6.     title_line = join_line(TITLES) 
  7.     lines = [title_line]  
  8.     for idx, order in enumerate(orders): 
  9.         line = [idx, order['side'], order['remaining'], order['symbol'], order['price']]  
  10.         lines.append(join_line(line))  
  11.     separator_line = '-' * len(title_line)  
  12.     return f"\n{separator_line}\n".join(lines)  
  13. def format_order(order): 
  14.     return f"{order['amount']} {order['symbol']} priced at {order['price']}"  
  15. def format_balance(balance) -> str:  
  16.     coin_balance_as_list = list(f"{coin}: {val}" for coin, val in balance.items())  
  17.     return "\n".join(coin_balance_as_list) 

最后,我们创建main.py并将所有内容归结在一起:

  1. import logging  
  2. import os  
  3. import ccxt  
  4. from core.exchange import CryptoExchange  
  5. from core.telegrambot import TelegramBot  
  6. from core.tradeexcutor import TradeExecutor 
  7. if __name__ == '__main__':  
  8.     logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)  
  9.     c_dir = os.path.dirname(__file__)  
  10.     with open(os.path.join(c_dir, "config/secrets.txt")) as key_file:  
  11.         api_key, secret, telegram_tkn, user_id = key_file.read().splitlines()  
  12.     ccxtccxt_ex = ccxt.bitfinex()  
  13.     ccxt_ex.apiKey = api_key  
  14.     ccxt_ex.secret = secret  
  15.     exchange = CryptoExchange(ccxt_ex)  
  16.     trade_executor = TradeExecutor(exchange)  
  17.     telegram_bot = TelegramBot(telegram_tkn, user_id, trade_executor) 
  18.     telegram_bot.start_bot() 

我们从secrets.txt文件中获取交易所密钥,telegram的token和用户ID,构造核心类并启动机器人。使用以下内容在config文件夹中创建secrets.txt:

  1. # YOUR_API_KEY  
  2. # YOUR_SECRET  
  3. # YOUR_TELEGRAM_TOKEN  
  4. # YOUR_TELEGRAM_USER_ID 

总结

对于想要简化交易并拥有更好使用体验的人来说,该机器人更像是一个辅助工具。它不是最先进的算法交易机器人。后面可以进行以下改进:

  •  当进行做空交易时获取可用余额并显示用户可以根据余额做空的最大值
  •  要求在交易所执行之前验证创建的订单
  •  添加TA指标、信号以通知最佳交易时间
  •  止盈/止损操作和其他统计数据
  •  有策略地根据超时等原因取消订单 

评论关闭