1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::TimeDelta;
5
use futures::TryStreamExt;
6
use mongodb::{
7
    bson::{doc, DateTime, Document},
8
    Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::err::E_UNKNOWN;
13

            
14
use super::super::access_token::{AccessToken, AccessTokenModel, QueryCond, EXPIRES};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// MongoDB schema.
23
#[derive(Deserialize, Serialize)]
24
struct Schema {
25
    #[serde(rename = "accessToken")]
26
    access_token: String,
27
    #[serde(rename = "refreshToken", skip_serializing_if = "Option::is_none")]
28
    refresh_token: Option<String>,
29
    #[serde(rename = "expiresAt")]
30
    expires_at: DateTime,
31
    scope: Option<String>,
32
    #[serde(rename = "clientId")]
33
    client_id: String,
34
    #[serde(rename = "redirectUri")]
35
    redirect_uri: String,
36
    #[serde(rename = "userId")]
37
    user_id: String,
38
    #[serde(rename = "createdAt")]
39
    created_at: DateTime,
40
}
41

            
42
const COL_NAME: &'static str = "accessToken";
43

            
44
impl Model {
45
    /// To create the model instance with a database connection.
46
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
47
14
        let model = Model { conn };
48
14
        model.init().await?;
49
14
        Ok(model)
50
14
    }
51
}
52

            
53
#[async_trait]
54
impl AccessTokenModel for Model {
55
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
56
24
        let indexes = vec![
57
24
            doc! {"name": "accessToken_1", "key": {"accessToken": 1}, "unique": true},
58
24
            doc! {"name": "refreshToken_1", "key": {"refreshToken": 1}},
59
24
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
60
24
            doc! {"name": "userId_1", "key": {"userId": 1}},
61
24
            doc! {"name": "ttl_1", "key": {"createdAt": 1}, "expireAfterSeconds": EXPIRES + 60},
62
24
        ];
63
24
        let command = doc! {
64
24
            "createIndexes": COL_NAME,
65
24
            "indexes": indexes,
66
24
        };
67
24
        self.conn.run_command(command).await?;
68
24
        Ok(())
69
48
    }
70

            
71
538
    async fn get(&self, access_token: &str) -> Result<Option<AccessToken>, Box<dyn StdError>> {
72
538
        let mut cursor = self
73
538
            .conn
74
538
            .collection::<Schema>(COL_NAME)
75
538
            .find(doc! {"accessToken": access_token})
76
538
            .await?;
77
538
        if let Some(item) = cursor.try_next().await? {
78
478
            return Ok(Some(AccessToken {
79
478
                access_token: item.access_token,
80
478
                refresh_token: item.refresh_token,
81
478
                expires_at: item.expires_at.into(),
82
478
                scope: item.scope,
83
478
                client_id: item.client_id,
84
478
                redirect_uri: item.redirect_uri,
85
478
                user_id: item.user_id,
86
478
            }));
87
60
        }
88
60
        Ok(None)
89
1076
    }
90

            
91
302
    async fn add(&self, token: &AccessToken) -> Result<(), Box<dyn StdError>> {
92
302
        let item = Schema {
93
302
            access_token: token.access_token.clone(),
94
302
            refresh_token: token.refresh_token.clone(),
95
302
            expires_at: token.expires_at.into(),
96
302
            scope: token.scope.clone(),
97
302
            client_id: token.client_id.clone(),
98
302
            redirect_uri: token.redirect_uri.clone(),
99
302
            user_id: token.user_id.clone(),
100
302
            created_at: match TimeDelta::try_seconds(EXPIRES) {
101
                None => panic!("{}", E_UNKNOWN),
102
302
                Some(t) => (token.expires_at - t).into(),
103
302
            },
104
302
        };
105
302
        self.conn
106
302
            .collection::<Schema>(COL_NAME)
107
302
            .insert_one(item)
108
302
            .await?;
109
300
        Ok(())
110
604
    }
111

            
112
40
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
113
40
        let filter = get_query_filter(cond);
114
40
        self.conn
115
40
            .collection::<Schema>(COL_NAME)
116
40
            .delete_many(filter)
117
40
            .await?;
118
40
        Ok(())
119
80
    }
120
}
121

            
122
/// Transforms query conditions to the MongoDB document.
123
40
fn get_query_filter(cond: &QueryCond) -> Document {
124
40
    let mut filter = Document::new();
125
40
    if let Some(value) = cond.access_token {
126
8
        filter.insert("accessToken", value);
127
32
    }
128
40
    if let Some(value) = cond.refresh_token {
129
14
        filter.insert("refreshToken", value);
130
26
    }
131
40
    if let Some(value) = cond.client_id {
132
6
        filter.insert("clientId", value);
133
34
    }
134
40
    if let Some(value) = cond.user_id {
135
14
        filter.insert("userId", value);
136
26
    }
137
40
    filter
138
40
}