Real-Time Push

The Rust SDK's PushClient maintains a persistent TCP+TLS connection to the Tiger OpenAPI push server. Messages are encoded with Protobuf and framed with varint32 length prefixes. The client supports connection authentication, subscribe/unsubscribe, per-type callbacks, heartbeat keep-alive, and automatic reconnection. v0.4.0 fixes a Cc dataType dispatcher bug and adds subscribe_cc / unsubscribe_cc and subscribe_market / unsubscribe_market convenience methods.

Quick Start

use std::sync::Arc;
use tigeropen::config::ClientConfig;
use tigeropen::push::{connect, Callbacks, PushClient, SubjectType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ClientConfig::builder().build()?;
    let account = config.account.clone();

    let pc = Arc::new(PushClient::new(config, None));

    // Register callbacks
    pc.set_callbacks(Callbacks {
        on_connect:    Some(Arc::new(|| println!("connected"))),
        on_disconnect: Some(Arc::new(|| println!("disconnected"))),
        on_error:      Some(Arc::new(|msg| eprintln!("error: {}", msg))),
        on_kickout:    Some(Arc::new(|msg| println!("kickout: {}", msg))),
        on_quote: Some(Arc::new(|data| {
            println!("[Quote] {} price={:?}", data.symbol, data.latest_price);
        })),
        on_tick: Some(Arc::new(|data| {
            println!("[Tick] {} sn={}", data.symbol, data.sn);
        })),
        on_order: Some(Arc::new(|data| {
            println!("[Order] id={} status={}", data.id, data.status);
        })),
        ..Default::default()
    });

    // Establish the connection (authenticates and spawns read/write/heartbeat loops)
    connect(&pc).await.map_err(|e| format!("connect failed: {}", e))?;

    // Subscribe to market data (symbols as comma-separated string)
    pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA"), None, None);
    pc.subscribe(&SubjectType::Tick,  Some("AAPL"),      None, None);
    pc.subscribe(&SubjectType::Depth, Some("AAPL"),      None, None);
    pc.subscribe(&SubjectType::Kline, Some("AAPL"),      None, None);

    // Subscribe to account events (pass the trading account)
    pc.subscribe(&SubjectType::Asset,       None, Some(&account), None);
    pc.subscribe(&SubjectType::Position,    None, Some(&account), None);
    pc.subscribe(&SubjectType::Order,       None, Some(&account), None);
    pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);

    tokio::signal::ctrl_c().await?;
    pc.disconnect();
    Ok(())
}

Client Lifecycle

PushClient::new

pub fn new(config: ClientConfig, options: Option<PushClientOptions>) -> PushClient

Creates a new PushClient. Callbacks are registered separately via set_callbacks.

ParameterTypeDescription
configClientConfigLoaded configuration (credentials, account)
optionsOption<PushClientOptions>Optional overrides for URL, intervals, reconnect

PushClientOptions

pub struct PushClientOptions {
    pub push_url:                  Option<String>,
    pub heartbeat_interval_secs:   Option<u64>,
    pub reconnect_interval_secs:   Option<u64>,
    pub auto_reconnect:            Option<bool>,
    pub connect_timeout_secs:      Option<u64>,
}

Defaults: push URL openapi.tigerfintech.com:9883, heartbeat 10s, reconnect 5s (with exponential backoff up to 60s), connect timeout 30s, auto-reconnect true.

use tigeropen::push::{PushClient, PushClientOptions};

let opts = PushClientOptions {
    push_url: Some("openapi.tigerfintech.com:9883".into()),
    heartbeat_interval_secs: Some(10),
    reconnect_interval_secs: Some(5),
    auto_reconnect: Some(true),
    connect_timeout_secs: Some(30),
};
let pc = PushClient::new(config, Some(opts));

connect / disconnect

connect is a free-standing async function that takes &Arc<PushClient>. It establishes the TCP+TLS connection, spawns the write, read, and heartbeat loops, signs and sends the CONNECT authentication message, and waits for the CONNECTED response.

use std::sync::Arc;
use tigeropen::push::{connect, PushClient};

