Categories News

Limita el número de sockets abiertos en los oyentes TCP basados ​​en Tokio

08 de enero de 2021 [Async programming, Programming, Rust]

Hoy aprendí mucho sobre cómo pensar en la concurrencia en Rust. Estoy intentando utilizar Semaphore para limitar el número de sockets abiertos que permite mi escucha TCP y tengo problemas para que funcione. O en realidad no funciona, permitiendo que se conecte cualquier número de clientes, o el compilador me dice que no puedo hacer lo que quiero, porque mi vida útil de Semaphore no es ‘estática’. Este es el camino que tomé para trabajar con el código que creo que es correcto (se agradecen sus comentarios).

Motivación

En el tutorial de Tokio hay una breve sección titulada “Contrapresión y canales limitados” (en la parte inferior de la página Canales). Contiene esta declaración:

…tenga cuidado de garantizar que la cantidad total de concurrencia sea limitada. Por ejemplo, al escribir un bucle de recepción TCP, asegúrese de que el número total de sockets abiertos sea limitado.

Obviamente, cuando comencé a trabajar en el bucle de recepción TCP, quise seguir este consejo.

Como muchas cosas en mi viaje con Rust, fue más difícil de lo que esperaba y, en última instancia, esclarecedor.

Código

Aquí hay un breve programa de Rust que escucha un puerto TCP y acepta conexiones entrantes.

Kargo.toml:

[package]
name = "tcp-listener-example"
version = "1.0.0"
edition = "2018"
include = ["src/"]

[dependencies]
tokio =  version = ">=1.0.1", features = ["full"] 

src/main.rs:

use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();

    loop 
        let (mut tcp_stream, _) = listener.accept().await.unwrap();
        tokio::spawn(async move 
            let mut buf: [u8; 1024] = [0; 1024];
            loop 
                let n = tcp_stream.read(&mut buf).await.unwrap();
                if n == 0 
                    return;
                
                print!("", String::from_utf8_lossy(&buf[0..n]));
            
        );
    
}

Este programa escucha en el puerto 8080 y cada vez que un cliente se conecta, genera una tarea asincrónica para manejarlo.

Si lo ejecuto con:

cargo run

Se inicia y puedo conectarme a él desde algún otro proceso como este:

telnet 0.0.0.0 8080

Todo lo que escriba en la ventana de la terminal telnet se imprimirá en la terminal donde ejecuto la carga. El programa funciona: escucha en el puerto TCP 8080 e imprime todos los mensajes que recibe.

Entonces, ¿cuál es el problema?

El problema es que el programa puede verse abrumado: si muchos procesos se conectan a él, aceptará todas las conexiones y eventualmente se quedará sin sockets. Esto podría impedir que otras cosas funcionen correctamente en la computadora, o podría bloquear nuestros programas o algo más. Necesitamos algún tipo de límites razonables, como se menciona en el tutorial de Tokio.

Entonces, ¿cómo limitamos la cantidad de personas a las que se les permite conectarse al mismo tiempo?

Solo usa un semáforo, estúpido.

El semáforo hace lo que necesitamos aquí: cuenta cuántas personas están haciendo algo y evita que ese número crezca demasiado. Entonces, todo lo que tenemos que hacer es limitar la cantidad de clientes que permitimos conectarse mediante semáforos.

Aquí está mi primer intento:

use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    let sem = Semaphore::new(2);

    loop {
        let (mut tcp_stream, _) = listener.accept().await.unwrap();
        // Don't copy this code: it doesn't work
        let aq = sem.try_acquire();
        if let Ok(_guard) = aq 
            tokio::spawn(async move 
                let mut buf: [u8; 1024] = [0; 1024];
                loop 
                    let n = tcp_stream.read(&mut buf).await.unwrap();
                    if n == 0 
                        return;
                    
                    print!("", String::from_utf8_lossy(&buf[0..n]));
                
            );
         else 
            println!("Rejecting client: too many open sockets");
        
    }
}

¡Se compila bien, pero no produce nada! Aunque llamamos a Semaphore::new con el argumento 2, con la intención de permitir que solo se conectaran 2 clientes, de hecho, todavía pude conectar más que eso. Parece que nuestros cambios en el código no tienen ningún efecto.

Lo que esperamos que suceda es que cada vez que un cliente se conecte, creemos un _guard, que es un SemaphoreGuard, que ocupa una de las ranuras del semáforo. Esperamos que la guardia permanezca activa hasta que el cliente se desconecte, momento en el que se liberará la ranura.

¿Por qué no funciona? Es fácil de entender cuando piensas en lo que hace tokio::spawn. Crea una tarea y solicita que se ejecute en el futuro, pero en realidad no la ejecuta. Entonces tokio::spawn regresa inmediatamente y _guard se elimina, antes de que se ejecute el código que maneja la solicitud. Entonces, por supuesto, nuestro cambio no limita la cantidad de solicitudes que se manejan porque los espacios del semáforo se desocupan antes de que se procesen las solicitudes.

Sólo mantén la guardia un poco más, tonto.

Entonces, mantengamos SemaphoreGuard un poco más:

use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    let sem = Semaphore::new(2);

    loop {
        let (mut tcp_stream, _) = listener.accept().await.unwrap();
        let aq = sem.try_acquire();
        if let Ok(guard) = aq 
            tokio::spawn(async move 
                let mut buf: [u8; 1024] = [0; 1024];
                loop 
                    let n = tcp_stream.read(&mut buf).await.unwrap();
                    if n == 0 
                        drop(guard);
                        return;
                    
                    print!("", String::from_utf8_lossy(&buf[0..n]));
                
            );
         else 
            println!("Rejecting client: too many open sockets");
        
    }
}

