Задать вопрос
@TicSo

Как повторять цикл в дочернем потоке с заданным интервалом для последних актуальных данных?

В продолжение темы спрошу по отдельности.
Приведенный код даёт время (задал для удобства 20 секунд) на заполнение вектора v_txt_main
в потоке main. Например, внесли в вектор три элемента в консоли следующим образом:
1 - qqq
1 - www
1 - eee
Если на момент ввода третьего элемента прошло более заданных 20 секунд, то по каналу
актуальное содержимое вектора уйдёт в дочерний поток в работу, в данном случае для вывода в консоль. Типа такого:
from thread:   "qqq" -> 2025-07-25T07:04:36.376855400Z -> 2025-07-25T07:04:05.360081700Z
from thread:   "www" -> 2025-07-25T07:04:36.377157300Z -> 2025-07-25T07:04:05.360081700Z
from thread:   "eee" -> 2025-07-25T07:04:36.377357600Z -> 2025-07-25T07:04:05.360081700Z


Как изменить приведенный код, чтобы в этом отдельном потоке в случае, если за 10 секунд
по каналу ничего не пришло, повторялся вывод в консоль последнего актуального набора
данных? Например, хочу выводить верхние три строчки каждые 10 секунд, если не зашло
новое содержимое вектора. Точнее: каждые 10 сек. от времени job_01 отдельный поток
выводит данные, если на старте программы вектор ещё ни разу не зашёл,
то выводит - "нет данных".
код

// [dependencies]
// chrono = "0.4.41"
use std::io;
use std::collections::HashMap;
//
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use std::sync::mpsc;
//
use std::sync::mpsc::Sender;
use std::sync::mpsc::Receiver;
//
use chrono::prelude::*;

fn main() {

   let mut dt_start: DateTime<chrono::Utc> = Utc::now();
   println!("---\n{:?}\n---\n", &dt_start);
   
   let mut v_txt_main: Vec<String> = Vec::new();
   let (tx_01, rx_01):(Sender<DateTime<chrono::Utc>>, Receiver<_>) = mpsc::channel();
   let (tx_02, rx_02):(Sender<Vec<String>>, Receiver<_>) = mpsc::channel();

   thread::spawn(move || {
   
      let job_01: Result<chrono::DateTime<chrono::Utc>, std::sync::mpsc::RecvError> = rx_01.recv();
      println!("dt_start to thread: {:?}", &job_01.unwrap());
      
      let dt_next: DateTime<chrono::Utc> = Utc::now();
      let mut job_02: Result<Vec<String>, std::sync::mpsc::RecvError>;

         loop {
            println!("{:?} ^ {:?}", job_01.unwrap(), dt_next);
            job_02 = rx_02.recv();
            for v in job_02.unwrap().iter() {
             println!("from thread: {:?} -> {:?} -> {:?}", &v, Utc::now(), &job_01.unwrap());
            }
            sleep(Duration::from_millis(10));
         }
   });

   
   let _ = tx_01.send(dt_start);

   loop {
      let mut h_add_del_txt: HashMap<u8, String> = HashMap::new();
      let console: (u8, String) = f01(); 

      if console.0 == 1 {
         h_add_del_txt.insert(1, console.1);
       } else if console.0 == 2 {
         h_add_del_txt.insert(2, console.1);
       } else {  
         println!("Неверный выбор, повторите:");
         f01();
      }

      for (k, v) in &h_add_del_txt {
        if v_txt_main.iter().any(|n| *n == *v) {
         
           if *k == 2 {
              v_txt_main.retain(|a| *a != *v);
            } else {
              // ...
            }
        } else {
            if *k == 1 {
               v_txt_main.push(v.clone());
            } else {
            // ...
            }     
        }
      }    

     if Utc::now() >= dt_start + Duration::from_secs(20) {
        let _ = tx_02.send(v_txt_main.clone());
        sleep(Duration::from_millis(20));
        dt_start = Utc::now();
     }
   }
}


