1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::TimeDelta;
5
use futures::TryStreamExt;
6
use mongodb::{
7
    Database,
8
    bson::{DateTime, Document, doc},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::err::E_UNKNOWN;
13

            
14
use super::super::login_session::{EXPIRES, LoginSession, LoginSessionModel, QueryCond};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// MongoDB schema.
23
#[derive(Deserialize, Serialize)]
24
struct Schema {
25
    #[serde(rename = "sessionId")]
26
    session_id: String,
27
    #[serde(rename = "expiresAt")]
28
    expires_at: DateTime,
29
    #[serde(rename = "userId")]
30
    user_id: String,
31
    #[serde(rename = "createdAt")]
32
    created_at: DateTime,
33
}
34

            
35
/// Name of the MongoDB collection that stores login sessions.
const COL_NAME: &str = "loginSession";
36

            
37
impl Model {
38
    /// To create the model instance with a database connection.
39
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
40
14
        let model = Model { conn };
41
14
        model.init().await?;
42
14
        Ok(model)
43
14
    }
44
}
45

            
46
#[async_trait]
47
impl LoginSessionModel for Model {
48
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
49
        let indexes = vec![
50
            doc! {"name": "sessionId_1", "key": {"sessionId": 1}, "unique": true},
51
            doc! {"name": "userId_1", "key": {"userId": 1}},
52
            doc! {"name": "ttl_1", "key": {"createdAt": 1}, "expireAfterSeconds": EXPIRES + 60},
53
        ];
54
        let command = doc! {
55
            "createIndexes": COL_NAME,
56
            "indexes": indexes,
57
        };
58
        self.conn.run_command(command).await?;
59
        Ok(())
60
24
    }
61

            
62
296
    async fn get(&self, session_id: &str) -> Result<Option<LoginSession>, Box<dyn StdError>> {
63
        let mut cursor = self
64
            .conn
65
            .collection::<Schema>(COL_NAME)
66
            .find(doc! {"sessionId": session_id})
67
            .await?;
68
        if let Some(item) = cursor.try_next().await? {
69
            return Ok(Some(LoginSession {
70
                session_id: item.session_id,
71
                expires_at: item.expires_at.into(),
72
                user_id: item.user_id,
73
            }));
74
        }
75
        Ok(None)
76
296
    }
77

            
78
304
    async fn add(&self, session: &LoginSession) -> Result<(), Box<dyn StdError>> {
79
        let item = Schema {
80
            session_id: session.session_id.clone(),
81
            expires_at: session.expires_at.into(),
82
            user_id: session.user_id.clone(),
83
            created_at: match TimeDelta::try_seconds(EXPIRES) {
84
                None => panic!("{}", E_UNKNOWN),
85
                Some(t) => (session.expires_at - t).into(),
86
            },
87
        };
88
        self.conn
89
            .collection::<Schema>(COL_NAME)
90
            .insert_one(item)
91
            .await?;
92
        Ok(())
93
304
    }
94

            
95
282
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
96
        let filter = get_query_filter(cond);
97
        self.conn
98
            .collection::<Schema>(COL_NAME)
99
            .delete_many(filter)
100
            .await?;
101
        Ok(())
102
282
    }
103
}
104

            
105
/// Transforms query conditions to the MongoDB document.
106
282
fn get_query_filter(cond: &QueryCond) -> Document {
107
282
    let mut filter = Document::new();
108
282
    if let Some(value) = cond.session_id {
109
280
        filter.insert("sessionId", value);
110
280
    }
111
282
    if let Some(value) = cond.user_id {
112
2
        filter.insert("userId", value);
113
280
    }
114
282
    filter
115
282
}