Async client for the Kite Connect WebSocket API
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore: refactor models

chore: refactor models

+566 -495
+1
Cargo.toml
··· 21 21 futures-util = { version = "0.3.28", features = ["sink"] } 22 22 tokio-stream = { version = "0.1.14", features = ["full"] } 23 23 url = "2.4.1" 24 + serde_with = "3.4.0" 24 25 25 26 [dev-dependencies] 26 27 tokio = { version = "1", features = ["test-util"] }
+2 -2
src/lib.rs
··· 51 51 //! ``` 52 52 mod models; 53 53 pub use models::{ 54 - Depth, DepthItem, Exchange, Mode, Request, TextMessage, Tick, TickMessage, 55 - TickerMessage, OHLC, 54 + Depth, DepthItem, Exchange, Mode, Request, TextMessage, TickMessage, 55 + TickerMessage, OHLC, Tick 56 56 }; 57 57 58 58 pub mod ticker;
-492
src/models.rs
··· 1 - use serde::{Deserialize, Serialize}; 2 - use std::{ops::Div, time::Duration}; 3 - 4 - fn value(input: &[u8]) -> Option<u32> { 5 - let value = i32::from_be_bytes(input[0..=3].try_into().unwrap()); 6 - value.try_into().ok() 7 - } 8 - 9 - fn value_short(input: &[u8]) -> Option<u16> { 10 - let value = i16::from_be_bytes(input[0..=1].try_into().unwrap()); 11 - value.try_into().ok() 12 - } 13 - 14 - fn price(input: &[u8], exchange: &Exchange) -> Option<f64> { 15 - let value = i32::from_be_bytes(input[0..4].try_into().unwrap()) as f64; 16 - if exchange.divisor() > 0_f64 { 17 - Some(value.div(exchange.divisor())) 18 - } else { 19 - None 20 - } 21 - } 22 - 23 - pub(crate) fn packet_length(bs: &[u8]) -> usize { 24 - i16::from_be_bytes(bs[0..=1].try_into().unwrap()) as usize 25 - } 26 - 27 - #[derive(Debug, Clone, Default)] 28 - /// 29 - /// Quote packet structure 30 - /// 31 - pub struct Tick { 32 - pub mode: Mode, 33 - pub instrument_token: u32, 34 - pub exchange: Exchange, 35 - pub is_tradable: bool, 36 - pub is_index: bool, 37 - 38 - pub last_traded_qty: Option<u32>, 39 - pub avg_traded_price: Option<f64>, 40 - pub last_price: Option<f64>, 41 - pub volume_traded: Option<u32>, 42 - pub total_buy_qty: Option<u32>, 43 - pub total_sell_qty: Option<u32>, 44 - pub ohlc: Option<OHLC>, 45 - 46 - pub last_traded_timestamp: Option<Duration>, 47 - pub oi: Option<u32>, 48 - pub oi_day_high: Option<u32>, 49 - pub oi_day_low: Option<u32>, 50 - pub exchange_timestamp: Option<Duration>, 51 - 52 - pub net_change: Option<f64>, 53 - pub depth: Option<Depth>, 54 - } 55 - 56 - impl Tick { 57 - fn set_instrument_token(&mut self, input: &[u8]) -> &mut Self { 58 - self.instrument_token = value(&input[0..=3]).unwrap(); 59 - self.exchange = ((self.instrument_token & 0xFF) as usize).into(); 60 - self 61 - } 62 - 63 - fn set_change(&mut self) -> &mut Self { 64 - self.net_change = self 65 - .ohlc 66 - .as_ref() 67 - .map(|o| o.close) 68 - .map(|close_price| { 69 - if let Some(last_price) = self.last_price { 70 - if close_price == 0_f64 { 71 - return None; 72 - } else { 73 - Some(((last_price - close_price) * 100.0).div(close_price)) 74 - } 75 - } else { 76 - None 77 - } 78 - }) 79 - .unwrap_or_default(); 80 - self 81 - } 82 - } 83 - 84 - impl From<&[u8]> for Tick { 85 - fn from(input: &[u8]) -> Self { 86 - let mut tick = Tick::default(); 87 - 88 - let parse_ltp = |t: &mut Tick, i: &[u8]| { 89 - // 0 - 4 bytes : instrument token 90 - t.set_instrument_token(i); 91 - // 4 - 8 bytes : ltp 92 - if let Some(bs) = i.get(4..8) { 93 - t.mode = Mode::LTP; 94 - t.last_price = price(bs, &t.exchange); 95 - } 96 - }; 97 - 98 - let parse_quote = |t: &mut Tick, i: &[u8], is_index: bool| { 99 - if is_index { 100 - if let Some(bs) = i.get(8..28) { 101 - t.mode = Mode::Quote; 102 - // 8 - 24 bytes : ohlc 103 - t.ohlc = OHLC::from(&bs[0..16], &t.exchange); 104 - // 24 - 28 bytes : Price change 105 - // t.net_change = price(&bs[16..=19], &t.exchange); 106 - t.set_change(); 107 - } 108 - } else { 109 - if let Some(bs) = i.get(8..44) { 110 - t.mode = Mode::Quote; 111 - // 8 - 12 bytes : last traded quantity 112 - t.last_traded_qty = value(&bs[0..4]); 113 - // 12 - 16 bytes : avg traded price 114 - t.avg_traded_price = price(&bs[4..8], &t.exchange); 115 - // 16 - 20 bytes : volume traded today 116 - t.volume_traded = value(&bs[8..12]); 117 - // 20 - 24 bytes : total buy quantity 118 - t.total_buy_qty = value(&bs[12..16]); 119 - // 24 - 28 bytes : total sell quantity 120 - t.total_sell_qty = value(&bs[16..20]); 121 - // 28 - 44 bytes : ohlc 122 - t.ohlc = OHLC::from(&bs[20..36], &t.exchange); 123 - 124 - t.set_change(); 125 - } 126 - } 127 - }; 128 - 129 - let parse_full = |t: &mut Tick, i: &[u8], is_index: bool| { 130 - if is_index { 131 - if let Some(bs) = i.get(28..32) { 132 - t.mode = Mode::Full; 133 - // 28 - 32 bytes : exchange time 134 - t.exchange_timestamp = 135 - value(bs).map(|x| Duration::from_secs(x.into())); 136 - } 137 - } else { 138 - if let Some(bs) = i.get(44..184) { 139 - t.mode = Mode::Full; 140 - // 44 - 48 bytes : last traded timestamp 141 - t.last_traded_timestamp = 142 - value(&bs[0..4]).map(|x| Duration::from_secs(x.into())); 143 - 144 - // 48 - 52 bytes : oi 145 - t.oi = value(&bs[4..8]); 146 - // 52 - 56 bytes : oi day high 147 - t.oi_day_high = value(&bs[8..12]); 148 - // 56 - 60 bytes : oi day low 149 - t.oi_day_low = value(&bs[12..16]); 150 - // 60 - 64 bytes : exchange time 151 - t.exchange_timestamp = 152 - value(&bs[16..20]).map(|x| Duration::from_secs(x.into())); 153 - // 64 - 184 bytes : market depth 154 - t.depth = Depth::from(&bs[20..140], &t.exchange); 155 - } 156 - } 157 - }; 158 - 159 - parse_ltp(&mut tick, input); 160 - if !tick.exchange.is_tradable() { 161 - tick.is_index = true; 162 - tick.is_tradable = false; 163 - 164 - parse_quote(&mut tick, input, true); 165 - parse_full(&mut tick, input, true); 166 - } else { 167 - tick.is_index = false; 168 - tick.is_tradable = true; 169 - 170 - parse_quote(&mut tick, input, false); 171 - parse_full(&mut tick, input, false); 172 - } 173 - 174 - tick 175 - } 176 - } 177 - 178 - #[derive(Debug, Clone, Default)] 179 - /// 180 - /// OHLC packet structure 181 - /// 182 - pub struct OHLC { 183 - pub open: f64, 184 - pub high: f64, 185 - pub low: f64, 186 - pub close: f64, 187 - } 188 - 189 - impl OHLC { 190 - fn from(value: &[u8], exchange: &Exchange) -> Option<Self> { 191 - if let Some(bs) = value.get(0..16) { 192 - Some(OHLC { 193 - open: price(&bs[0..=3], exchange).unwrap(), 194 - high: price(&bs[4..=7], exchange).unwrap(), 195 - low: price(&bs[8..=11], exchange).unwrap(), 196 - close: price(&bs[12..=15], exchange).unwrap(), 197 - }) 198 - } else { 199 - None 200 - } 201 - } 202 - } 203 - 204 - #[derive(Debug, Clone, Default)] 205 - /// 206 - /// Market depth packet structure 207 - /// 208 - pub struct Depth { 209 - pub buy: [DepthItem; 5], 210 - pub sell: [DepthItem; 5], 211 - } 212 - 213 - impl Depth { 214 - fn from(input: &[u8], exchange: &Exchange) -> Option<Self> { 215 - if let Some(bs) = input.get(0..120) { 216 - let parse_depth_item = |v: &[u8], start: usize| { 217 - v.get(start..start + 10) 218 - .and_then(|xs| DepthItem::from(xs, exchange)) 219 - .unwrap_or_default() 220 - }; 221 - let mut depth = Depth::default(); 222 - for i in 0..5 { 223 - let start = i * 12; 224 - depth.buy[i] = parse_depth_item(bs, start) 225 - } 226 - for i in 0..5 { 227 - let start = 60 + i * 12; 228 - depth.sell[i] = parse_depth_item(bs, start); 229 - } 230 - 231 - Some(depth) 232 - } else { 233 - None 234 - } 235 - } 236 - } 237 - 238 - #[derive(Debug, Clone, Default)] 239 - /// 240 - /// Structure for each market depth entry 241 - /// 242 - pub struct DepthItem { 243 - pub qty: u32, 244 - pub price: f64, 245 - pub orders: u16, 246 - } 247 - 248 - impl DepthItem { 249 - pub fn from(input: &[u8], exchange: &Exchange) -> Option<Self> { 250 - if let Some(bs) = input.get(0..10) { 251 - Some(DepthItem { 252 - qty: value(&bs[0..=3]).unwrap(), 253 - price: price(&bs[4..=7], exchange).unwrap(), 254 - orders: value_short(&bs[8..=9]).unwrap(), 255 - }) 256 - } else { 257 - None 258 - } 259 - } 260 - } 261 - 262 - #[derive(Debug, Clone)] 263 - /// 264 - /// Parsed message from websocket 265 - /// 266 - pub enum TickerMessage { 267 - /// Quote packets for subscribed tokens 268 - Ticks(Vec<TickMessage>), 269 - /// Error response 270 - Error(String), 271 - /// Order postback 272 - Order(serde_json::Value), 273 - /// Messages and alerts from broker 274 - Message(serde_json::Value), 275 - /// Websocket closing frame 276 - ClosingMessage(serde_json::Value), 277 - } 278 - 279 - impl From<TextMessage> for TickerMessage { 280 - fn from(value: TextMessage) -> Self { 281 - let message_type: TextMessageType = value.message_type.into(); 282 - match message_type { 283 - TextMessageType::Order => Self::Order(value.data), 284 - TextMessageType::Error => Self::Error(value.data.to_string()), 285 - TextMessageType::Message => Self::Message(value.data), 286 - } 287 - } 288 - } 289 - 290 - #[derive(Debug, Clone, Default)] 291 - /// 292 - /// Parsed quote packet 293 - /// 294 - pub struct TickMessage { 295 - pub instrument_token: u32, 296 - pub content: Tick, 297 - } 298 - 299 - impl TickMessage { 300 - pub(crate) fn new(instrument_token: u32, content: Tick) -> Self { 301 - Self { 302 - instrument_token, 303 - content, 304 - } 305 - } 306 - } 307 - 308 - #[derive(Debug, Clone, Default)] 309 - /// 310 - /// Exchange options 311 - /// 312 - pub enum Exchange { 313 - #[default] 314 - NSE, 315 - NFO, 316 - CDS, 317 - BSE, 318 - BFO, 319 - BCD, 320 - MCX, 321 - MCXSX, 322 - INDICES, 323 - } 324 - 325 - impl Exchange { 326 - fn divisor(&self) -> f64 { 327 - match self { 328 - Self::CDS => 100_000_0.0, 329 - Self::BCD => 100_0.0, 330 - _ => 100.0, 331 - } 332 - } 333 - 334 - fn is_tradable(&self) -> bool { 335 - match self { 336 - Self::INDICES => false, 337 - _ => true, 338 - } 339 - } 340 - } 341 - 342 - impl From<usize> for Exchange { 343 - fn from(value: usize) -> Self { 344 - match value { 345 - 9 => Self::INDICES, 346 - 8 => Self::MCXSX, 347 - 7 => Self::MCX, 348 - 6 => Self::BCD, 349 - 5 => Self::BFO, 350 - 4 => Self::BSE, 351 - 3 => Self::CDS, 352 - 2 => Self::NFO, 353 - 1 => Self::NSE, 354 - _ => Self::NSE, 355 - } 356 - } 357 - } 358 - 359 - #[derive( 360 - Debug, Clone, Deserialize, Serialize, Default, PartialEq, PartialOrd, 361 - )] 362 - #[serde(rename_all = "lowercase")] 363 - /// 364 - /// Modes in which packets are streamed 365 - /// 366 - pub enum Mode { 367 - Full, 368 - #[default] 369 - Quote, 370 - LTP, 371 - } 372 - 373 - impl TryFrom<usize> for Mode { 374 - type Error = String; 375 - fn try_from(value: usize) -> Result<Self, Self::Error> { 376 - match value { 377 - 8 => Ok(Self::LTP), 378 - 44 => Ok(Self::Quote), 379 - 184 => Ok(Self::Full), 380 - _ => Err(format!("Invalid packet size: {}", value)), 381 - } 382 - } 383 - } 384 - 385 - #[derive(Clone, Debug, Deserialize, Serialize)] 386 - #[serde(rename_all = "lowercase")] 387 - /// 388 - /// Websocket request actions 389 - /// 390 - enum RequestActions { 391 - Subscribe, 392 - Unsubscribe, 393 - Mode, 394 - } 395 - 396 - #[derive(Clone, Debug, Deserialize, Serialize)] 397 - #[serde(untagged)] 398 - /// 399 - /// Websocket request data 400 - /// 401 - enum RequestData { 402 - InstrumentTokens(Vec<u32>), 403 - InstrumentTokensWithMode(Mode, Vec<u32>), 404 - } 405 - 406 - #[derive(Debug, Clone, Deserialize, Serialize)] 407 - /// 408 - /// Websocket request structure 409 - /// 410 - pub struct Request { 411 - a: RequestActions, 412 - v: RequestData, 413 - } 414 - 415 - impl Request { 416 - fn new(action: RequestActions, value: RequestData) -> Request { 417 - Request { 418 - a: action, 419 - v: value, 420 - } 421 - } 422 - 423 - /// 424 - /// Subscribe to a list of instrument tokens 425 - /// 426 - pub fn subscribe(instrument_tokens: Vec<u32>) -> Request { 427 - Request::new( 428 - RequestActions::Subscribe, 429 - RequestData::InstrumentTokens(instrument_tokens), 430 - ) 431 - } 432 - 433 - /// 434 - /// Subscribe to a list of instrument tokens with mode 435 - /// 436 - pub fn mode(mode: Mode, instrument_tokens: Vec<u32>) -> Request { 437 - Request::new( 438 - RequestActions::Mode, 439 - RequestData::InstrumentTokensWithMode(mode, instrument_tokens), 440 - ) 441 - } 442 - 443 - /// 444 - /// Unsubscribe from a list of instrument tokens 445 - /// 446 - pub fn unsubscribe(instrument_tokens: Vec<u32>) -> Request { 447 - Request::new( 448 - RequestActions::Unsubscribe, 449 - RequestData::InstrumentTokens(instrument_tokens), 450 - ) 451 - } 452 - } 453 - 454 - impl ToString for Request { 455 - fn to_string(&self) -> String { 456 - serde_json::to_string(self) 457 - .expect("failed to serialize TickerInput to JSON") 458 - } 459 - } 460 - 461 - #[derive(Debug, Clone)] 462 - /// 463 - /// Postbacks and non-binary message types 464 - /// 465 - enum TextMessageType { 466 - /// Order postback 467 - Order, 468 - /// Error response 469 - Error, 470 - /// Messages and alerts from the broker 471 - Message, 472 - } 473 - 474 - impl From<String> for TextMessageType { 475 - fn from(value: String) -> Self { 476 - match value.as_str() { 477 - "order" => Self::Order, 478 - "error" => Self::Error, 479 - _ => Self::Message, 480 - } 481 - } 482 - } 483 - 484 - #[derive(Debug, Clone, Deserialize, Serialize)] 485 - /// 486 - /// Postback and non-binary message structure 487 - /// 488 - pub struct TextMessage { 489 - #[serde(rename = "type")] 490 - message_type: String, 491 - data: serde_json::Value, 492 - }
+61
src/models/depth.rs
··· 1 + use crate::Exchange; 2 + 3 + use super::{value, price, value_short}; 4 + 5 + #[derive(Debug, Clone, Default)] 6 + /// 7 + /// Market depth packet structure 8 + /// 9 + pub struct Depth { 10 + pub buy: [DepthItem; 5], 11 + pub sell: [DepthItem; 5], 12 + } 13 + 14 + impl Depth { 15 + pub(crate) fn from(input: &[u8], exchange: &Exchange) -> Option<Self> { 16 + if let Some(bs) = input.get(0..120) { 17 + let parse_depth_item = |v: &[u8], start: usize| { 18 + v.get(start..start + 10) 19 + .and_then(|xs| DepthItem::from(xs, exchange)) 20 + .unwrap_or_default() 21 + }; 22 + let mut depth = Depth::default(); 23 + for i in 0..5 { 24 + let start = i * 12; 25 + depth.buy[i] = parse_depth_item(bs, start) 26 + } 27 + for i in 0..5 { 28 + let start = 60 + i * 12; 29 + depth.sell[i] = parse_depth_item(bs, start); 30 + } 31 + 32 + Some(depth) 33 + } else { 34 + None 35 + } 36 + } 37 + } 38 + 39 + #[derive(Debug, Clone, Default)] 40 + /// 41 + /// Structure for each market depth entry 42 + /// 43 + pub struct DepthItem { 44 + pub qty: u32, 45 + pub price: f64, 46 + pub orders: u16, 47 + } 48 + 49 + impl DepthItem { 50 + pub fn from(input: &[u8], exchange: &Exchange) -> Option<Self> { 51 + if let Some(bs) = input.get(0..10) { 52 + Some(DepthItem { 53 + qty: value(&bs[0..=3]).unwrap(), 54 + price: price(&bs[4..=7], exchange).unwrap(), 55 + orders: value_short(&bs[8..=9]).unwrap(), 56 + }) 57 + } else { 58 + None 59 + } 60 + } 61 + }
+84
src/models/exchange.rs
··· 1 + #[derive(Debug, Clone, Default)] 2 + /// 3 + /// Exchange options 4 + /// 5 + pub enum Exchange { 6 + #[default] 7 + NSE, 8 + NFO, 9 + CDS, 10 + BSE, 11 + BFO, 12 + BCD, 13 + MCX, 14 + MCXSX, 15 + INDICES, 16 + } 17 + 18 + impl Exchange { 19 + pub(crate) fn divisor(&self) -> f64 { 20 + match self { 21 + Self::CDS => 100_000_0.0, 22 + Self::BCD => 100_0.0, 23 + _ => 100.0, 24 + } 25 + } 26 + 27 + pub(crate) fn is_tradable(&self) -> bool { 28 + match self { 29 + Self::INDICES => false, 30 + _ => true, 31 + } 32 + } 33 + } 34 + 35 + impl From<usize> for Exchange { 36 + fn from(value: usize) -> Self { 37 + match value { 38 + 9 => Self::INDICES, 39 + 8 => Self::MCXSX, 40 + 7 => Self::MCX, 41 + 6 => Self::BCD, 42 + 5 => Self::BFO, 43 + 4 => Self::BSE, 44 + 3 => Self::CDS, 45 + 2 => Self::NFO, 46 + 1 => Self::NSE, 47 + _ => Self::NSE, 48 + } 49 + } 50 + } 51 + 52 + impl From<String> for Exchange { 53 + fn from(value: String) -> Self { 54 + match value.as_str() { 55 + "NSE" => Self::NSE, 56 + "NFO" => Self::NFO, 57 + "CDS" => Self::CDS, 58 + "BSE" => Self::BSE, 59 + "BFO" => Self::BFO, 60 + "BCD" => Self::BCD, 61 + "MCX" => Self::MCX, 62 + "MCXSX" => Self::MCXSX, 63 + "INDICES" => Self::INDICES, 64 + _ => Self::NSE, 65 + 66 + } 67 + } 68 + } 69 + 70 + impl From<Exchange> for String { 71 + fn from(value: Exchange) -> Self { 72 + match value { 73 + Exchange::NSE => "NSE".to_string(), 74 + Exchange::NFO => "NFO".to_string(), 75 + Exchange::CDS => "CDS".to_string(), 76 + Exchange::BSE => "BSE".to_string(), 77 + Exchange::BFO => "BFO".to_string(), 78 + Exchange::BCD => "BCD".to_string(), 79 + Exchange::MCX => "MCX".to_string(), 80 + Exchange::MCXSX => "MCXSX".to_string(), 81 + Exchange::INDICES => "INDICES".to_string(), 82 + } 83 + } 84 + }
+43
src/models/mod.rs
··· 1 + use std::ops::Div; 2 + 3 + mod depth; 4 + mod exchange; 5 + mod mode; 6 + mod ohlc; 7 + mod request; 8 + mod text_message; 9 + mod tick; 10 + mod tick_message; 11 + mod ticker_message; 12 + pub use self::depth::{Depth, DepthItem}; 13 + pub use self::exchange::Exchange; 14 + pub use self::mode::Mode; 15 + pub use self::ohlc::OHLC; 16 + pub use self::request::Request; 17 + pub use self::text_message::TextMessage; 18 + pub use self::tick::Tick; 19 + pub use self::tick_message::TickMessage; 20 + pub use self::ticker_message::TickerMessage; 21 + 22 + fn value(input: &[u8]) -> Option<u32> { 23 + let value = i32::from_be_bytes(input[0..=3].try_into().unwrap()); 24 + value.try_into().ok() 25 + } 26 + 27 + fn value_short(input: &[u8]) -> Option<u16> { 28 + let value = i16::from_be_bytes(input[0..=1].try_into().unwrap()); 29 + value.try_into().ok() 30 + } 31 + 32 + fn price(input: &[u8], exchange: &Exchange) -> Option<f64> { 33 + let value = i32::from_be_bytes(input[0..4].try_into().unwrap()) as f64; 34 + if exchange.divisor() > 0_f64 { 35 + Some(value.div(exchange.divisor())) 36 + } else { 37 + None 38 + } 39 + } 40 + 41 + pub(crate) fn packet_length(bs: &[u8]) -> usize { 42 + i16::from_be_bytes(bs[0..=1].try_into().unwrap()) as usize 43 + }
+27
src/models/mode.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + 3 + #[derive( 4 + Debug, Clone, Deserialize, Serialize, Default, PartialEq, PartialOrd, 5 + )] 6 + #[serde(rename_all = "lowercase")] 7 + /// 8 + /// Modes in which packets are streamed 9 + /// 10 + pub enum Mode { 11 + Full, 12 + #[default] 13 + Quote, 14 + LTP, 15 + } 16 + 17 + impl TryFrom<usize> for Mode { 18 + type Error = String; 19 + fn try_from(value: usize) -> Result<Self, Self::Error> { 20 + match value { 21 + 8 => Ok(Self::LTP), 22 + 44 => Ok(Self::Quote), 23 + 184 => Ok(Self::Full), 24 + _ => Err(format!("Invalid packet size: {}", value)), 25 + } 26 + } 27 + }
+29
src/models/ohlc.rs
··· 1 + use crate::Exchange; 2 + 3 + use super::price; 4 + 5 + #[derive(Debug, Clone, Default)] 6 + /// 7 + /// OHLC packet structure 8 + /// 9 + pub struct OHLC { 10 + pub open: f64, 11 + pub high: f64, 12 + pub low: f64, 13 + pub close: f64, 14 + } 15 + 16 + impl OHLC { 17 + pub(crate) fn from(value: &[u8], exchange: &Exchange) -> Option<Self> { 18 + if let Some(bs) = value.get(0..16) { 19 + Some(OHLC { 20 + open: price(&bs[0..=3], exchange).unwrap(), 21 + high: price(&bs[4..=7], exchange).unwrap(), 22 + low: price(&bs[8..=11], exchange).unwrap(), 23 + close: price(&bs[12..=15], exchange).unwrap(), 24 + }) 25 + } else { 26 + None 27 + } 28 + } 29 + }
+79
src/models/request.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + 3 + use crate::Mode; 4 + 5 + #[derive(Clone, Debug, Deserialize, Serialize)] 6 + #[serde(rename_all = "lowercase")] 7 + /// 8 + /// Websocket request actions 9 + /// 10 + enum RequestActions { 11 + Subscribe, 12 + Unsubscribe, 13 + Mode, 14 + } 15 + 16 + #[derive(Clone, Debug, Deserialize, Serialize)] 17 + #[serde(untagged)] 18 + /// 19 + /// Websocket request data 20 + /// 21 + enum RequestData { 22 + InstrumentTokens(Vec<u32>), 23 + InstrumentTokensWithMode(Mode, Vec<u32>), 24 + } 25 + 26 + #[derive(Debug, Clone, Deserialize, Serialize)] 27 + /// 28 + /// Websocket request structure 29 + /// 30 + pub struct Request { 31 + a: RequestActions, 32 + v: RequestData, 33 + } 34 + 35 + impl Request { 36 + fn new(action: RequestActions, value: RequestData) -> Request { 37 + Request { 38 + a: action, 39 + v: value, 40 + } 41 + } 42 + 43 + /// 44 + /// Subscribe to a list of instrument tokens 45 + /// 46 + pub fn subscribe(instrument_tokens: Vec<u32>) -> Request { 47 + Request::new( 48 + RequestActions::Subscribe, 49 + RequestData::InstrumentTokens(instrument_tokens), 50 + ) 51 + } 52 + 53 + /// 54 + /// Subscribe to a list of instrument tokens with mode 55 + /// 56 + pub fn mode(mode: Mode, instrument_tokens: Vec<u32>) -> Request { 57 + Request::new( 58 + RequestActions::Mode, 59 + RequestData::InstrumentTokensWithMode(mode, instrument_tokens), 60 + ) 61 + } 62 + 63 + /// 64 + /// Unsubscribe from a list of instrument tokens 65 + /// 66 + pub fn unsubscribe(instrument_tokens: Vec<u32>) -> Request { 67 + Request::new( 68 + RequestActions::Unsubscribe, 69 + RequestData::InstrumentTokens(instrument_tokens), 70 + ) 71 + } 72 + } 73 + 74 + impl ToString for Request { 75 + fn to_string(&self) -> String { 76 + serde_json::to_string(self) 77 + .expect("failed to serialize TickerInput to JSON") 78 + } 79 + }
+34
src/models/text_message.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + 3 + #[derive(Debug, Clone)] 4 + /// 5 + /// Postbacks and non-binary message types 6 + /// 7 + pub(crate) enum TextMessageType { 8 + /// Order postback 9 + Order, 10 + /// Error response 11 + Error, 12 + /// Messages and alerts from the broker 13 + Message, 14 + } 15 + 16 + impl From<String> for TextMessageType { 17 + fn from(value: String) -> Self { 18 + match value.as_str() { 19 + "order" => Self::Order, 20 + "error" => Self::Error, 21 + _ => Self::Message, 22 + } 23 + } 24 + } 25 + 26 + #[derive(Debug, Clone, Deserialize, Serialize)] 27 + /// 28 + /// Postback and non-binary message structure 29 + /// 30 + pub struct TextMessage { 31 + #[serde(rename = "type")] 32 + pub(crate) message_type: String, 33 + pub(crate) data: serde_json::Value, 34 + }
+156
src/models/tick.rs
··· 1 + use std::{time::Duration, ops::Div}; 2 + 3 + use crate::{Mode, Exchange, OHLC, Depth}; 4 + 5 + use super::{value, price}; 6 + 7 + #[derive(Debug, Clone, Default)] 8 + /// 9 + /// Quote packet structure 10 + /// 11 + pub struct Tick { 12 + pub mode: Mode, 13 + pub instrument_token: u32, 14 + pub exchange: Exchange, 15 + pub is_tradable: bool, 16 + pub is_index: bool, 17 + 18 + pub last_traded_qty: Option<u32>, 19 + pub avg_traded_price: Option<f64>, 20 + pub last_price: Option<f64>, 21 + pub volume_traded: Option<u32>, 22 + pub total_buy_qty: Option<u32>, 23 + pub total_sell_qty: Option<u32>, 24 + pub ohlc: Option<OHLC>, 25 + 26 + pub last_traded_timestamp: Option<Duration>, 27 + pub oi: Option<u32>, 28 + pub oi_day_high: Option<u32>, 29 + pub oi_day_low: Option<u32>, 30 + pub exchange_timestamp: Option<Duration>, 31 + 32 + pub net_change: Option<f64>, 33 + pub depth: Option<Depth>, 34 + } 35 + 36 + impl Tick { 37 + fn set_instrument_token(&mut self, input: &[u8]) -> &mut Self { 38 + self.instrument_token = value(&input[0..=3]).unwrap(); 39 + self.exchange = ((self.instrument_token & 0xFF) as usize).into(); 40 + self 41 + } 42 + 43 + fn set_change(&mut self) -> &mut Self { 44 + self.net_change = self 45 + .ohlc 46 + .as_ref() 47 + .map(|o| o.close) 48 + .map(|close_price| { 49 + if let Some(last_price) = self.last_price { 50 + if close_price == 0_f64 { 51 + return None; 52 + } else { 53 + Some(((last_price - close_price) * 100.0).div(close_price)) 54 + } 55 + } else { 56 + None 57 + } 58 + }) 59 + .unwrap_or_default(); 60 + self 61 + } 62 + } 63 + 64 + impl From<&[u8]> for Tick { 65 + fn from(input: &[u8]) -> Self { 66 + let mut tick = Tick::default(); 67 + 68 + let parse_ltp = |t: &mut Tick, i: &[u8]| { 69 + // 0 - 4 bytes : instrument token 70 + t.set_instrument_token(i); 71 + // 4 - 8 bytes : ltp 72 + if let Some(bs) = i.get(4..8) { 73 + t.mode = Mode::LTP; 74 + t.last_price = price(bs, &t.exchange); 75 + } 76 + }; 77 + 78 + let parse_quote = |t: &mut Tick, i: &[u8], is_index: bool| { 79 + if is_index { 80 + if let Some(bs) = i.get(8..28) { 81 + t.mode = Mode::Quote; 82 + // 8 - 24 bytes : ohlc 83 + t.ohlc = OHLC::from(&bs[0..16], &t.exchange); 84 + // 24 - 28 bytes : Price change 85 + // t.net_change = price(&bs[16..=19], &t.exchange); 86 + t.set_change(); 87 + } 88 + } else { 89 + if let Some(bs) = i.get(8..44) { 90 + t.mode = Mode::Quote; 91 + // 8 - 12 bytes : last traded quantity 92 + t.last_traded_qty = value(&bs[0..4]); 93 + // 12 - 16 bytes : avg traded price 94 + t.avg_traded_price = price(&bs[4..8], &t.exchange); 95 + // 16 - 20 bytes : volume traded today 96 + t.volume_traded = value(&bs[8..12]); 97 + // 20 - 24 bytes : total buy quantity 98 + t.total_buy_qty = value(&bs[12..16]); 99 + // 24 - 28 bytes : total sell quantity 100 + t.total_sell_qty = value(&bs[16..20]); 101 + // 28 - 44 bytes : ohlc 102 + t.ohlc = OHLC::from(&bs[20..36], &t.exchange); 103 + 104 + t.set_change(); 105 + } 106 + } 107 + }; 108 + 109 + let parse_full = |t: &mut Tick, i: &[u8], is_index: bool| { 110 + if is_index { 111 + if let Some(bs) = i.get(28..32) { 112 + t.mode = Mode::Full; 113 + // 28 - 32 bytes : exchange time 114 + t.exchange_timestamp = 115 + value(bs).map(|x| Duration::from_secs(x.into())); 116 + } 117 + } else { 118 + if let Some(bs) = i.get(44..184) { 119 + t.mode = Mode::Full; 120 + // 44 - 48 bytes : last traded timestamp 121 + t.last_traded_timestamp = 122 + value(&bs[0..4]).map(|x| Duration::from_secs(x.into())); 123 + 124 + // 48 - 52 bytes : oi 125 + t.oi = value(&bs[4..8]); 126 + // 52 - 56 bytes : oi day high 127 + t.oi_day_high = value(&bs[8..12]); 128 + // 56 - 60 bytes : oi day low 129 + t.oi_day_low = value(&bs[12..16]); 130 + // 60 - 64 bytes : exchange time 131 + t.exchange_timestamp = 132 + value(&bs[16..20]).map(|x| Duration::from_secs(x.into())); 133 + // 64 - 184 bytes : market depth 134 + t.depth = Depth::from(&bs[20..140], &t.exchange); 135 + } 136 + } 137 + }; 138 + 139 + parse_ltp(&mut tick, input); 140 + if !tick.exchange.is_tradable() { 141 + tick.is_index = true; 142 + tick.is_tradable = false; 143 + 144 + parse_quote(&mut tick, input, true); 145 + parse_full(&mut tick, input, true); 146 + } else { 147 + tick.is_index = false; 148 + tick.is_tradable = true; 149 + 150 + parse_quote(&mut tick, input, false); 151 + parse_full(&mut tick, input, false); 152 + } 153 + 154 + tick 155 + } 156 + }
+19
src/models/tick_message.rs
··· 1 + use crate::Tick; 2 + 3 + #[derive(Debug, Clone, Default)] 4 + /// 5 + /// Parsed quote packet 6 + /// 7 + pub struct TickMessage { 8 + pub instrument_token: u32, 9 + pub content: Tick, 10 + } 11 + 12 + impl TickMessage { 13 + pub(crate) fn new(instrument_token: u32, content: Tick) -> Self { 14 + Self { 15 + instrument_token, 16 + content, 17 + } 18 + } 19 + }
+31
src/models/ticker_message.rs
··· 1 + use crate::{TextMessage, TickMessage}; 2 + 3 + use super::text_message::TextMessageType; 4 + 5 + #[derive(Debug, Clone)] 6 + /// 7 + /// Parsed message from websocket 8 + /// 9 + pub enum TickerMessage { 10 + /// Quote packets for subscribed tokens 11 + Ticks(Vec<TickMessage>), 12 + /// Error response 13 + Error(String), 14 + /// Order postback 15 + Order(serde_json::Value), 16 + /// Messages and alerts from broker 17 + Message(serde_json::Value), 18 + /// Websocket closing frame 19 + ClosingMessage(serde_json::Value), 20 + } 21 + 22 + impl From<TextMessage> for TickerMessage { 23 + fn from(value: TextMessage) -> Self { 24 + let message_type: TextMessageType = value.message_type.into(); 25 + match message_type { 26 + TextMessageType::Order => Self::Order(value.data), 27 + TextMessageType::Error => Self::Error(value.data.to_string()), 28 + TextMessageType::Message => Self::Message(value.data), 29 + } 30 + } 31 + }
-1
src/ticker.rs
··· 6 6 use tokio_tungstenite::{ 7 7 connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream, 8 8 }; 9 - 10 9 use crate::models::{ 11 10 packet_length, Mode, Request, TextMessage, Tick, TickMessage, TickerMessage, 12 11 };