Example 6: Simple Market Maker Bot#
This example shows how to create a simple market maker bot that will quote on both sides of the orderbook for a given market. It references the Binance USD-M futures market midpoint as the fair price, and dynamically adjusts quotes as the market moves.
You can market-make on the SOL-PERP market with 20bps edge by running:
python market_maker.py --asset SOL --edge 20
Note
This bot can run using the public RPCs (default option), but if you’re quoting aggressively you may want to run it using your own RPC using the -u option as you’ll likely get rate-limited.
import argparse
import asyncio
import logging
import time
import traceback
from datetime import datetime, timedelta
from typing import List
import anchorpy
import httpx
from binance import AsyncClient, BinanceSocketManager # type: ignore
from solana.exceptions import SolanaRpcException
from solana.rpc.commitment import Commitment, Confirmed
from solana.rpc.types import TxOpts
from zetamarkets_py import utils
from zetamarkets_py.client import Client
from zetamarkets_py.constants import MIN_NATIVE_MIN_LOT_SIZE, POSITION_PRECISION
from zetamarkets_py.types import (
Asset,
Network,
Order,
OrderArgs,
OrderOptions,
OrderType,
Side,
)
class MarketMaker:
def __init__(
self, client: Client, asset: Asset, size: float, edge: float, offset: float, current_open_orders: List[Order]
) -> None:
self.client = client
self.asset = asset
self._is_quoting = False
self.fair_price = None
# feel free to play around with these params
self.edge_bps = edge
self.offset_bps = offset
self.quote_size = size
self.EDGE_REQUOTE_THRESHOLD = 0.25 # only requote if the fair price moves more than 25% of the edge
self.TIF_DURATION = 120 # 2 minutes, can do lower at the expense of quotes expiring when volatility is low
self._ratelimit_until_ts = 0 # timestamp for when we can retry after being rate limited
# get the best bid in the open orders
self.bid_price = max([o.info.price for o in current_open_orders if o.side == Side.Bid], default=None)
self.ask_price = min([o.info.price for o in current_open_orders if o.side == Side.Ask], default=None)
@classmethod
async def load(
cls,
endpoint: str,
wallet: anchorpy.Wallet,
asset: Asset,
size=0.001,
edge=20,
offset=0,
network=Network.MAINNET,
commitment=Confirmed,
):
tx_opts = TxOpts(
skip_preflight=False, skip_confirmation=False, preflight_commitment=commitment
) # skip preflight to save time
client = await Client.load(
endpoint=endpoint,
commitment=commitment,
wallet=wallet,
assets=[asset],
tx_opts=tx_opts,
network=network,
log_level=logging.INFO,
)
open_orders = await client.fetch_open_orders(asset)
return cls(client, asset, size, edge, offset, open_orders)
async def subscribe_fair_price(self):
client = await AsyncClient.create()
bm = BinanceSocketManager(client)
retry_count = 0
while True:
try:
# get the latest bid/ask price from Binance USD-M futures
ts = bm.symbol_ticker_futures_socket(str(self.asset).upper() + "USDT")
async with ts as tscm:
while True:
response = await tscm.recv()
if "e" in response and response["e"] == "error":
# close and restart the socket
print(f"Binance websocket error: {response['m']}")
retry_count += 1
break
ticker = response["data"]
# volume weighted average price based on BBO
binance_vwap = (
float(ticker["b"]) * float(ticker["B"]) + float(ticker["a"]) * float(ticker["A"])
) / (float(ticker["B"]) + float(ticker["A"]))
self.fair_price = binance_vwap
try:
# Use asyncio.create_task to run update_quotes concurrently as to not block the websocket
asyncio.create_task(self.update_quotes())
except Exception as e:
print(f"Exception in update_quotes: {e}")
retry_count += 1
break
except asyncio.CancelledError:
print("Gracefully exiting...")
break
except Exception as e:
print(f"Unexpected error: {e}")
finally:
await client.close_connection()
async def subscribe_zeta_bid(self):
async for orderbook, _ in self.client.subscribe_orderbook(self.asset, Side.Bid):
print(f"Best bid: {orderbook._get_l2(1)}")
async def subscribe_zeta_ask(self):
async for orderbook, _ in self.client.subscribe_orderbook(self.asset, Side.Ask):
print(f"Best ask: {orderbook._get_l2(1)}")
async def subscribe_zeta_price(self):
bid_task = asyncio.create_task(self.subscribe_zeta_bid())
ask_task = asyncio.create_task(self.subscribe_zeta_ask())
await asyncio.gather(bid_task, ask_task)
async def update_quotes(self):
if self._is_quoting:
# print("Already quoting")
return
if time.time() < self._ratelimit_until_ts:
print(f"Rate limited by RPC. Retrying in {self._ratelimit_until_ts - time.time():.1f} seconds")
return
if self.fair_price is None:
print("No fair price yet")
return
# Skip requote if the fair price is within the edge
fair_price = self.fair_price * (1 + self.offset_bps / 10000)
if self.bid_price is not None and self.ask_price is not None:
# Get the latest mark price and calculate bid/ask prices
current_mid_price = (self.bid_price + self.ask_price) / 2
edge = current_mid_price * (self.edge_bps / 10000)
divergence = abs(fair_price - current_mid_price)
print(f"Divergence/edge = {divergence/edge:.2%}")
# If the fair price is within the edge, don't update the quotes
if divergence < edge * self.EDGE_REQUOTE_THRESHOLD:
return
print(f"External price: {self.fair_price}")
print(f"Internal price: {fair_price}")
# Requote
bid_price = fair_price * (1 - self.edge_bps / 10000)
ask_price = fair_price * (1 + self.edge_bps / 10000)
# Set order options
# Orders are sent as post-only to ensure that we don't take liquidity
# We use time-in-force to ensure that the order is cancelled after a certain time and not left hanging
if self.client.network == Network.MAINNET:
expiry_ts = int((datetime.now() + timedelta(seconds=self.TIF_DURATION)).timestamp())
else:
# TIF doesn't work in devnet for some assets afaik
expiry_ts = None
# Use PostOnly to avoid taker fills, use Limit if you want to take
order_opts = OrderOptions(expiry_ts=expiry_ts, order_type=OrderType.PostOnly, client_order_id=1337)
bid_order = OrderArgs(bid_price, self.quote_size, Side.Bid, order_opts)
ask_order = OrderArgs(ask_price, self.quote_size, Side.Ask, order_opts)
# Execute quote!
self._is_quoting = True
try:
print(f"Quoting {self.asset}: ${bid_order.price:.4f}@${ask_order.price:.4f} x {self.quote_size}")
await self.client.replace_orders_for_market(self.asset, [bid_order, ask_order])
# (Re)set the state now that we know we've succesfully quoted
self.bid_price = bid_price
self.ask_price = ask_price
except SolanaRpcException as e:
original_exception = e.__cause__
if (
isinstance(original_exception, httpx.HTTPStatusError)
and original_exception.response.status_code == 429 # HTTP status code for Too Many Requests
):
retry_after = int(original_exception.response.headers.get("Retry-After", 10))
print(f"Rate limited. Retrying after {retry_after} seconds...")
self._ratelimit_until_ts = time.time() + retry_after # Retry after x seconds
else:
print(f"An RPC error occurred: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
self._is_quoting = False
async def run(self):
try:
tasks = [
asyncio.create_task(self.subscribe_zeta_price()),
asyncio.create_task(self.subscribe_fair_price()),
]
# Monitor tasks for exceptions
while True:
await asyncio.sleep(1) # Check every second
for task in tasks:
if task.done() and task.exception() is not None:
e = task.exception()
print(f"An error occurred in a task: {e}")
traceback.print_exception(type(e), e, e.__traceback__)
except KeyboardInterrupt:
print("Exiting...")
finally:
# Cancel all orders on exit
for task in tasks:
task.cancel()
await self.client.cancel_orders_for_market(self.asset)
async def main():
parser = argparse.ArgumentParser(description="Process some blockchain parameters.")
parser.add_argument(
"-n",
"--network",
type=Network,
choices=list(Network),
default=Network.MAINNET,
help="The network to use. Defaults to %(default)s.",
)
parser.add_argument(
"-u",
"--url",
type=str,
help="The endpoint URL (optional).",
)
parser.add_argument(
"-c",
"--commitment",
type=Commitment,
default=Confirmed,
help="The commitment level. Defaults to %(default)s.",
)
parser.add_argument(
"-a",
"--asset",
type=Asset,
choices=list(Asset),
default=Asset.SOL,
help="The asset identifier. Defaults to %(default)s.",
)
parser.add_argument(
"-s",
"--size",
type=float,
default=0.1,
help="The order size. Defaults to %(default)s lots.",
)
parser.add_argument(
"-e",
"--edge",
type=float,
default=20,
help="The quote edge in bps. Defaults to %(default)s bps.",
)
parser.add_argument(
"-o",
"--offset",
type=float,
default=0,
help="The quote offset in bps. Defaults to %(default)s bps.",
)
args = parser.parse_args()
# If endpoint is not specified, get it from the network argument
endpoint = args.url if args.url else utils.cluster_endpoint(args.network)
print(f"Network: {args.network.value}")
print(f"Endpoint: {endpoint}")
print(f"Commitment: {args.commitment}")
print(f"Asset: {args.asset}")
print(f"Size: {args.size} lots")
print(f"Edge: {args.edge} bps")
print(f"Offset: {args.offset} bps")
# Load your wallet
wallet = anchorpy.Wallet.local() # read in local filesystem keypair wallet
# wallet = anchorpy.Wallet(Keypair.from_json(os.environ["SOLANA_PRIVATE_KEY"])) # alternatively from env var
mm = await MarketMaker.load(
endpoint, wallet, args.asset, args.size, args.edge, args.offset, args.network, args.commitment
)
await mm.run()
asyncio.run(main())