Async client for the Kite Connect WebSocket API
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: make initial commit for open source release

+1046
+2
.gitignore
··· 1 + /target 2 + /Cargo.lock
+26
Cargo.toml
··· 1 + [package] 2 + authors = ["Kaushik Chakraborty <git@kaushikc.org>"] 3 + name = "kiteticker-async" 4 + version = "0.1.0" 5 + edition = "2021" 6 + license = "Apache-2.0" 7 + readme = "README.md" 8 + repository = "https://github.com/kaychaks/kiteticker-async" 9 + documentation = "https://docs.rs/kiteticker-async/0.1.0/kiteticker-async/" 10 + description = """ 11 + Async version of the ticker module of the kiteconnect-rs crate. 12 + """ 13 + categories = ["asynchronous", "finance"] 14 + keywords = ["ticker", "zerodha", "web-sockets", "trading", "real-time"] 15 + 16 + [dependencies] 17 + serde_json = "1.0" 18 + serde = { version = "1.0", features = ["derive"] } 19 + tokio = { version = "1.28.2", features = ["full"] } 20 + tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] } 21 + futures-util = { version = "0.3.28", features = ["sink"] } 22 + tokio-stream = { version = "0.1.14", features = ["full"] } 23 + url = "2.4.1" 24 + 25 + [dev-dependencies] 26 + tokio = { version = "1", features = ["test-util"] }
+17
justfile
··· 1 + cargo-doc-command := "cargo doc --no-deps" 2 + 3 + check: 4 + cargo check 5 + 6 + test: 7 + cargo test 8 + 9 + build: 10 + cargo build 11 + 12 + doc: 13 + rm -r target/doc 14 + {{cargo-doc-command}} 15 + doc-open: 16 + rm -r target/doc 17 + {{cargo-doc-command}} --open
+3
rustfmt.toml
··· 1 + tab_spaces = 2 2 + max_width = 80 3 + use_field_init_shorthand = true
+30
src/lib.rs
··· 1 + #![allow( 2 + clippy::cognitive_complexity, 3 + clippy::large_enum_variant, 4 + clippy::needless_doctest_main 5 + )] 6 + #![warn( 7 + missing_debug_implementations, 8 + rust_2018_idioms, 9 + unreachable_pub 10 + )] 11 + #![doc(test( 12 + no_crate_inject, 13 + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) 14 + ))] 15 + 16 + //! Async implementation of Kite Connect's WebSocket Steaming API 17 + //! 18 + //! This crate provides types to subscribe and receive live quotes for instruments during market hours via WebSockets. 19 + //! The response is parsed and converted into Rust types. 20 + //! The WebSocket connection is managed by the library and reconnected automatically. 21 + //! 22 + 23 + mod models; 24 + pub use models::{ 25 + Depth, DepthItem, Exchange, Mode, Request, TextMessage, Tick, TickMessage, 26 + TickerMessage, OHLC, 27 + }; 28 + 29 + pub mod ticker; 30 + pub use ticker::{KiteTickerAsync, KiteTickerSubscriber};
+492
src/models.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use std::{ops::Div, time::Duration}; 3 + 4 + fn value(input: &[u8]) -> Option<u32> { 5 + let value = i32::from_be_bytes(input[0..=3].try_into().unwrap()); 6 + value.try_into().ok() 7 + } 8 + 9 + fn value_short(input: &[u8]) -> Option<u16> { 10 + let value = i16::from_be_bytes(input[0..=1].try_into().unwrap()); 11 + value.try_into().ok() 12 + } 13 + 14 + fn price(input: &[u8], exchange: &Exchange) -> Option<f64> { 15 + let value = i32::from_be_bytes(input[0..4].try_into().unwrap()) as f64; 16 + if exchange.divisor() > 0_f64 { 17 + Some(value.div(exchange.divisor())) 18 + } else { 19 + None 20 + } 21 + } 22 + 23 + pub(crate) fn packet_length(bs: &[u8]) -> usize { 24 + i16::from_be_bytes(bs[0..=1].try_into().unwrap()) as usize 25 + } 26 + 27 + #[derive(Debug, Clone, Default)] 28 + /// 29 + /// Quote packet structure 30 + /// 31 + pub struct Tick { 32 + pub mode: Mode, 33 + pub instrument_token: u32, 34 + pub exchange: Exchange, 35 + pub is_tradable: bool, 36 + pub is_index: bool, 37 + 38 + pub last_traded_qty: Option<u32>, 39 + pub avg_traded_price: Option<f64>, 40 + pub last_price: Option<f64>, 41 + pub volume_traded: Option<u32>, 42 + pub total_buy_qty: Option<u32>, 43 + pub total_sell_qty: Option<u32>, 44 + pub ohlc: Option<OHLC>, 45 + 46 + pub last_traded_timestamp: Option<Duration>, 47 + pub oi: Option<u32>, 48 + pub oi_day_high: Option<u32>, 49 + pub oi_day_low: Option<u32>, 50 + pub exchange_timestamp: Option<Duration>, 51 + 52 + pub net_change: Option<f64>, 53 + pub depth: Option<Depth>, 54 + } 55 + 56 + impl Tick { 57 + fn set_instrument_token(&mut self, input: &[u8]) -> &mut Self { 58 + self.instrument_token = value(&input[0..=3]).unwrap(); 59 + self.exchange = ((self.instrument_token & 0xFF) as usize).into(); 60 + self 61 + } 62 + 63 + fn set_change(&mut self) -> &mut Self { 64 + self.net_change = self 65 + .ohlc 66 + .as_ref() 67 + .map(|o| o.close) 68 + .map(|close_price| { 69 + if let Some(last_price) = self.last_price { 70 + if close_price == 0_f64 { 71 + return None; 72 + } else { 73 + Some(((last_price - close_price) * 100.0).div(close_price)) 74 + } 75 + } else { 76 + None 77 + } 78 + }) 79 + .unwrap_or_default(); 80 + self 81 + } 82 + } 83 + 84 + impl From<&[u8]> for Tick { 85 + fn from(input: &[u8]) -> Self { 86 + let mut tick = Tick::default(); 87 + 88 + let parse_ltp = |t: &mut Tick, i: &[u8]| { 89 + // 0 - 4 bytes : instrument token 90 + t.set_instrument_token(i); 91 + // 4 - 8 bytes : ltp 92 + if let Some(bs) = i.get(4..8) { 93 + t.mode = Mode::LTP; 94 + t.last_price = price(bs, &t.exchange); 95 + } 96 + }; 97 + 98 + let parse_quote = |t: &mut Tick, i: &[u8], is_index: bool| { 99 + if is_index { 100 + if let Some(bs) = i.get(8..28) { 101 + t.mode = Mode::Quote; 102 + // 8 - 24 bytes : ohlc 103 + t.ohlc = OHLC::from(&bs[0..16], &t.exchange); 104 + // 24 - 28 bytes : Price change 105 + // t.net_change = price(&bs[16..=19], &t.exchange); 106 + t.set_change(); 107 + } 108 + } else { 109 + if let Some(bs) = i.get(8..44) { 110 + t.mode = Mode::Quote; 111 + // 8 - 12 bytes : last traded quantity 112 + t.last_traded_qty = value(&bs[0..4]); 113 + // 12 - 16 bytes : avg traded price 114 + t.avg_traded_price = price(&bs[4..8], &t.exchange); 115 + // 16 - 20 bytes : volume traded today 116 + t.volume_traded = value(&bs[8..12]); 117 + // 20 - 24 bytes : total buy quantity 118 + t.total_buy_qty = value(&bs[12..16]); 119 + // 24 - 28 bytes : total sell quantity 120 + t.total_sell_qty = value(&bs[16..20]); 121 + // 28 - 44 bytes : ohlc 122 + t.ohlc = OHLC::from(&bs[20..36], &t.exchange); 123 + 124 + t.set_change(); 125 + } 126 + } 127 + }; 128 + 129 + let parse_full = |t: &mut Tick, i: &[u8], is_index: bool| { 130 + if is_index { 131 + if let Some(bs) = i.get(28..32) { 132 + t.mode = Mode::Full; 133 + // 28 - 32 bytes : exchange time 134 + t.exchange_timestamp = 135 + value(bs).map(|x| Duration::from_secs(x.into())); 136 + } 137 + } else { 138 + if let Some(bs) = i.get(44..184) { 139 + t.mode = Mode::Full; 140 + // 44 - 48 bytes : last traded timestamp 141 + t.last_traded_timestamp = 142 + value(&bs[0..4]).map(|x| Duration::from_secs(x.into())); 143 + 144 + // 48 - 52 bytes : oi 145 + t.oi = value(&bs[4..8]); 146 + // 52 - 56 bytes : oi day high 147 + t.oi_day_high = value(&bs[8..12]); 148 + // 56 - 60 bytes : oi day low 149 + t.oi_day_low = value(&bs[12..16]); 150 + // 60 - 64 bytes : exchange time 151 + t.exchange_timestamp = 152 + value(&bs[16..20]).map(|x| Duration::from_secs(x.into())); 153 + // 64 - 184 bytes : market depth 154 + t.depth = Depth::from(&bs[20..140], &t.exchange); 155 + } 156 + } 157 + }; 158 + 159 + parse_ltp(&mut tick, input); 160 + if !tick.exchange.is_tradable() { 161 + tick.is_index = true; 162 + tick.is_tradable = false; 163 + 164 + parse_quote(&mut tick, input, true); 165 + parse_full(&mut tick, input, true); 166 + } else { 167 + tick.is_index = false; 168 + tick.is_tradable = true; 169 + 170 + parse_quote(&mut tick, input, false); 171 + parse_full(&mut tick, input, false); 172 + } 173 + 174 + tick 175 + } 176 + } 177 + 178 + #[derive(Debug, Clone, Default)] 179 + /// 180 + /// OHLC packet structure 181 + /// 182 + pub struct OHLC { 183 + pub open: f64, 184 + pub high: f64, 185 + pub low: f64, 186 + pub close: f64, 187 + } 188 + 189 + impl OHLC { 190 + fn from(value: &[u8], exchange: &Exchange) -> Option<Self> { 191 + if let Some(bs) = value.get(0..16) { 192 + Some(OHLC { 193 + open: price(&bs[0..=3], exchange).unwrap(), 194 + high: price(&bs[4..=7], exchange).unwrap(), 195 + low: price(&bs[8..=11], exchange).unwrap(), 196 + close: price(&bs[12..=15], exchange).unwrap(), 197 + }) 198 + } else { 199 + None 200 + } 201 + } 202 + } 203 + 204 + #[derive(Debug, Clone, Default)] 205 + /// 206 + /// Market depth packet structure 207 + /// 208 + pub struct Depth { 209 + pub buy: [DepthItem; 5], 210 + pub sell: [DepthItem; 5], 211 + } 212 + 213 + impl Depth { 214 + fn from(input: &[u8], exchange: &Exchange) -> Option<Self> { 215 + if let Some(bs) = input.get(0..120) { 216 + let parse_depth_item = |v: &[u8], start: usize| { 217 + v.get(start..start + 10) 218 + .and_then(|xs| DepthItem::from(xs, exchange)) 219 + .unwrap_or_default() 220 + }; 221 + let mut depth = Depth::default(); 222 + for i in 0..5 { 223 + let start = i * 12; 224 + depth.buy[i] = parse_depth_item(bs, start) 225 + } 226 + for i in 0..5 { 227 + let start = 60 + i * 12; 228 + depth.sell[i] = parse_depth_item(bs, start); 229 + } 230 + 231 + Some(depth) 232 + } else { 233 + None 234 + } 235 + } 236 + } 237 + 238 + #[derive(Debug, Clone, Default)] 239 + /// 240 + /// Structure for each market depth entry 241 + /// 242 + pub struct DepthItem { 243 + pub qty: u32, 244 + pub price: f64, 245 + pub orders: u16, 246 + } 247 + 248 + impl DepthItem { 249 + pub fn from(input: &[u8], exchange: &Exchange) -> Option<Self> { 250 + if let Some(bs) = input.get(0..10) { 251 + Some(DepthItem { 252 + qty: value(&bs[0..=3]).unwrap(), 253 + price: price(&bs[4..=7], exchange).unwrap(), 254 + orders: value_short(&bs[8..=9]).unwrap(), 255 + }) 256 + } else { 257 + None 258 + } 259 + } 260 + } 261 + 262 + #[derive(Debug, Clone)] 263 + /// 264 + /// Parsed message from websocket 265 + /// 266 + pub enum TickerMessage { 267 + /// Quote packets for subscribed tokens 268 + Tick(Vec<TickMessage>), 269 + /// Error response 270 + Error(String), 271 + /// Order postback 272 + Order(serde_json::Value), 273 + /// Messages and alerts from broker 274 + Message(serde_json::Value), 275 + /// Websocket closing frame 276 + ClosingMessage(serde_json::Value), 277 + } 278 + 279 + impl From<TextMessage> for TickerMessage { 280 + fn from(value: TextMessage) -> Self { 281 + let message_type: TextMessageType = value.message_type.into(); 282 + match message_type { 283 + TextMessageType::Order => Self::Order(value.data), 284 + TextMessageType::Error => Self::Error(value.data.to_string()), 285 + TextMessageType::Message => Self::Message(value.data), 286 + } 287 + } 288 + } 289 + 290 + #[derive(Debug, Clone, Default)] 291 + /// 292 + /// Parsed quote packet 293 + /// 294 + pub struct TickMessage { 295 + pub instrument_token: u32, 296 + pub content: Tick, 297 + } 298 + 299 + impl TickMessage { 300 + pub(crate) fn new(instrument_token: u32, content: Tick) -> Self { 301 + Self { 302 + instrument_token, 303 + content, 304 + } 305 + } 306 + } 307 + 308 + #[derive(Debug, Clone, Default)] 309 + /// 310 + /// Exchange options 311 + /// 312 + pub enum Exchange { 313 + #[default] 314 + NSE, 315 + NFO, 316 + CDS, 317 + BSE, 318 + BFO, 319 + BCD, 320 + MCX, 321 + MCXSX, 322 + INDICES, 323 + } 324 + 325 + impl Exchange { 326 + fn divisor(&self) -> f64 { 327 + match self { 328 + Self::CDS => 100_000_0.0, 329 + Self::BCD => 100_0.0, 330 + _ => 100.0, 331 + } 332 + } 333 + 334 + fn is_tradable(&self) -> bool { 335 + match self { 336 + Self::INDICES => false, 337 + _ => true, 338 + } 339 + } 340 + } 341 + 342 + impl From<usize> for Exchange { 343 + fn from(value: usize) -> Self { 344 + match value { 345 + 9 => Self::INDICES, 346 + 8 => Self::MCXSX, 347 + 7 => Self::MCX, 348 + 6 => Self::BCD, 349 + 5 => Self::BFO, 350 + 4 => Self::BSE, 351 + 3 => Self::CDS, 352 + 2 => Self::NFO, 353 + 1 => Self::NSE, 354 + _ => Self::NSE, 355 + } 356 + } 357 + } 358 + 359 + #[derive( 360 + Debug, Clone, Deserialize, Serialize, Default, PartialEq, PartialOrd, 361 + )] 362 + #[serde(rename_all = "lowercase")] 363 + /// 364 + /// Modes in which packets are streamed 365 + /// 366 + pub enum Mode { 367 + Full, 368 + #[default] 369 + Quote, 370 + LTP, 371 + } 372 + 373 + impl TryFrom<usize> for Mode { 374 + type Error = String; 375 + fn try_from(value: usize) -> Result<Self, Self::Error> { 376 + match value { 377 + 8 => Ok(Self::LTP), 378 + 44 => Ok(Self::Quote), 379 + 184 => Ok(Self::Full), 380 + _ => Err(format!("Invalid packet size: {}", value)), 381 + } 382 + } 383 + } 384 + 385 + #[derive(Clone, Debug, Deserialize, Serialize)] 386 + #[serde(rename_all = "lowercase")] 387 + /// 388 + /// Websocket request actions 389 + /// 390 + enum RequestActions { 391 + Subscribe, 392 + Unsubscribe, 393 + Mode, 394 + } 395 + 396 + #[derive(Clone, Debug, Deserialize, Serialize)] 397 + #[serde(untagged)] 398 + /// 399 + /// Websocket request data 400 + /// 401 + enum RequestData { 402 + InstrumentTokens(Vec<u32>), 403 + InstrumentTokensWithMode(Mode, Vec<u32>), 404 + } 405 + 406 + #[derive(Debug, Clone, Deserialize, Serialize)] 407 + /// 408 + /// Websocket request structure 409 + /// 410 + pub struct Request { 411 + a: RequestActions, 412 + v: RequestData, 413 + } 414 + 415 + impl Request { 416 + fn new(action: RequestActions, value: RequestData) -> Request { 417 + Request { 418 + a: action, 419 + v: value, 420 + } 421 + } 422 + 423 + /// 424 + /// Subscribe to a list of instrument tokens 425 + /// 426 + pub fn subscribe(instrument_tokens: Vec<u32>) -> Request { 427 + Request::new( 428 + RequestActions::Subscribe, 429 + RequestData::InstrumentTokens(instrument_tokens), 430 + ) 431 + } 432 + 433 + /// 434 + /// Subscribe to a list of instrument tokens with mode 435 + /// 436 + pub fn mode(mode: Mode, instrument_tokens: Vec<u32>) -> Request { 437 + Request::new( 438 + RequestActions::Mode, 439 + RequestData::InstrumentTokensWithMode(mode, instrument_tokens), 440 + ) 441 + } 442 + 443 + /// 444 + /// Unsubscribe from a list of instrument tokens 445 + /// 446 + pub fn unsubscribe(instrument_tokens: Vec<u32>) -> Request { 447 + Request::new( 448 + RequestActions::Unsubscribe, 449 + RequestData::InstrumentTokens(instrument_tokens), 450 + ) 451 + } 452 + } 453 + 454 + impl ToString for Request { 455 + fn to_string(&self) -> String { 456 + serde_json::to_string(self) 457 + .expect("failed to serialize TickerInput to JSON") 458 + } 459 + } 460 + 461 + #[derive(Debug, Clone)] 462 + /// 463 + /// Postbacks and non-binary message types 464 + /// 465 + enum TextMessageType { 466 + /// Order postback 467 + Order, 468 + /// Error response 469 + Error, 470 + /// Messages and alerts from the broker 471 + Message, 472 + } 473 + 474 + impl From<String> for TextMessageType { 475 + fn from(value: String) -> Self { 476 + match value.as_str() { 477 + "order" => Self::Order, 478 + "error" => Self::Error, 479 + _ => Self::Message, 480 + } 481 + } 482 + } 483 + 484 + #[derive(Debug, Clone, Deserialize, Serialize)] 485 + /// 486 + /// Postback and non-binary message structure 487 + /// 488 + pub struct TextMessage { 489 + #[serde(rename = "type")] 490 + message_type: String, 491 + data: serde_json::Value, 492 + }
+476
src/ticker.rs
··· 1 + use futures_util::{stream::iter, SinkExt, StreamExt}; 2 + use serde_json::json; 3 + use std::{collections::HashMap, sync::Arc}; 4 + use tokio::net::TcpStream; 5 + use tokio::sync::Mutex; 6 + use tokio_tungstenite::{ 7 + connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream, 8 + }; 9 + 10 + use crate::models::{ 11 + packet_length, Mode, Request, TextMessage, Tick, TickMessage, TickerMessage, 12 + }; 13 + 14 + #[derive(Debug, Clone)] 15 + /// 16 + /// The WebSocket client for connecting to Kite Connect's streaming quotes service. 17 + /// 18 + pub struct KiteTickerAsync { 19 + #[allow(dead_code)] 20 + api_key: String, 21 + #[allow(dead_code)] 22 + access_token: String, 23 + ws_stream: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>, 24 + } 25 + 26 + impl KiteTickerAsync { 27 + /// Establish a connection with the Kite WebSocket server 28 + pub async fn connect( 29 + api_key: &str, 30 + access_token: &str, 31 + ) -> Result<Self, String> { 32 + let socket_url = format!( 33 + "wss://{}?api_key={}&access_token={}", 34 + "ws.kite.trade", api_key, access_token 35 + ); 36 + let url = url::Url::parse(socket_url.as_str()).unwrap(); 37 + 38 + let (ws_stream, _) = connect_async(url).await.map_err(|e| e.to_string())?; 39 + 40 + Ok(KiteTickerAsync { 41 + api_key: api_key.to_string(), 42 + access_token: access_token.to_string(), 43 + ws_stream: Arc::new(Mutex::new(ws_stream)), 44 + }) 45 + } 46 + 47 + /// Subscribes the client to a list of instruments 48 + pub async fn subscribe( 49 + mut self, 50 + instrument_tokens: &[u32], 51 + mode: Option<Mode>, 52 + ) -> Result<KiteTickerSubscriber, String> { 53 + self 54 + .subscribe_cmd(instrument_tokens, mode.clone()) 55 + .await 56 + .expect("failed to subscribe"); 57 + let st = instrument_tokens 58 + .to_vec() 59 + .iter() 60 + .map(|t| (t.clone(), mode.to_owned().unwrap_or_default())) 61 + .collect(); 62 + 63 + Ok(KiteTickerSubscriber { 64 + ticker: self, 65 + subscribed_tokens: st, 66 + }) 67 + } 68 + 69 + /// Close the websocket connection 70 + pub async fn close(&mut self) -> Result<(), String> { 71 + let mut ws_stream = self.ws_stream.lock().await; 72 + ws_stream.close(None).await.map_err(|x| x.to_string())?; 73 + Ok(()) 74 + } 75 + 76 + async fn subscribe_cmd( 77 + &mut self, 78 + instrument_tokens: &[u32], 79 + mode: Option<Mode>, 80 + ) -> Result<(), String> { 81 + let mut msgs = iter(vec![ 82 + Ok(Message::Text( 83 + Request::subscribe(instrument_tokens.to_vec()).to_string(), 84 + )), 85 + Ok(Message::Text( 86 + Request::mode(mode.unwrap_or_default(), instrument_tokens.to_vec()) 87 + .to_string(), 88 + )), 89 + ]); 90 + 91 + let mut ws_stream = self.ws_stream.lock().await; 92 + 93 + ws_stream 94 + .send_all(msgs.by_ref()) 95 + .await 96 + .expect("failed to send subscription message"); 97 + 98 + Ok(()) 99 + } 100 + 101 + async fn unsubscribe_cmd( 102 + &mut self, 103 + instrument_tokens: &[u32], 104 + ) -> Result<(), String> { 105 + let mut ws_stream = self.ws_stream.lock().await; 106 + ws_stream 107 + .send(Message::Text( 108 + Request::unsubscribe(instrument_tokens.to_vec()).to_string(), 109 + )) 110 + .await 111 + .expect("failed to send unsubscribe message"); 112 + Ok(()) 113 + } 114 + 115 + async fn set_mode_cmd( 116 + &mut self, 117 + instrument_tokens: &[u32], 118 + mode: Mode, 119 + ) -> Result<(), String> { 120 + let mut ws_stream = self.ws_stream.lock().await; 121 + ws_stream 122 + .send(Message::Text( 123 + Request::mode(mode, instrument_tokens.to_vec()).to_string(), 124 + )) 125 + .await 126 + .expect("failed to send set mode message"); 127 + Ok(()) 128 + } 129 + } 130 + 131 + #[derive(Debug, Clone)] 132 + /// 133 + /// The Websocket client that entered in a pub/sub mode once the client subscribed to a list of instruments 134 + /// 135 + pub struct KiteTickerSubscriber { 136 + ticker: KiteTickerAsync, 137 + subscribed_tokens: HashMap<u32, Mode>, 138 + } 139 + 140 + impl KiteTickerSubscriber { 141 + /// Get the list of subscribed instruments 142 + pub fn get_subscribed(&self) -> Vec<u32> { 143 + self 144 + .subscribed_tokens 145 + .clone() 146 + .into_keys() 147 + .collect::<Vec<_>>() 148 + } 149 + 150 + /// get all tokens common between subscribed tokens and input tokens 151 + /// and if the input is empty then all subscribed tokens will be unsubscribed 152 + fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> { 153 + if tokens.len() == 0 { 154 + self.get_subscribed() 155 + } else { 156 + tokens 157 + .iter() 158 + .filter(|t| self.subscribed_tokens.contains_key(t)) 159 + .map(|t| t.clone()) 160 + .collect::<Vec<_>>() 161 + } 162 + } 163 + 164 + /// Subscribe to new tokens 165 + pub async fn subscribe( 166 + &mut self, 167 + tokens: &[u32], 168 + mode: Option<Mode>, 169 + ) -> Result<(), String> { 170 + self.subscribed_tokens.extend( 171 + tokens 172 + .iter() 173 + .map(|t| (t.clone(), mode.clone().unwrap_or_default())), 174 + ); 175 + let tks = self.get_subscribed(); 176 + dbg!(tks.clone()); 177 + self.ticker.subscribe_cmd(tks.as_slice(), None).await?; 178 + Ok(()) 179 + } 180 + 181 + /// Change the mode of the subscribed instrument tokens 182 + pub async fn set_mode( 183 + &mut self, 184 + instrument_tokens: &[u32], 185 + mode: Mode, 186 + ) -> Result<(), String> { 187 + let tokens = self.get_subscribed_or(instrument_tokens); 188 + self.ticker.set_mode_cmd(tokens.as_slice(), mode).await 189 + } 190 + 191 + /// Unsubscribe provided subscribed tokens, if input is empty then all subscribed tokens will unsubscribed 192 + /// 193 + /// Tokens in the input which are not part of the subscribed tokens will be ignored. 194 + pub async fn unsubscribe( 195 + &mut self, 196 + instrument_tokens: &[u32], 197 + ) -> Result<(), String> { 198 + let tokens = self.get_subscribed_or(instrument_tokens); 199 + dbg!(tokens.clone()); 200 + self.ticker.unsubscribe_cmd(tokens.as_slice()).await 201 + } 202 + 203 + /// Get the next message from the server, waiting if necessary. 204 + /// If the result is None then server is terminated 205 + pub async fn next_message(&mut self) -> Result<Option<TickerMessage>, String> { 206 + let mut ws_stream = self.ticker.ws_stream.lock().await; 207 + match ws_stream.next().await { 208 + Some(message) => match message { 209 + Ok(msg) => Ok(self.process_message(msg)), 210 + Err(e) => Err(e.to_string()), 211 + }, 212 + None => Ok(None), 213 + } 214 + } 215 + 216 + fn process_message(&self, message: Message) -> Option<TickerMessage> { 217 + match message { 218 + Message::Text(text_message) => self.process_text_message(text_message), 219 + Message::Binary(ref binary_message) => { 220 + if binary_message.len() < 2 { 221 + return Some(TickerMessage::Tick(vec![])); 222 + } else { 223 + self.process_binary(binary_message.as_slice()) 224 + } 225 + } 226 + Message::Close(closing_message) => closing_message.map(|c| { 227 + TickerMessage::ClosingMessage(json!({ 228 + "code": c.code.to_string(), 229 + "reason": c.reason.to_string() 230 + })) 231 + }), 232 + Message::Ping(_) => unimplemented!(), 233 + Message::Pong(_) => unimplemented!(), 234 + Message::Frame(_) => unimplemented!(), 235 + } 236 + } 237 + 238 + fn process_binary(&self, binary_message: &[u8]) -> Option<TickerMessage> { 239 + // 0 - 2 : number of packets in the message 240 + let num_packets = 241 + i16::from_be_bytes(binary_message[0..=1].try_into().unwrap()) as usize; 242 + if num_packets > 0 { 243 + Some(TickerMessage::Tick( 244 + [0..num_packets] 245 + .into_iter() 246 + .fold((vec![], 2), |(mut acc, start), _| { 247 + // start - start + 2 : length of the packet 248 + let packet_len = packet_length(&binary_message[start..start + 2]); 249 + let next_start = start + 2 + packet_len; 250 + let tick = Tick::from(&binary_message[start + 2..next_start]); 251 + acc.push(TickMessage::new(tick.instrument_token, tick)); 252 + (acc, next_start) 253 + }) 254 + .0, 255 + )) 256 + } else { 257 + None 258 + } 259 + } 260 + 261 + fn process_text_message( 262 + &self, 263 + text_message: String, 264 + ) -> Option<TickerMessage> { 265 + dbg!(text_message.clone()); 266 + serde_json::from_str::<TextMessage>(&text_message) 267 + .map(|x| x.into()) 268 + .ok() 269 + } 270 + } 271 + 272 + #[cfg(test)] 273 + mod tests { 274 + use super::*; 275 + use tokio::select; 276 + 277 + async fn check<F>( 278 + mode: Mode, 279 + token: u32, 280 + sb: &mut KiteTickerSubscriber, 281 + assertions: Option<F>, 282 + ) where 283 + F: Fn(Vec<TickMessage>) -> (), 284 + { 285 + loop { 286 + match sb.next_message().await { 287 + Ok(message) => match message { 288 + Some(TickerMessage::Tick(xs)) => { 289 + if xs.len() == 0 { 290 + continue; 291 + } 292 + assertions.map(|f| f(xs.clone())).or_else(|| { 293 + let tick_message = xs.first().unwrap(); 294 + assert!(tick_message.instrument_token == token); 295 + assert_eq!(tick_message.content.mode, mode); 296 + Some(()) 297 + }); 298 + break; 299 + } 300 + _ => { 301 + dbg!(message); 302 + continue; 303 + } 304 + }, 305 + _ => { 306 + assert!(false); 307 + break; 308 + } 309 + } 310 + } 311 + } 312 + 313 + #[tokio::test] 314 + async fn test_ticker() { 315 + let api_key = std::env::var("KITE_API_KEY").unwrap(); 316 + let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 317 + let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 318 + 319 + assert_eq!(ticker.is_ok(), true); 320 + 321 + let ticker = ticker.unwrap(); 322 + let token = 94977; // bata 323 + let mode = Mode::Full; 324 + let sb = ticker.subscribe(&[token], Some(mode.clone())).await; 325 + assert_eq!(sb.is_ok(), true); 326 + let mut sb = sb.unwrap(); 327 + assert_eq!(sb.subscribed_tokens.len(), 1); 328 + let mut loop_cnt = 0; 329 + loop { 330 + loop_cnt += 1; 331 + select! { 332 + Ok(n) = sb.next_message() => { 333 + match n.to_owned() { 334 + Some(message) => { 335 + match message { 336 + TickerMessage::Tick(xs) => { 337 + if xs.len() == 0 { 338 + if loop_cnt > 5 { 339 + break; 340 + }else { 341 + continue; 342 + } 343 + } 344 + assert_eq!(xs.len(), 1); 345 + let tick_message = xs.first().unwrap(); 346 + dbg!(tick_message); 347 + assert!(tick_message.instrument_token == token); 348 + assert_eq!(tick_message.content.mode, mode); 349 + if loop_cnt > 5 { 350 + break; 351 + } 352 + }, 353 + _ => { 354 + if loop_cnt > 5 { 355 + break; 356 + } 357 + } 358 + } 359 + }, 360 + _ => { 361 + if loop_cnt > 5 { 362 + assert!(false); 363 + break; 364 + } 365 + } 366 + } 367 + }, 368 + else => { 369 + assert!(false); 370 + break; 371 + } 372 + } 373 + } 374 + 375 + sb.ticker.close().await.unwrap(); 376 + } 377 + 378 + #[tokio::test] 379 + async fn test_unsubscribe() { 380 + // create a ticker 381 + let api_key = std::env::var("KITE_API_KEY").unwrap(); 382 + let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 383 + let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 384 + 385 + let ticker = ticker.unwrap(); 386 + let token = 94977; // bata 387 + let mode = Mode::Full; 388 + let mut sb = ticker 389 + .subscribe(&[token], Some(mode.clone())) 390 + .await 391 + .unwrap(); 392 + 393 + let mut loop_cnt = 0; 394 + 395 + loop { 396 + match sb.next_message().await { 397 + Ok(message) => match message { 398 + Some(TickerMessage::Tick(xs)) => { 399 + if xs.len() == 0 { 400 + if loop_cnt > 4 { 401 + assert!(true); 402 + break; 403 + } else { 404 + loop_cnt += 1; 405 + continue; 406 + } 407 + } 408 + assert_eq!(xs.len(), 1); 409 + let tick_message = xs.first().unwrap(); 410 + dbg!(tick_message); 411 + assert!(tick_message.instrument_token == token); 412 + sb.unsubscribe(&[]).await.unwrap(); 413 + loop_cnt += 1; 414 + if loop_cnt > 5 { 415 + assert!(false); 416 + break; 417 + } 418 + } 419 + _ => { 420 + dbg!(message); 421 + continue; 422 + } 423 + }, 424 + _ => { 425 + assert!(false); 426 + break; 427 + } 428 + } 429 + } 430 + sb.ticker.close().await.unwrap(); 431 + } 432 + 433 + async fn create_ticker() -> KiteTickerAsync { 434 + // create a ticker 435 + let api_key = std::env::var("KITE_API_KEY").unwrap(); 436 + let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 437 + let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 438 + ticker.expect("failed to create ticker") 439 + } 440 + 441 + #[tokio::test] 442 + async fn test_set_mode() { 443 + let ticker = create_ticker().await; 444 + let token = 94977; // bata 445 + let mode = Mode::LTP; 446 + let new_mode = Mode::Quote; 447 + let mut sb = ticker 448 + .subscribe(&[token], Some(mode.clone())) 449 + .await 450 + .unwrap(); 451 + 452 + let f1: Option<Box<dyn Fn(Vec<TickMessage>) -> ()>> = None; 453 + let f2: Option<Box<dyn Fn(Vec<TickMessage>) -> ()>> = None; 454 + check(mode, token, &mut sb, f1).await; 455 + sb.set_mode(&[], new_mode.clone()).await.unwrap(); 456 + check(new_mode, token, &mut sb, f2).await; 457 + 458 + sb.ticker.close().await.unwrap(); 459 + } 460 + 461 + #[tokio::test] 462 + async fn test_new_sub() { 463 + let ticker = create_ticker().await; 464 + let token = 94977; // bata 465 + let mode = Mode::LTP; 466 + let mut sb = ticker 467 + .subscribe(&[token], Some(mode.clone())) 468 + .await 469 + .unwrap(); 470 + tokio::spawn(async move { 471 + sb.subscribe(&[2953217], None).await.unwrap(); 472 + }) 473 + .await 474 + .unwrap(); 475 + } 476 + }