1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::TimeDelta;
5
use futures::TryStreamExt;
6
use mongodb::{
7
    bson::{doc, DateTime, Document},
8
    Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::err::E_UNKNOWN;
13

            
14
use super::super::refresh_token::{QueryCond, RefreshToken, RefreshTokenModel, EXPIRES};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// MongoDB schema.
23
#[derive(Deserialize, Serialize)]
24
struct Schema {
25
    #[serde(rename = "refreshToken")]
26
    refresh_token: String,
27
    #[serde(rename = "expiresAt")]
28
    expires_at: DateTime,
29
    scope: Option<String>,
30
    #[serde(rename = "clientId")]
31
    client_id: String,
32
    #[serde(rename = "redirectUri")]
33
    redirect_uri: String,
34
    #[serde(rename = "userId")]
35
    user_id: String,
36
    #[serde(rename = "createdAt")]
37
    created_at: DateTime,
38
}
39

            
40
const COL_NAME: &'static str = "refreshToken";
41

            
42
impl Model {
43
    /// To create the model instance with a database connection.
44
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
45
14
        let model = Model { conn };
46
14
        model.init().await?;
47
14
        Ok(model)
48
14
    }
49
}
50

            
51
#[async_trait]
52
impl RefreshTokenModel for Model {
53
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
54
24
        let indexes = vec![
55
24
            doc! {"name": "refreshToken_1", "key": {"refreshToken": 1}, "unique": true},
56
24
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
57
24
            doc! {"name": "userId_1", "key": {"userId": 1}},
58
24
            doc! {"name": "ttl_1", "key": {"createdAt": 1}, "expireAfterSeconds": EXPIRES + 60},
59
24
        ];
60
24
        let command = doc! {
61
24
            "createIndexes": COL_NAME,
62
24
            "indexes": indexes,
63
24
        };
64
24
        self.conn.run_command(command).await?;
65
24
        Ok(())
66
48
    }
67

            
68
66
    async fn get(&self, refresh_token: &str) -> Result<Option<RefreshToken>, Box<dyn StdError>> {
69
66
        let mut cursor = self
70
66
            .conn
71
66
            .collection::<Schema>(COL_NAME)
72
66
            .find(doc! {"refreshToken": refresh_token})
73
66
            .await?;
74
66
        if let Some(item) = cursor.try_next().await? {
75
48
            return Ok(Some(RefreshToken {
76
48
                refresh_token: item.refresh_token,
77
48
                expires_at: item.expires_at.into(),
78
48
                scope: item.scope,
79
48
                client_id: item.client_id,
80
48
                redirect_uri: item.redirect_uri,
81
48
                user_id: item.user_id,
82
48
            }));
83
18
        }
84
18
        Ok(None)
85
132
    }
86

            
87
298
    async fn add(&self, token: &RefreshToken) -> Result<(), Box<dyn StdError>> {
88
298
        let item = Schema {
89
298
            refresh_token: token.refresh_token.clone(),
90
298
            expires_at: token.expires_at.into(),
91
298
            scope: token.scope.clone(),
92
298
            client_id: token.client_id.clone(),
93
298
            redirect_uri: token.redirect_uri.clone(),
94
298
            user_id: token.user_id.clone(),
95
298
            created_at: match TimeDelta::try_seconds(EXPIRES) {
96
                None => panic!("{}", E_UNKNOWN),
97
298
                Some(t) => (token.expires_at - t).into(),
98
298
            },
99
298
        };
100
298
        self.conn
101
298
            .collection::<Schema>(COL_NAME)
102
298
            .insert_one(item)
103
298
            .await?;
104
296
        Ok(())
105
596
    }
106

            
107
36
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
108
36
        let filter = get_query_filter(cond);
109
36
        self.conn
110
36
            .collection::<Schema>(COL_NAME)
111
36
            .delete_many(filter)
112
36
            .await?;
113
36
        Ok(())
114
72
    }
115
}
116

            
117
/// Transforms query conditions to the MongoDB document.
118
36
fn get_query_filter(cond: &QueryCond) -> Document {
119
36
    let mut filter = Document::new();
120
36
    if let Some(value) = cond.refresh_token {
121
18
        filter.insert("refreshToken", value);
122
18
    }
123
36
    if let Some(value) = cond.client_id {
124
6
        filter.insert("clientId", value);
125
30
    }
126
36
    if let Some(value) = cond.user_id {
127
14
        filter.insert("userId", value);
128
22
    }
129
36
    filter
130
36
}