Real-Time Push
The Rust SDK's PushClient maintains a persistent TCP+TLS connection to the Tiger OpenAPI push server. Messages are encoded with Protobuf and framed with varint32 length prefixes. The client supports connection authentication, subscribe/unsubscribe, per-type callbacks, heartbeat keep-alive, and automatic reconnection. v0.4.0 fixes a Cc dataType dispatcher bug and adds subscribe_cc / unsubscribe_cc and subscribe_market / unsubscribe_market convenience methods.
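To make the framing concrete, here is a minimal sketch of varint32 length-prefixed framing, using the standard Protobuf varint layout (7 payload bits per byte, least-significant group first, high bit set on every byte except the last). The function names are illustrative and are not part of the SDK; the SDK's internal framing code may differ.

```rust
/// Encode `value` as a base-128 varint and append it to `out`.
fn encode_varint32(mut value: u32, out: &mut Vec<u8>) {
    while value >= 0x80 {
        // Low 7 bits with the continuation bit set.
        out.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Decode a varint32 from the start of `buf`.
/// Returns `(value, bytes_consumed)`, or `None` when the buffer does not yet
/// hold a complete varint or the varint is longer than 5 bytes.
fn decode_varint32(buf: &[u8]) -> Option<(u32, usize)> {
    let mut value: u32 = 0;
    for (i, &byte) in buf.iter().enumerate().take(5) {
        value |= u32::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Prefix `payload` with its length as a varint32.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 5);
    encode_varint32(payload.len() as u32, &mut out);
    out.extend_from_slice(payload);
    out
}

/// Try to split one complete frame off the front of `buf`.
/// Returns the payload slice and the total number of bytes consumed,
/// or `None` if the frame is not fully buffered yet.
fn try_read_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    let (len, header) = decode_varint32(buf)?;
    let end = header.checked_add(len as usize)?;
    if buf.len() < end {
        return None;
    }
    Some((&buf[header..end], end))
}
```

A read loop built on this idea would append incoming bytes to a buffer, call `try_read_frame` repeatedly, and drop the consumed prefix after each successful read.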
Quick Start
use std::sync::Arc;
use tigeropen::config::ClientConfig;
use tigeropen::push::{connect, Callbacks, PushClient, SubjectType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ClientConfig::builder().build()?;
    let account = config.account.clone();
    let pc = Arc::new(PushClient::new(config, None));

    // Register callbacks
    pc.set_callbacks(Callbacks {
        on_connect: Some(Arc::new(|| println!("connected"))),
        on_disconnect: Some(Arc::new(|| println!("disconnected"))),
        on_error: Some(Arc::new(|msg| eprintln!("error: {}", msg))),
        on_kickout: Some(Arc::new(|msg| println!("kickout: {}", msg))),
        on_quote: Some(Arc::new(|data| {
            println!("[Quote] {} price={:?}", data.symbol, data.latest_price);
        })),
        on_tick: Some(Arc::new(|data| {
            println!("[Tick] {} sn={}", data.symbol, data.sn);
        })),
        on_order: Some(Arc::new(|data| {
            println!("[Order] id={} status={}", data.id, data.status);
        })),
        ..Default::default()
    });

    // Establish the connection (authenticates and spawns read/write/heartbeat loops)
    connect(&pc).await.map_err(|e| format!("connect failed: {}", e))?;

    // Subscribe to market data (symbols as comma-separated string)
    pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA"), None, None);
    pc.subscribe(&SubjectType::Tick, Some("AAPL"), None, None);
    pc.subscribe(&SubjectType::Depth, Some("AAPL"), None, None);
    pc.subscribe(&SubjectType::Kline, Some("AAPL"), None, None);

    // Subscribe to account events (pass the trading account)
    pc.subscribe(&SubjectType::Asset, None, Some(&account), None);
    pc.subscribe(&SubjectType::Position, None, Some(&account), None);
    pc.subscribe(&SubjectType::Order, None, Some(&account), None);
    pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);

    // Run until Ctrl-C, then shut down
    tokio::signal::ctrl_c().await?;
    pc.disconnect();
    Ok(())
}
Client Lifecycle
PushClient::new
pub fn new(config: ClientConfig, options: Option<PushClientOptions>) -> PushClient
Creates a new PushClient. Callbacks are registered separately via set_callbacks.
| Parameter | Type | Description |
|---|---|---|
| config | ClientConfig | Loaded configuration (credentials, account) |
| options | Option<PushClientOptions> | Optional overrides for URL, intervals, reconnect |
PushClientOptions
pub struct PushClientOptions {
    pub push_url: Option<String>,
    pub heartbeat_interval_secs: Option<u64>,
    pub reconnect_interval_secs: Option<u64>,
    pub auto_reconnect: Option<bool>,
    pub connect_timeout_secs: Option<u64>,
}
Defaults: push URL openapi.tigerfintech.com:9883, heartbeat 10s, reconnect 5s (with exponential backoff up to 60s), connect timeout 30s, auto-reconnect true.
use tigeropen::push::{PushClient, PushClientOptions};
let opts = PushClientOptions {
    push_url: Some("openapi.tigerfintech.com:9883".into()),
    heartbeat_interval_secs: Some(10),
    reconnect_interval_secs: Some(5),
    auto_reconnect: Some(true),
    connect_timeout_secs: Some(30),
};
let pc = PushClient::new(config, Some(opts));
connect / disconnect
connect is a free-standing async function that takes &Arc<PushClient>. It establishes the TCP+TLS connection, spawns the write, read, and heartbeat loops, signs and sends the CONNECT authentication message, and waits for the CONNECTED response.
use std::sync::Arc;
use tigeropen::push::{connect, PushClient};
let pc = Arc::new(PushClient::new(config, None));
connect(&pc).await.map_err(|e| format!("connect failed: {}", e))?;
// Graceful shutdown (synchronous)
pc.disconnect();
state
pub fn state(&self) -> ConnectionState
Returns the current connection state.
pub enum ConnectionState {
    Disconnected,
    Connecting,
    Connected,
}
println!("state: {:?}", pc.state());
set_callbacks
pub fn set_callbacks(&self, cb: Callbacks)
Registers the callback handlers. It can be called at any time; subsequent pushes are routed to the newly registered handlers.
pc.set_callbacks(Callbacks {
    on_quote: Some(Arc::new(|data| {
        println!("{} price={:?}", data.symbol, data.latest_price);
    })),
    ..Default::default()
});
Callbacks
All callback fields live on the Callbacks struct and are Option<Arc<dyn Fn(...) + Send + Sync>>. Payloads are Protobuf-generated types from the pb::* module.
| Field | Closure signature | Triggered when |
|---|---|---|
| on_connect | Fn() | Authentication completes |
| on_disconnect | Fn() | Connection closed |
| on_error | Fn(String) | Transport / protocol / server error |
| on_kickout | Fn(String) | Server forces disconnection |
| on_quote | Fn(pb::QuoteData) | Real-time stock quote update |
| on_tick | Fn(pb::TradeTickData) | Trade tick received |
| on_depth | Fn(pb::QuoteDepthData) | Order-book depth update |
| on_option | Fn(pb::QuoteData) | Option quote update |
| on_future | Fn(pb::QuoteData) | Futures quote update |
| on_kline | Fn(pb::KlineData) | K-line bar update |
| on_stock_top | Fn(pb::StockTopData) | Stock ranking / top list update |
| on_option_top | Fn(pb::OptionTopData) | Option ranking / top list update |
| on_full_tick | Fn(pb::TickData) | Full tick data |
| on_quote_bbo | Fn(pb::QuoteData) | Best bid/offer update |
| on_asset | Fn(pb::AssetData) | Account asset change |
| on_position | Fn(pb::PositionData) | Position change |
| on_order | Fn(pb::OrderStatusData) | Order status change |
| on_transaction | Fn(pb::OrderTransactionData) | Fill / execution received |
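Because every handler is an `Arc<dyn Fn(...) + Send + Sync>` rather than an `FnMut`, a closure cannot mutate captured variables directly; shared state has to go through something like an atomic or a mutex. The sketch below illustrates that pattern with local stand-in types. `Quote`, `Handlers`, `counting_handlers`, and `dispatch_quote` are hypothetical names invented for this example and only mirror the shape described above; they are not SDK types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a Protobuf payload such as `pb::QuoteData`.
#[derive(Clone, Debug)]
pub struct Quote {
    pub symbol: String,
    pub latest_price: Option<f64>,
}

/// Hypothetical stand-in mirroring the shape of the `Callbacks` struct:
/// every field is optional and defaults to `None`.
#[derive(Default)]
pub struct Handlers {
    pub on_quote: Option<Arc<dyn Fn(Quote) + Send + Sync>>,
    pub on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
}

/// Build handlers whose quote callback counts deliveries through a shared
/// atomic counter (the closure is `Fn`, so it cannot own a plain `usize`).
pub fn counting_handlers(counter: Arc<AtomicUsize>) -> Handlers {
    Handlers {
        on_quote: Some(Arc::new(move |q: Quote| {
            counter.fetch_add(1, Ordering::Relaxed);
            println!("{} price={:?}", q.symbol, q.latest_price);
        })),
        // Leave every other handler unset.
        ..Default::default()
    }
}

/// Route one payload to its handler, if one is registered.
pub fn dispatch_quote(handlers: &Handlers, quote: Quote) {
    if let Some(cb) = &handlers.on_quote {
        cb(quote);
    }
}
```

The same approach should carry over to the real `Callbacks` struct: clone an `Arc` into the closure with `move`, and keep the work inside the handler short so it does not hold up whatever task invokes it.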
use std::sync::Arc;
use tigeropen::push::Callbacks;
let callbacks = Callbacks {
    on_quote: Some(Arc::new(|data| {
        println!("{} price={:?} volume={:?}", data.symbol, data.latest_price, data.volume);
    })),
    on_depth: Some(Arc::new(|data| {
        let ask_levels = data.ask.as_ref().map_or(0, |a| a.price.len());
        let bid_levels = data.bid.as_ref().map_or(0, |b| b.price.len());
        println!("{} ask_levels={} bid_levels={}", data.symbol, ask_levels, bid_levels);
    })),
    on_kline: Some(Arc::new(|data| {
        println!("{} O={} H={} L={} C={}", data.symbol, data.open, data.high, data.low, data.close);
    })),
    on_order: Some(Arc::new(|data| {
        println!("id={} symbol={} status={}", data.id, data.symbol, data.status);
    })),
    on_error: Some(Arc::new(|err| eprintln!("push error: {}", err))),
    ..Default::default()
};
pc.set_callbacks(callbacks);
Subscriptions
Subscribe/unsubscribe calls are synchronous (non-async) and return a bool indicating whether the framed message was queued onto the write channel.
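Note that a `true` result only means the message was queued; it does not mean the server has accepted the subscription. As a rough illustration of that contract, the sketch below models a synchronous, non-blocking enqueue that reports success as a `bool`. It uses a bounded `std::sync::mpsc` channel purely for demonstration; `Outbound` and `enqueue` are hypothetical names, and the channel type the SDK actually uses is not documented here.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in for the client's outbound side: frames go into a
/// channel that a separate writer loop would drain.
pub struct Outbound {
    tx: SyncSender<Vec<u8>>,
}

impl Outbound {
    /// Create the sender half plus the receiver the writer loop would own.
    pub fn new(capacity: usize) -> (Self, Receiver<Vec<u8>>) {
        let (tx, rx) = sync_channel(capacity);
        (Outbound { tx }, rx)
    }

    /// Returns `true` when the frame was queued, and `false` when the queue
    /// is full or the writer side has gone away. Never blocks.
    pub fn enqueue(&self, frame: Vec<u8>) -> bool {
        self.tx.try_send(frame).is_ok()
    }
}
```

Under this model, a `false` return is worth logging or retrying, while confirmation that a subscription is live comes from the data callbacks (or `on_error`) rather than from the return value.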
subscribe / unsubscribe
pub fn subscribe(
    &self,
    subject: &SubjectType,
    symbols: Option<&str>, // comma-separated list, e.g. "AAPL,TSLA"
    account: Option<&str>, // required for account subjects
    market: Option<&str>,  // e.g. "US" for ranking-style subjects
) -> bool
pub fn unsubscribe(
    &self,
    subject: &SubjectType,
    symbols: Option<&str>,
    account: Option<&str>,
    market: Option<&str>,
) -> bool
SubjectType variants
pub enum SubjectType {
    Quote, Tick, Depth, Option, Future, Kline,
    StockTop, OptionTop, FullTick, QuoteBbo,
    Asset, Position, Order, Transaction,
}
| Variant | Callback | Typical args |
|---|---|---|
| SubjectType::Quote | on_quote | symbols |
| SubjectType::Tick | on_tick | symbols |
| SubjectType::Depth | on_depth | symbols |
| SubjectType::Option | on_option | symbols (OCC identifiers) |
| SubjectType::Future | on_future | symbols (futures contract codes) |
| SubjectType::Kline | on_kline | symbols |
| SubjectType::StockTop | on_stock_top | market (e.g. "US") |
| SubjectType::OptionTop | on_option_top | market |
| SubjectType::FullTick | on_full_tick | symbols |
| SubjectType::QuoteBbo | on_quote_bbo | symbols |
| SubjectType::Asset | on_asset | account |
| SubjectType::Position | on_position | account |
| SubjectType::Order | on_order | account |
| SubjectType::Transaction | on_transaction | account |
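Since the symbols argument is a single comma-separated string, application code that holds symbols in a list needs to join them first. The helper below is one possible way to do that; `join_symbols` is a hypothetical name, not an SDK function. It trims surrounding whitespace, skips blanks and duplicates, and returns `None` for an empty list so that no empty string is sent.

```rust
/// Join symbols into the comma-separated form expected by the `symbols`
/// argument. Surrounding whitespace is trimmed, empty entries and duplicates
/// are dropped, and the original order is preserved.
/// Returns `None` when nothing usable remains.
pub fn join_symbols(symbols: &[&str]) -> Option<String> {
    let mut seen: Vec<&str> = Vec::new();
    for raw in symbols {
        let sym = raw.trim();
        if !sym.is_empty() && !seen.contains(&sym) {
            seen.push(sym);
        }
    }
    if seen.is_empty() {
        None
    } else {
        Some(seen.join(","))
    }
}
```

The result can be passed along with `as_deref()`, for example `pc.subscribe(&SubjectType::Quote, join_symbols(&list).as_deref(), None, None)`. Only leading and trailing whitespace is removed, so option identifiers with an internal space, such as "AAPL 250117C00150000", pass through unchanged.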
// Market data
pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA,GOOG"), None, None);
pc.subscribe(&SubjectType::Tick, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Depth, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Kline, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Option, Some("AAPL 250117C00150000"), None, None);
pc.subscribe(&SubjectType::Future, Some("CL2506"), None, None);
pc.subscribe(&SubjectType::StockTop, None, None, Some("US"));
// Account events
pc.subscribe(&SubjectType::Asset, None, Some(&account), None);
pc.subscribe(&SubjectType::Position, None, Some(&account), None);
pc.subscribe(&SubjectType::Order, None, Some(&account), None);
pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);
// Unsubscribe
pc.unsubscribe(&SubjectType::Quote, Some("TSLA"), None, None);
pc.unsubscribe(&SubjectType::Option, None, None, None);
pc.unsubscribe(&SubjectType::Order, None, Some(&account), None);
Subscription State
The client tracks subscriptions internally so they can be restored after an auto-reconnect. The following helpers expose that state.
pub fn add_subscription(&self, subject: SubjectType, symbols: &[String])
pub fn remove_subscription(&self, subject: SubjectType, symbols: Option<&[String]>)
pub fn get_subscriptions(&self) -> HashMap<SubjectType, Vec<String>>
pub fn add_account_sub(&self, subject: SubjectType)
pub fn remove_account_sub(&self, subject: &SubjectType)
pub fn get_account_subscriptions(&self) -> Vec<SubjectType>
Heartbeat & Reconnection
- Heartbeats are sent automatically on the interval configured by PushClientOptions.heartbeat_interval_secs (default 10s). You can also send one manually via pc.send_heartbeat().
- When auto_reconnect is true (default) and the connection drops, the client reconnects with exponential backoff (starting at reconnect_interval_secs, capped at 60s) and re-sends all previously tracked subscriptions.
- TLS hostname verification is relaxed to match the Go SDK (InsecureSkipVerify equivalent).
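For intuition about the reconnect timing, the sketch below computes a capped exponential backoff schedule. It assumes the delay doubles after each failed attempt, which the text above does not state explicitly; the SDK's actual growth factor and any jitter may differ. `reconnect_delay` is a hypothetical helper, not an SDK function.

```rust
use std::time::Duration;

/// Delay before reconnect attempt number `attempt` (0-based).
/// Assumption: the delay doubles on every attempt, starting at `base_secs`
/// and never exceeding `cap_secs`.
pub fn reconnect_delay(base_secs: u64, cap_secs: u64, attempt: u32) -> Duration {
    // 2^attempt, saturating instead of overflowing for very large attempts.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let secs = base_secs.saturating_mul(factor).min(cap_secs);
    Duration::from_secs(secs)
}
```

With the default 5s base and 60s cap, this yields delays of 5, 10, 20, 40, and then 60 seconds for every later attempt.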
Convenience Subscription Methods (new in v0.4.0)
subscribe_cc / unsubscribe_cc (new in v0.4.0)
pub fn subscribe_cc(&self, symbols: &str) -> bool
pub fn unsubscribe_cc(&self, symbols: &str) -> bool
Convenience wrappers for cryptocurrency quote subscription. Equivalent to subscribe(&SubjectType::Quote, Some(symbols), None, None). Cc data is delivered via the on_quote callback.
v0.4.0 bug fix: previously, Cc-type push data was incorrectly routed to the QuoteBBO fallback branch. It is now dispatched to on_quote, consistent with the Go, Python, and Java SDKs.
pc.subscribe_cc("BTC,ETH");
// later
pc.unsubscribe_cc("BTC");
subscribe_market / unsubscribe_market (new in v0.4.0)
pub fn subscribe_market(&self, market: &str) -> bool
pub fn unsubscribe_market(&self, market: &str) -> bool
Convenience wrappers for market status push. Equivalent to subscribe(&SubjectType::Quote, None, None, Some(market)). Market status changes are delivered via the on_quote callback.
pc.subscribe_market("US");
// later
pc.unsubscribe_market("US");
