1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::TimeDelta;
5
use futures::TryStreamExt;
6
use mongodb::{
7
    bson::{doc, DateTime, Document},
8
    Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::err::E_UNKNOWN;
13

            
14
use super::super::refresh_token::{QueryCond, RefreshToken, RefreshTokenModel, EXPIRES};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// MongoDB schema.
23
216
#[derive(Deserialize, Serialize)]
24
struct Schema {
25
    #[serde(rename = "refreshToken")]
26
    refresh_token: String,
27
    #[serde(rename = "expiresAt")]
28
    expires_at: DateTime,
29
    scope: Option<String>,
30
    #[serde(rename = "clientId")]
31
    client_id: String,
32
    #[serde(rename = "redirectUri")]
33
    redirect_uri: String,
34
    #[serde(rename = "userId")]
35
    user_id: String,
36
    #[serde(rename = "createdAt")]
37
    created_at: DateTime,
38
}
39

            
40
const COL_NAME: &'static str = "refreshToken";
41

            
42
impl Model {
43
    /// To create the model instance with a database connection.
44
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
45
7
        let model = Model { conn };
46
14
        model.init().await?;
47
7
        Ok(model)
48
7
    }
49
}
50

            
51
#[async_trait]
52
impl RefreshTokenModel for Model {
53
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
54
12
        let indexes = vec![
55
12
            doc! {"name": "refreshToken_1", "key": {"refreshToken": 1}, "unique": true},
56
12
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
57
12
            doc! {"name": "userId_1", "key": {"userId": 1}},
58
12
            doc! {"name": "ttl_1", "key": {"createdAt": 1}, "expireAfterSeconds": EXPIRES + 60},
59
12
        ];
60
12
        let command = doc! {
61
12
            "createIndexes": COL_NAME,
62
12
            "indexes": indexes,
63
12
        };
64
24
        self.conn.run_command(command).await?;
65
12
        Ok(())
66
24
    }
67

            
68
33
    async fn get(&self, refresh_token: &str) -> Result<Option<RefreshToken>, Box<dyn StdError>> {
69
33
        let mut cursor = self
70
33
            .conn
71
33
            .collection::<Schema>(COL_NAME)
72
33
            .find(doc! {"refreshToken": refresh_token})
73
66
            .await?;
74
33
        if let Some(item) = cursor.try_next().await? {
75
24
            return Ok(Some(RefreshToken {
76
24
                refresh_token: item.refresh_token,
77
24
                expires_at: item.expires_at.into(),
78
24
                scope: item.scope,
79
24
                client_id: item.client_id,
80
24
                redirect_uri: item.redirect_uri,
81
24
                user_id: item.user_id,
82
24
            }));
83
9
        }
84
9
        Ok(None)
85
66
    }
86

            
87
149
    async fn add(&self, token: &RefreshToken) -> Result<(), Box<dyn StdError>> {
88
149
        let item = Schema {
89
149
            refresh_token: token.refresh_token.clone(),
90
149
            expires_at: token.expires_at.into(),
91
149
            scope: token.scope.clone(),
92
149
            client_id: token.client_id.clone(),
93
149
            redirect_uri: token.redirect_uri.clone(),
94
149
            user_id: token.user_id.clone(),
95
149
            created_at: match TimeDelta::try_seconds(EXPIRES) {
96
                None => panic!("{}", E_UNKNOWN),
97
149
                Some(t) => (token.expires_at - t).into(),
98
149
            },
99
149
        };
100
149
        self.conn
101
149
            .collection::<Schema>(COL_NAME)
102
149
            .insert_one(item)
103
299
            .await?;
104
148
        Ok(())
105
298
    }
106

            
107
18
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
108
18
        let filter = get_query_filter(cond);
109
18
        self.conn
110
18
            .collection::<Schema>(COL_NAME)
111
18
            .delete_many(filter)
112
36
            .await?;
113
18
        Ok(())
114
36
    }
115
}
116

            
117
/// Transforms query conditions to the MongoDB document.
118
18
fn get_query_filter(cond: &QueryCond) -> Document {
119
18
    let mut filter = Document::new();
120
18
    if let Some(value) = cond.refresh_token {
121
9
        filter.insert("refreshToken", value);
122
9
    }
123
18
    if let Some(value) = cond.client_id {
124
3
        filter.insert("clientId", value);
125
15
    }
126
18
    if let Some(value) = cond.user_id {
127
7
        filter.insert("userId", value);
128
11
    }
129
18
    filter
130
18
}