La idea es pasar objetos SemaphoreGuard al código que realmente trata con la solicitud del cliente. La forma en que lo intenté fue refiriéndome a la guardia en algún lugar del cierre del movimiento asíncrono. Lo que en realidad estoy haciendo es decirle que deje de proteger cuando hayamos terminado con la solicitud, pero de hecho, cualquier mención de esa variable dentro del cierre es suficiente para decirle al compilador que queremos moverla y simplemente desecharla cuando hayamos terminado.

Suena razonable, pero en realidad este código no se compila. Aquí está el error que recibí:

error[E0597]: `sem` does not live long enough
  --> src/main.rs:12:18
   |
12 |         let aq = sem.try_acquire();
   |                  ^^^--------------
   |                  |
   |                  borrowed value does not live long enough
   |                  argument requires that `sem` is borrowed for `'static`
...
29 | }
   | - `sem` dropped here while still borrowed

Lo que dice el compilador es que SemaphoreGuard se refiere a sem (objeto Semaphore), pero el guard puede sobrevivir al semáforo.

¿Por qué? Seguramente sem tiene un alcance que incluye todo el código de manejo del cliente, por lo que debería durar bastante tiempo.

De hecho, el cierre de movimiento asincrónico que le pasamos a tokio::spawn se agregará a la lista de tareas que se ejecutarán en el futuro, por lo que puede durar más. El hecho de que estemos en un bucle infinito me confunde aún más aquí, pero el principio sigue siendo el mismo: cada vez que creamos un cierre como este y ponemos algo en él, el cierre debe tenerlo, o si lo tomamos prestado, debe vivir para siempre (que es lo que se entiende por “vida útil estática”).

El código anterior pasa la propiedad de la guardia al cierre, pero la propia guardia hace referencia (toma prestado) sem. Es por eso que el compilador dice que “sem se toma prestado para ‘estático’.

Cosas equivocadas que intenté

Como no entendía lo que estaba haciendo, intenté varias otras cosas como hacer que sem sea un Arco, hacer que guarde un Arco, hacer que guarde dentro de un cierre, e incluso intenté hacer que sem realmente tuviera “almacenamiento estático” convirtiéndolo en una constante. (Esto último no funciona porque sólo los tipos muy simples, como números y cadenas, pueden ser constantes).

Solución: compartir semáforo en Arc

Después de lo que me pareció demasiada paliza, encontré lo que creo que es la respuesta correcta:

use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    let sem = Arc::new(Semaphore::new(2));

    loop {
        let (mut tcp_stream, _) = listener.accept().await.unwrap();
        let sem_clone = Arc::clone(&sem);
        tokio::spawn(async move 
            let aq = sem_clone.try_acquire();
            if let Ok(_guard) = aq 
                let mut buf: [u8; 1024] = [0; 1024];
                loop 
                    let n = tcp_stream.read(&mut buf).await.unwrap();
                    if n == 0 
                        return;
                    
                    print!("", String::from_utf8_lossy(&buf[0..n]));
                
             else 
                println!("Rejecting client: too many open sockets");
            
        );
    }
}

Este código:

  • Crea un semáforo y lo almacena dentro de Arc, que es un puntero de cálculo de referencia que se puede compartir entre tareas. Esto significa que persistirá mientras alguien tenga la referencia.
  • Clonar Arc para que tengamos una copia que se pueda mover de forma segura a un cierre de movimiento asíncrono. No podemos cerrar el sem porque se utilizará nuevamente en la siguiente ronda. Podemos mover sem_clone al cierre ya que no se usa en ningún otro lugar. sem y sem_clone hacen referencia al mismo objeto Semaphore, por lo que coinciden en la cantidad de clientes conectados, pero son instancias de Arc diferentes, por lo que se pueden mover a cierres.
  • Adquiera SemaphoreGuard solo una vez que estemos dentro del cierre. De esta manera no hacemos nada difícil como tomar prestada una referencia a algo que está fuera del cierre. En su lugar, tomamos prestada una referencia a través de sem_clone, que pertenece al cierre en el que nos encontramos, por lo que sabemos que la referencia persistirá el tiempo suficiente.

¡Realmente funciona! Una vez que los dos clientes están conectados, listener.accept en realidad abre el socket a cualquier cliente nuevo, pero como regresamos del cierre inmediatamente, solo lo abrimos brevemente antes de soltarlo. Esto parece mejor que negarse a abrirlo, lo que creo que probablemente dejaría al cliente esperando, esperando una conexión que tal vez nunca llegue.

La vida es genial y complicada.

Nuevamente, aprendí mucho sobre lo que realmente hace mi código desde el compilador Rust. Esto me parece muy confuso, pero espero que al escribir mis ideas en esta publicación, haya ayudado a mi yo actual y futuro, y tal vez incluso a usted, a ser más claro sobre cómo compartir semáforos entre múltiples tareas asincrónicas.

Es divertido y enriquecedor escribir código que sé que es correcto y que también funciona. La sensación de que “el compilador me respalda” es fuerte y me gusta.

Berita Terkini

Berita Terbaru

Daftar Terbaru

News

Berita Terbaru

Flash News

RuangJP

Pemilu

Berita Terkini

Prediksi Bola

Togel Deposit Pulsa

Technology

Otomotif

Berita Terbaru

Daftar Judi Slot Online Terpercaya

Slot yang lagi gacor

Teknologi

Berita terkini

Berita Pemilu

Berita Teknologi

Hiburan

master Slote

Berita Terkini

Pendidikan

Resep

Jasa Backlink

One Piece Terbaru

More From Author