let pc = Arc::new(PushClient::new(config, None));
connect(&pc).await.map_err(|e| format!("connect failed: {}", e))?;

// Graceful shutdown (synchronous)
pc.disconnect();

state

pub fn state(&self) -> ConnectionState

Returns the current connection state.

pub enum ConnectionState {
    Disconnected,
    Connecting,
    Connected,
}
println!("state: {:?}", pc.state());

set_callbacks

pub fn set_callbacks(&self, cb: Callbacks)

Registers the callback handlers. Can be called any time; subsequent pushes will be routed to the newly-registered handlers.

pc.set_callbacks(Callbacks {
    on_quote: Some(Arc::new(|data| {
        println!("{} price={:?}", data.symbol, data.latest_price);
    })),
    ..Default::default()
});

Callbacks

All callback fields live on the Callbacks struct and are Option<Arc<dyn Fn(...) + Send + Sync>>. Payloads are Protobuf-generated types from the pb::* module.

FieldClosure signatureTriggered when
on_connectFn()Authentication completes
on_disconnectFn()Connection closed
on_errorFn(String)Transport / protocol / server error
on_kickoutFn(String)Server forces disconnection
on_quoteFn(pb::QuoteData)Real-time stock quote update
on_tickFn(pb::TradeTickData)Trade tick received
on_depthFn(pb::QuoteDepthData)Order-book depth update
on_optionFn(pb::QuoteData)Option quote update
on_futureFn(pb::QuoteData)Futures quote update
on_klineFn(pb::KlineData)K-line bar update
on_stock_topFn(pb::StockTopData)Stock ranking / top list update
on_option_topFn(pb::OptionTopData)Option ranking / top list update
on_full_tickFn(pb::TickData)Full tick data
on_quote_bboFn(pb::QuoteData)Best bid/offer update
on_assetFn(pb::AssetData)Account asset change
on_positionFn(pb::PositionData)Position change
on_orderFn(pb::OrderStatusData)Order status change
on_transactionFn(pb::OrderTransactionData)Fill / execution received
use std::sync::Arc;
use tigeropen::push::Callbacks;

let callbacks = Callbacks {
    on_quote: Some(Arc::new(|data| {
        println!("{} price={:?} volume={:?}", data.symbol, data.latest_price, data.volume);
    })),
    on_depth: Some(Arc::new(|data| {
        let ask_levels = data.ask.as_ref().map_or(0, |a| a.price.len());
        let bid_levels = data.bid.as_ref().map_or(0, |b| b.price.len());
        println!("{} ask_levels={} bid_levels={}", data.symbol, ask_levels, bid_levels);
    })),
    on_kline: Some(Arc::new(|data| {
        println!("{} O={} H={} L={} C={}", data.symbol, data.open, data.high, data.low, data.close);
    })),
    on_order: Some(Arc::new(|data| {
        println!("id={} symbol={} status={}", data.id, data.symbol, data.status);
    })),
    on_error: Some(Arc::new(|err| eprintln!("push error: {}", err))),
    ..Default::default()
};
pc.set_callbacks(callbacks);

Subscriptions

Subscribe/unsubscribe calls are synchronous (non-async) and return a bool indicating whether the framed message was queued onto the write channel.

subscribe / unsubscribe

pub fn subscribe(
    &self,
    subject: &SubjectType,
    symbols: Option<&str>,   // comma-separated list, e.g. "AAPL,TSLA"
    account: Option<&str>,   // required for account subjects
    market:  Option<&str>,   // e.g. "US" for ranking-style subjects
) -> bool

pub fn unsubscribe(
    &self,
    subject: &SubjectType,
    symbols: Option<&str>,
    account: Option<&str>,
    market:  Option<&str>,
) -> bool

SubjectType variants

