1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::TimeDelta;
5
use futures::TryStreamExt;
6
use mongodb::{
7
    Database,
8
    bson::{DateTime, Document, doc},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::err::E_UNKNOWN;
13

            
14
use super::super::access_token::{AccessToken, AccessTokenModel, EXPIRES, QueryCond};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// MongoDB schema.
23
#[derive(Deserialize, Serialize)]
24
struct Schema {
25
    #[serde(rename = "accessToken")]
26
    access_token: String,
27
    #[serde(rename = "refreshToken", skip_serializing_if = "Option::is_none")]
28
    refresh_token: Option<String>,
29
    #[serde(rename = "expiresAt")]
30
    expires_at: DateTime,
31
    scope: Option<String>,
32
    #[serde(rename = "clientId")]
33
    client_id: String,
34
    #[serde(rename = "redirectUri")]
35
    redirect_uri: String,
36
    #[serde(rename = "userId")]
37
    user_id: String,
38
    #[serde(rename = "createdAt")]
39
    created_at: DateTime,
40
}
41

            
42
/// Name of the MongoDB collection that stores access tokens.
const COL_NAME: &str = "accessToken";
43

            
44
impl Model {
45
    /// To create the model instance with a database connection.
46
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
47
14
        let model = Model { conn };
48
14
        model.init().await?;
49
14
        Ok(model)
50
14
    }
51
}
52

            
53
#[async_trait]
54
impl AccessTokenModel for Model {
55
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
56
        let indexes = vec![
57
            doc! {"name": "accessToken_1", "key": {"accessToken": 1}, "unique": true},
58
            doc! {"name": "refreshToken_1", "key": {"refreshToken": 1}},
59
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
60
            doc! {"name": "userId_1", "key": {"userId": 1}},
61
            doc! {"name": "ttl_1", "key": {"createdAt": 1}, "expireAfterSeconds": EXPIRES + 60},
62
        ];
63
        let command = doc! {
64
            "createIndexes": COL_NAME,
65
            "indexes": indexes,
66
        };
67
        self.conn.run_command(command).await?;
68
        Ok(())
69
24
    }
70

            
71
538
    async fn get(&self, access_token: &str) -> Result<Option<AccessToken>, Box<dyn StdError>> {
72
        let mut cursor = self
73
            .conn
74
            .collection::<Schema>(COL_NAME)
75
            .find(doc! {"accessToken": access_token})
76
            .await?;
77
        if let Some(item) = cursor.try_next().await? {
78
            return Ok(Some(AccessToken {
79
                access_token: item.access_token,
80
                refresh_token: item.refresh_token,
81
                expires_at: item.expires_at.into(),
82
                scope: item.scope,
83
                client_id: item.client_id,
84
                redirect_uri: item.redirect_uri,
85
                user_id: item.user_id,
86
            }));
87
        }
88
        Ok(None)
89
538
    }
90

            
91
302
    async fn add(&self, token: &AccessToken) -> Result<(), Box<dyn StdError>> {
92
        let item = Schema {
93
            access_token: token.access_token.clone(),
94
            refresh_token: token.refresh_token.clone(),
95
            expires_at: token.expires_at.into(),
96
            scope: token.scope.clone(),
97
            client_id: token.client_id.clone(),
98
            redirect_uri: token.redirect_uri.clone(),
99
            user_id: token.user_id.clone(),
100
            created_at: match TimeDelta::try_seconds(EXPIRES) {
101
                None => panic!("{}", E_UNKNOWN),
102
                Some(t) => (token.expires_at - t).into(),
103
            },
104
        };
105
        self.conn
106
            .collection::<Schema>(COL_NAME)
107
            .insert_one(item)
108
            .await?;
109
        Ok(())
110
302
    }
111

            
112
40
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
113
        let filter = get_query_filter(cond);
114
        self.conn
115
            .collection::<Schema>(COL_NAME)
116
            .delete_many(filter)
117
            .await?;
118
        Ok(())
119
40
    }
120
}
121

            
122
/// Transforms query conditions to the MongoDB document.
123
40
fn get_query_filter(cond: &QueryCond) -> Document {
124
40
    let mut filter = Document::new();
125
40
    if let Some(value) = cond.access_token {
126
8
        filter.insert("accessToken", value);
127
32
    }
128
40
    if let Some(value) = cond.refresh_token {
129
14
        filter.insert("refreshToken", value);
130
26
    }
131
40
    if let Some(value) = cond.client_id {
132
6
        filter.insert("clientId", value);
133
34
    }
134
40
    if let Some(value) = cond.user_id {
135
14
        filter.insert("userId", value);
136
26
    }
137
40
    filter
138
40
}