Async client for the Kite Connect WebSocket API
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore: refactor integration tests

+227 -210
+13 -6
justfile
··· 3 3 check: 4 4 cargo check 5 5 6 - test api_key='' access_token='': 7 - KITE_API_KEY={{api_key}} KITE_ACCESS_TOKEN={{access_token}} cargo test --lib 8 - 9 6 build: 10 7 cargo clean --quiet -r 11 8 cargo build --release --quiet ··· 14 11 cargo clean --doc --quiet 15 12 {{cargo-doc-command}} 16 13 17 - doc-test api_key='' access_token='': 18 - KITE_API_KEY={{api_key}} KITE_ACCESS_TOKEN={{access_token}} cargo test --quiet --doc 19 - 20 14 doc-open: 21 15 rm -r target/doc 22 16 {{cargo-doc-command}} --open 23 17 24 18 example api_key access_token: 25 19 KITE_API_KEY={{api_key}} KITE_ACCESS_TOKEN={{access_token}} cargo run --example sample 20 + 21 + test: 22 + cargo test --lib 23 + 24 + test-unit: test 25 + 26 + test-integration api_key='' access_token='': 27 + KITE_API_KEY={{api_key}} KITE_ACCESS_TOKEN={{access_token}} cargo test --test '*' 28 + 29 + test-doc api_key='' access_token='': 30 + KITE_API_KEY={{api_key}} KITE_ACCESS_TOKEN={{access_token}} cargo test --quiet --doc 31 + 32 + test-all: test-unit test-integration test-doc
+8 -204
src/ticker.rs
··· 1 + use crate::models::{ 2 + packet_length, Mode, Request, TextMessage, Tick, TickMessage, TickerMessage, 3 + }; 1 4 use futures_util::{stream::iter, SinkExt, StreamExt}; 2 5 use serde_json::json; 3 6 use std::{collections::HashMap, sync::Arc}; ··· 5 8 use tokio::sync::Mutex; 6 9 use tokio_tungstenite::{ 7 10 connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream, 8 - }; 9 - use crate::models::{ 10 - packet_length, Mode, Request, TextMessage, Tick, TickMessage, TickerMessage, 11 11 }; 12 12 13 13 #[derive(Debug, Clone)] ··· 199 199 200 200 /// Get the next message from the server, waiting if necessary. 201 201 /// If the result is None then server is terminated 202 - pub async fn next_message(&mut self) -> Result<Option<TickerMessage>, String> { 202 + pub async fn next_message( 203 + &mut self, 204 + ) -> Result<Option<TickerMessage>, String> { 203 205 let mut ws_stream = self.ticker.ws_stream.lock().await; 204 206 match ws_stream.next().await { 205 207 Some(message) => match message { ··· 263 265 .map(|x| x.into()) 264 266 .ok() 265 267 } 266 - } 267 268 268 - #[cfg(test)] 269 - mod tests { 270 - use super::*; 271 - use tokio::select; 272 - 273 - async fn check<F>( 274 - mode: Mode, 275 - token: u32, 276 - sb: &mut KiteTickerSubscriber, 277 - assertions: Option<F>, 278 - ) where 279 - F: Fn(Vec<TickMessage>) -> (), 280 - { 281 - loop { 282 - match sb.next_message().await { 283 - Ok(message) => match message { 284 - Some(TickerMessage::Ticks(xs)) => { 285 - if xs.len() == 0 { 286 - continue; 287 - } 288 - assertions.map(|f| f(xs.clone())).or_else(|| { 289 - let tick_message = xs.first().unwrap(); 290 - assert!(tick_message.instrument_token == token); 291 - assert_eq!(tick_message.content.mode, mode); 292 - Some(()) 293 - }); 294 - break; 295 - } 296 - _ => { 297 - continue; 298 - } 299 - }, 300 - _ => { 301 - assert!(false); 302 - break; 303 - } 304 - } 305 - } 306 - } 307 - 308 - #[tokio::test] 309 - async fn test_ticker() { 310 - let api_key = std::env::var("KITE_API_KEY").unwrap(); 311 - let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 312 - let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 313 - 314 - assert_eq!(ticker.is_ok(), true); 315 - 316 - let ticker = ticker.unwrap(); 317 - let token = 94977; // bata 318 - let mode = Mode::Full; 319 - let sb = ticker.subscribe(&[token], Some(mode.clone())).await; 320 - assert_eq!(sb.is_ok(), true); 321 - let mut sb = sb.unwrap(); 322 - assert_eq!(sb.subscribed_tokens.len(), 1); 323 - let mut loop_cnt = 0; 324 - loop { 325 - loop_cnt += 1; 326 - select! { 327 - Ok(n) = sb.next_message() => { 328 - match n.to_owned() { 329 - Some(message) => { 330 - match message { 331 - TickerMessage::Ticks(xs) => { 332 - if xs.len() == 0 { 333 - if loop_cnt > 5 { 334 - break; 335 - }else { 336 - continue; 337 - } 338 - } 339 - assert_eq!(xs.len(), 1); 340 - let tick_message = xs.first().unwrap(); 341 - assert!(tick_message.instrument_token == token); 342 - assert_eq!(tick_message.content.mode, mode); 343 - if loop_cnt > 5 { 344 - break; 345 - } 346 - }, 347 - _ => { 348 - if loop_cnt > 5 { 349 - break; 350 - } 351 - } 352 - } 353 - }, 354 - _ => { 355 - if loop_cnt > 5 { 356 - assert!(false); 357 - break; 358 - } 359 - } 360 - } 361 - }, 362 - else => { 363 - assert!(false); 364 - break; 365 - } 366 - } 367 - } 368 - 369 - sb.ticker.close().await.unwrap(); 370 - } 371 - 372 - #[tokio::test] 373 - async fn test_unsubscribe() { 374 - // create a ticker 375 - let api_key = std::env::var("KITE_API_KEY").unwrap(); 376 - let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 377 - let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 378 - 379 - let ticker = ticker.unwrap(); 380 - let token = 94977; // bata 381 - let mode = Mode::Full; 382 - let mut sb = ticker 383 - .subscribe(&[token], Some(mode.clone())) 384 - .await 385 - .unwrap(); 386 - 387 - let mut loop_cnt = 0; 388 - 389 - loop { 390 - match sb.next_message().await { 391 - Ok(message) => match message { 392 - Some(TickerMessage::Ticks(xs)) => { 393 - if xs.len() == 0 { 394 - if loop_cnt > 4 { 395 - assert!(true); 396 - break; 397 - } else { 398 - loop_cnt += 1; 399 - continue; 400 - } 401 - } 402 - assert_eq!(xs.len(), 1); 403 - let tick_message = xs.first().unwrap(); 404 - assert!(tick_message.instrument_token == token); 405 - sb.unsubscribe(&[]).await.unwrap(); 406 - loop_cnt += 1; 407 - if loop_cnt > 5 { 408 - assert!(false); 409 - break; 410 - } 411 - } 412 - _ => { 413 - continue; 414 - } 415 - }, 416 - _ => { 417 - assert!(false); 418 - break; 419 - } 420 - } 421 - } 422 - sb.ticker.close().await.unwrap(); 423 - } 424 - 425 - async fn create_ticker() -> KiteTickerAsync { 426 - // create a ticker 427 - let api_key = std::env::var("KITE_API_KEY").unwrap(); 428 - let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 429 - let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 430 - ticker.expect("failed to create ticker") 431 - } 432 - 433 - #[tokio::test] 434 - async fn test_set_mode() { 435 - let ticker = create_ticker().await; 436 - let token = 94977; // bata 437 - let mode = Mode::LTP; 438 - let new_mode = Mode::Quote; 439 - let mut sb = ticker 440 - .subscribe(&[token], Some(mode.clone())) 441 - .await 442 - .unwrap(); 443 - 444 - let f1: Option<Box<dyn Fn(Vec<TickMessage>) -> ()>> = None; 445 - let f2: Option<Box<dyn Fn(Vec<TickMessage>) -> ()>> = None; 446 - check(mode, token, &mut sb, f1).await; 447 - sb.set_mode(&[], new_mode.clone()).await.unwrap(); 448 - check(new_mode, token, &mut sb, f2).await; 449 - 450 - sb.ticker.close().await.unwrap(); 451 - } 452 - 453 - #[tokio::test] 454 - async fn test_new_sub() { 455 - let ticker = create_ticker().await; 456 - let token = 94977; // bata 457 - let mode = Mode::LTP; 458 - let mut sb = ticker 459 - .subscribe(&[token], Some(mode.clone())) 460 - .await 461 - .unwrap(); 462 - tokio::spawn(async move { 463 - sb.subscribe(&[2953217], None).await.unwrap(); 464 - }) 465 - .await 466 - .unwrap(); 269 + pub async fn close(&mut self) -> Result<(), String> { 270 + self.ticker.close().await 467 271 } 468 272 }
+38
tests/common/mod.rs
··· 1 + use kiteticker_async::{ 2 + KiteTickerSubscriber, Mode, TickMessage, TickerMessage, 3 + }; 4 + 5 + pub async fn check<F>( 6 + mode: Mode, 7 + token: u32, 8 + sb: &mut KiteTickerSubscriber, 9 + assertions: Option<F>, 10 + ) where 11 + F: Fn(Vec<TickMessage>) -> (), 12 + { 13 + loop { 14 + match sb.next_message().await { 15 + Ok(message) => match message { 16 + Some(TickerMessage::Ticks(xs)) => { 17 + if xs.len() == 0 { 18 + continue; 19 + } 20 + assertions.map(|f| f(xs.clone())).or_else(|| { 21 + let tick_message = xs.first().unwrap(); 22 + assert!(tick_message.instrument_token == token); 23 + assert_eq!(tick_message.content.mode, mode); 24 + Some(()) 25 + }); 26 + break; 27 + } 28 + _ => { 29 + continue; 30 + } 31 + }, 32 + _ => { 33 + assert!(false); 34 + break; 35 + } 36 + } 37 + } 38 + }
+168
tests/ticker_test.rs
··· 1 + mod common; 2 + 3 + use kiteticker_async::ticker::*; 4 + use kiteticker_async::*; 5 + use tokio::select; 6 + 7 + use crate::common::check; 8 + 9 + #[tokio::test] 10 + async fn test_ticker() { 11 + let api_key = std::env::var("KITE_API_KEY").unwrap(); 12 + let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 13 + let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 14 + 15 + assert_eq!(ticker.is_ok(), true); 16 + 17 + let ticker = ticker.unwrap(); 18 + let token = 94977; // bata 19 + let mode = Mode::Full; 20 + let sb = ticker.subscribe(&[token], Some(mode.clone())).await; 21 + assert_eq!(sb.is_ok(), true); 22 + let mut sb = sb.unwrap(); 23 + assert_eq!(sb.get_subscribed().len(), 1); 24 + let mut loop_cnt = 0; 25 + loop { 26 + loop_cnt += 1; 27 + select! { 28 + Ok(n) = sb.next_message() => { 29 + match n.to_owned() { 30 + Some(message) => { 31 + match message { 32 + TickerMessage::Ticks(xs) => { 33 + if xs.len() == 0 { 34 + if loop_cnt > 5 { 35 + break; 36 + }else { 37 + continue; 38 + } 39 + } 40 + assert_eq!(xs.len(), 1); 41 + let tick_message = xs.first().unwrap(); 42 + assert!(tick_message.instrument_token == token); 43 + assert_eq!(tick_message.content.mode, mode); 44 + if loop_cnt > 5 { 45 + break; 46 + } 47 + }, 48 + _ => { 49 + if loop_cnt > 5 { 50 + break; 51 + } 52 + } 53 + } 54 + }, 55 + _ => { 56 + if loop_cnt > 5 { 57 + assert!(false); 58 + break; 59 + } 60 + } 61 + } 62 + }, 63 + else => { 64 + assert!(false); 65 + break; 66 + } 67 + } 68 + } 69 + 70 + sb.close().await.unwrap(); 71 + } 72 + 73 + #[tokio::test] 74 + async fn test_unsubscribe() { 75 + // create a ticker 76 + let api_key = std::env::var("KITE_API_KEY").unwrap(); 77 + let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 78 + let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 79 + 80 + let ticker = ticker.unwrap(); 81 + let token = 94977; // bata 82 + let mode = Mode::Full; 83 + let mut sb = ticker 84 + .subscribe(&[token], Some(mode.clone())) 85 + .await 86 + .unwrap(); 87 + 88 + let mut loop_cnt = 0; 89 + 90 + loop { 91 + match sb.next_message().await { 92 + Ok(message) => match message { 93 + Some(TickerMessage::Ticks(xs)) => { 94 + if xs.len() == 0 { 95 + if loop_cnt > 4 { 96 + assert!(true); 97 + break; 98 + } else { 99 + loop_cnt += 1; 100 + continue; 101 + } 102 + } 103 + assert_eq!(xs.len(), 1); 104 + let tick_message = xs.first().unwrap(); 105 + assert!(tick_message.instrument_token == token); 106 + sb.unsubscribe(&[]).await.unwrap(); 107 + loop_cnt += 1; 108 + if loop_cnt > 5 { 109 + assert!(false); 110 + break; 111 + } 112 + } 113 + _ => { 114 + continue; 115 + } 116 + }, 117 + _ => { 118 + assert!(false); 119 + break; 120 + } 121 + } 122 + } 123 + sb.close().await.unwrap(); 124 + } 125 + 126 + async fn create_ticker() -> KiteTickerAsync { 127 + // create a ticker 128 + let api_key = std::env::var("KITE_API_KEY").unwrap(); 129 + let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap(); 130 + let ticker = KiteTickerAsync::connect(&api_key, &access_token).await; 131 + ticker.expect("failed to create ticker") 132 + } 133 + 134 + #[tokio::test] 135 + async fn test_set_mode() { 136 + let ticker = create_ticker().await; 137 + let token = 94977; // bata 138 + let mode = Mode::LTP; 139 + let new_mode = Mode::Quote; 140 + let mut sb = ticker 141 + .subscribe(&[token], Some(mode.clone())) 142 + .await 143 + .unwrap(); 144 + 145 + let f1: Option<Box<dyn Fn(Vec<TickMessage>) -> ()>> = None; 146 + let f2: Option<Box<dyn Fn(Vec<TickMessage>) -> ()>> = None; 147 + check(mode, token, &mut sb, f1).await; 148 + sb.set_mode(&[], new_mode.clone()).await.unwrap(); 149 + check(new_mode, token, &mut sb, f2).await; 150 + 151 + sb.close().await.unwrap(); 152 + } 153 + 154 + #[tokio::test] 155 + async fn test_new_sub() { 156 + let ticker = create_ticker().await; 157 + let token = 94977; // bata 158 + let mode = Mode::LTP; 159 + let mut sb = ticker 160 + .subscribe(&[token], Some(mode.clone())) 161 + .await 162 + .unwrap(); 163 + tokio::spawn(async move { 164 + sb.subscribe(&[2953217], None).await.unwrap(); 165 + }) 166 + .await 167 + .unwrap(); 168 + }