pub enum SubjectType {
    Quote, Tick, Depth, Option, Future, Kline,
    StockTop, OptionTop, FullTick, QuoteBbo,
    Asset, Position, Order, Transaction,
}
VariantCallbackTypical args
SubjectType::Quoteon_quotesymbols
SubjectType::Tickon_ticksymbols
SubjectType::Depthon_depthsymbols
SubjectType::Optionon_optionsymbols (OCC identifiers)
SubjectType::Futureon_futuresymbols (futures contract codes)
SubjectType::Klineon_klinesymbols
SubjectType::StockTopon_stock_topmarket (e.g. "US")
SubjectType::OptionTopon_option_topmarket
SubjectType::FullTickon_full_ticksymbols
SubjectType::QuoteBboon_quote_bbosymbols
SubjectType::Asseton_assetaccount
SubjectType::Positionon_positionaccount
SubjectType::Orderon_orderaccount
SubjectType::Transactionon_transactionaccount
// Market data
pc.subscribe(&SubjectType::Quote,  Some("AAPL,TSLA,GOOG"), None, None);
pc.subscribe(&SubjectType::Tick,   Some("AAPL"),           None, None);
pc.subscribe(&SubjectType::Depth,  Some("AAPL"),           None, None);
pc.subscribe(&SubjectType::Kline,  Some("AAPL"),           None, None);
pc.subscribe(&SubjectType::Option, Some("AAPL  250117C00150000"), None, None);
pc.subscribe(&SubjectType::Future, Some("CL2506"),         None, None);
pc.subscribe(&SubjectType::StockTop, None, None, Some("US"));

// Account events
pc.subscribe(&SubjectType::Asset,       None, Some(&account), None);
pc.subscribe(&SubjectType::Position,    None, Some(&account), None);
pc.subscribe(&SubjectType::Order,       None, Some(&account), None);
pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);

// Unsubscribe
pc.unsubscribe(&SubjectType::Quote,  Some("TSLA"), None, None);
pc.unsubscribe(&SubjectType::Option, None, None, None);
pc.unsubscribe(&SubjectType::Order,  None, Some(&account), None);

Subscription State

The client tracks subscriptions internally so they can be restored after an auto-reconnect. The following helpers expose that state.

pub fn add_subscription(&self, subject: SubjectType, symbols: &[String])
pub fn remove_subscription(&self, subject: SubjectType, symbols: Option<&[String]>)
pub fn get_subscriptions(&self) -> HashMap<SubjectType, Vec<String>>

pub fn add_account_sub(&self, subject: SubjectType)
pub fn remove_account_sub(&self, subject: &SubjectType)
pub fn get_account_subscriptions(&self) -> Vec<SubjectType>

Heartbeat & Reconnection

  • Heartbeats are sent automatically on the interval configured by PushClientOptions.heartbeat_interval_secs (default 10s). You can also send one manually via pc.send_heartbeat().
  • When auto_reconnect is true (default) and the connection drops, the client reconnects with exponential backoff (starting at reconnect_interval_secs, capped at 60s) and re-sends all previously tracked subscriptions.
  • TLS hostname verification is relaxed to match the Go SDK (InsecureSkipVerify equivalent).

Convenience Subscription Methods (new in v0.4.0)

subscribe_cc / unsubscribe_cc (new in v0.4.0)

pub fn subscribe_cc(&self, symbols: &str) -> bool
pub fn unsubscribe_cc(&self, symbols: &str) -> bool

Convenience wrappers for cryptocurrency quote subscription. Equivalent to subscribe(&SubjectType::Quote, Some(symbols), None, None). Cc data is delivered via the on_quote callback.

v0.4.0 Bug Fix: Previously Cc-type push data was incorrectly routed to the QuoteBBO fallback branch. Now correctly dispatched to on_quote, consistent with Go/Python/Java SDK behavior.

pc.subscribe_cc("BTC,ETH");
// later
pc.unsubscribe_cc("BTC");

subscribe_market / unsubscribe_market (new in v0.4.0)

pub fn subscribe_market(&self, market: &str) -> bool
pub fn unsubscribe_market(&self, market: &str) -> bool

Convenience wrappers for market status push. Equivalent to subscribe(&SubjectType::Quote, None, None, Some(market)). Market status changes are delivered via the on_quote callback.

pc.subscribe_market("US");
// later
pc.unsubscribe_market("US");