fn f01() -> (u8, String) {

   println!("\nВведите вариант (1 или 2): \n 1 - для добавления `ТЕКСТА`\n или \n 2 - для удаления `ТЕКСТА`\n");   
   let mut flag = String::new();
   io::stdin().read_line(&mut flag).expect("Failed to read line");
   let flag: u8 = flag.trim().parse().expect("Please type a number!");

   if flag == 1 {
     println!("Внесите ТЕКСТ на добавление:");   
     let mut g = String::new();
     io::stdin().read_line(&mut g).expect("Failed to read line");
     let g: String = g.trim().parse().expect("Please type ...");

     return (flag, g);
   } else if flag == 2 {
     println!("Внесите ТЕКСТ на удаление:");   
     let mut g = String::new();
     io::stdin().read_line(&mut g).expect("Failed to read line");
     let g: String = g.trim().parse().expect("Please type a ...");
     return (flag, g);
   } else {
     return (0, "Error_...".to_string());
   }     
}


Не получается `развязать` режим ожидания ввода данных вектора в консоли с
независимой работой отдельного потока с шагом 10 сек., т.е. хочу в консоли без
провалов на ожидание ввода каждые 10 секунд видеть содержимое из
отдельного потока. Вот стартанули по времени в потоке: job_01 , например
в 2025-07-25T07:04:05.360081700Z
и от него каждые 10 секунд в консоли хочу видеть последний набор данных,
который был или уже попал в поток.
Пытался делать ввод данных с консоли в ещё одном потоке, - не получилось.
Так, две проблемы: `развязать` и вывод последних данных с шагом в 10 сек.
  • Вопрос задан
  • 104 просмотра
Подписаться 1 Простой 6 комментариев
Решения вопроса 1
bingo347
@bingo347
Crazy on performance...
Примерно так это можно реализовать:
use chrono::Utc;
use std::{
    io::{self, BufRead as _},
    sync::mpsc::{self, RecvTimeoutError, Sender},
    thread,
    time::{Duration, Instant},
};

enum Op {
    Add(String),
    Del(String),
}

const PRINT_INTERVAL: Duration = Duration::from_secs(10);

fn main() {
    println!("---\n{}\n---\n", Utc::now());

    let (tx, rx) = mpsc::channel();
    start_stdin_thread(tx);

    let mut strings = Vec::new();
    let mut last_print = Instant::now();
    loop {
        match rx.recv_timeout(PRINT_INTERVAL - last_print.elapsed()) {
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {
                println!("---\n{}\n{:?}\n---\n", Utc::now(), strings);
                last_print = Instant::now();
            }
            Ok(Op::Add(s)) => {
                strings.push(s);
            }
            Ok(Op::Del(s)) => {
                strings.retain(|v| v != &s);
            }
        }
    }
}

fn start_stdin_thread(tx: Sender<Op>) {
    #[derive(Clone, Copy)]
    enum OpRaw {
        Add,
        Del,
    }

    thread::spawn(move || {
        let mut stdin = io::stdin().lock();
        let mut buf = String::with_capacity(1024);
        let mut read_line = |buf: &mut String| -> io::Result<()> {
            buf.clear();
            stdin.read_line(buf).map(|_| ()).inspect_err(|err| {
                eprintln!("Ошибка чтения stdin: {err}");
            })
        };

        loop {
            println!("1. Добавить текст");
            println!("2. Удалить текст");
            print!("Введите номер операции: ");
            if read_line(&mut buf).is_err() {
                break;
            }

            let op = match buf.trim() {
                "1" => OpRaw::Add,
                "2" => OpRaw::Del,
                _ => {
                    eprintln!("Некорректная операция");
                    continue;
                }
            };

            println!(
                "Введите текст на {}:",
                match op {
                    OpRaw::Add => "добавление",
                    OpRaw::Del => "удаление",
                }
            );
            if read_line(&mut buf).is_err() {
                break;
            }

            let text = buf.trim().to_string();
            let op = match op {
                OpRaw::Add => Op::Add(text),
                OpRaw::Del => Op::Del(text),
            };

            if tx.send(op).is_err() {
                break;
            }
        }
    });
}
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы