1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::TimeDelta;
5
use futures::TryStreamExt;
6
use mongodb::{
7
    bson::{doc, DateTime, Document},
8
    Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::err::E_UNKNOWN;
13

            
14
use super::super::access_token::{AccessToken, AccessTokenModel, QueryCond, EXPIRES};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// MongoDB schema.
23
2385
#[derive(Deserialize, Serialize)]
24
struct Schema {
25
    #[serde(rename = "accessToken")]
26
    access_token: String,
27
    #[serde(rename = "refreshToken", skip_serializing_if = "Option::is_none")]
28
    refresh_token: Option<String>,
29
    #[serde(rename = "expiresAt")]
30
    expires_at: DateTime,
31
    scope: Option<String>,
32
    #[serde(rename = "clientId")]
33
    client_id: String,
34
    #[serde(rename = "redirectUri")]
35
    redirect_uri: String,
36
    #[serde(rename = "userId")]
37
    user_id: String,
38
    #[serde(rename = "createdAt")]
39
    created_at: DateTime,
40
}
41

            
42
const COL_NAME: &'static str = "accessToken";
43

            
44
impl Model {
45
    /// To create the model instance with a database connection.
46
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
47
7
        let model = Model { conn };
48
14
        model.init().await?;
49
7
        Ok(model)
50
7
    }
51
}
52

            
53
#[async_trait]
54
impl AccessTokenModel for Model {
55
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
56
12
        let indexes = vec![
57
12
            doc! {"name": "accessToken_1", "key": {"accessToken": 1}, "unique": true},
58
12
            doc! {"name": "refreshToken_1", "key": {"refreshToken": 1}},
59
12
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
60
12
            doc! {"name": "userId_1", "key": {"userId": 1}},
61
12
            doc! {"name": "ttl_1", "key": {"createdAt": 1}, "expireAfterSeconds": EXPIRES + 60},
62
12
        ];
63
12
        let command = doc! {
64
12
            "createIndexes": COL_NAME,
65
12
            "indexes": indexes,
66
12
        };
67
24
        self.conn.run_command(command).await?;
68
12
        Ok(())
69
24
    }
70

            
71
269
    async fn get(&self, access_token: &str) -> Result<Option<AccessToken>, Box<dyn StdError>> {
72
269
        let mut cursor = self
73
269
            .conn
74
269
            .collection::<Schema>(COL_NAME)
75
269
            .find(doc! {"accessToken": access_token})
76
538
            .await?;
77
269
        if let Some(item) = cursor.try_next().await? {
78
239
            return Ok(Some(AccessToken {
79
239
                access_token: item.access_token,
80
239
                refresh_token: item.refresh_token,
81
239
                expires_at: item.expires_at.into(),
82
239
                scope: item.scope,
83
239
                client_id: item.client_id,
84
239
                redirect_uri: item.redirect_uri,
85
239
                user_id: item.user_id,
86
239
            }));
87
30
        }
88
30
        Ok(None)
89
538
    }
90

            
91
151
    async fn add(&self, token: &AccessToken) -> Result<(), Box<dyn StdError>> {
92
151
        let item = Schema {
93
151
            access_token: token.access_token.clone(),
94
151
            refresh_token: token.refresh_token.clone(),
95
151
            expires_at: token.expires_at.into(),
96
151
            scope: token.scope.clone(),
97
151
            client_id: token.client_id.clone(),
98
151
            redirect_uri: token.redirect_uri.clone(),
99
151
            user_id: token.user_id.clone(),
100
151
            created_at: match TimeDelta::try_seconds(EXPIRES) {
101
                None => panic!("{}", E_UNKNOWN),
102
151
                Some(t) => (token.expires_at - t).into(),
103
151
            },
104
151
        };
105
151
        self.conn
106
151
            .collection::<Schema>(COL_NAME)
107
151
            .insert_one(item)
108
303
            .await?;
109
150
        Ok(())
110
302
    }
111

            
112
20
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
113
20
        let filter = get_query_filter(cond);
114
20
        self.conn
115
20
            .collection::<Schema>(COL_NAME)
116
20
            .delete_many(filter)
117
40
            .await?;
118
20
        Ok(())
119
40
    }
120
}
121

            
122
/// Transforms query conditions to the MongoDB document.
123
20
fn get_query_filter(cond: &QueryCond) -> Document {
124
20
    let mut filter = Document::new();
125
20
    if let Some(value) = cond.access_token {
126
4
        filter.insert("accessToken", value);
127
16
    }
128
20
    if let Some(value) = cond.refresh_token {
129
7
        filter.insert("refreshToken", value);
130
13
    }
131
20
    if let Some(value) = cond.client_id {
132
3
        filter.insert("clientId", value);
133
17
    }
134
20
    if let Some(value) = cond.user_id {
135
7
        filter.insert("userId", value);
136
13
    }
137
20
    filter
138
20
}