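// Coverage summary (left over from the coverage report this listing was extracted from):
//   Lines:     99.68 %
//   Functions: 58.7 %
//   Branches:  100 %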
use std::{collections::HashMap, error::Error as StdError, sync::Arc};
use async_trait::async_trait;
use futures::TryStreamExt;
use mongodb::{
action::Find,
bson::{self, doc, Bson, DateTime, Document, Regex},
Cursor as MongoDbCursor, Database,
};
use serde::{Deserialize, Serialize};
use super::super::user::{
Cursor, ListOptions, ListQueryCond, QueryCond, SortKey, Updates, User, UserModel,
/// User model backed by a MongoDB database handle.
///
/// Every operation in this file reaches the user collection through `conn`.
pub struct Model {
/// Shared (reference-counted) handle to the MongoDB database.
conn: Arc<Database>,
}
/// Cursor instance.
///
/// Wraps the MongoDB driver cursor over `Schema` documents; `try_next()` converts each document
/// it pulls into a `User`.
struct DbCursor {
/// The associated collection cursor.
cursor: MongoDbCursor<Schema>,
/// Count of items handed out by `try_next()` so far. In the code shown it is read only through
/// `offset()`, i.e. it exists to satisfy the `Cursor` trait.
offset: u64,
/// MongoDB schema.
///
/// Document layout of the user collection. Rust field names are snake_case; the `serde` renames
/// produce the camelCase keys that the index definitions and query filters in this file use.
#[derive(Deserialize, Serialize)]
struct Schema {
/// Stored as `userId`; `init()` declares a unique index on it.
#[serde(rename = "userId")]
user_id: String,
/// Account name. `add()` lower-cases it before storing and `get_query_filter()` lower-cases the
/// lookup value, so exact matching is effectively case-insensitive.
account: String,
/// Stored as `createdAt` (BSON date-time).
#[serde(rename = "createdAt")]
created_at: DateTime,
/// Stored as `modifiedAt` (BSON date-time).
#[serde(rename = "modifiedAt")]
modified_at: DateTime,
/// Stored as `verifiedAt`. The list filter treats a non-null value as "verified".
#[serde(rename = "verifiedAt")]
verified_at: Option<DateTime>,
/// Stored as `expiredAt`; optional.
#[serde(rename = "expiredAt")]
expired_at: Option<DateTime>,
/// Stored as `disabledAt`. The list filter treats a non-null value as "disabled".
#[serde(rename = "disabledAt")]
disabled_at: Option<DateTime>,
/// Boolean flags keyed by string (presumably role names — confirm against the `User` type).
roles: HashMap<String, bool>,
/// Stored verbatim from `User::password` (presumably a hash — confirm with the caller).
password: String,
/// Stored verbatim from `User::salt`.
salt: String,
/// Display name; searchable through `name_contains` in the list filter.
name: String,
/// Free-form sub-document; converted with `bson::to_document` / `bson::from_document` when
/// crossing the model boundary.
info: Document,
/// Name of the MongoDB collection that stores user documents.
const COL_NAME: &str = "user";
impl Model {
/// Creates the model instance with a database connection.
///
/// Calls `init()` before returning, so index creation has been attempted by the time the caller
/// receives the model.
///
/// # Errors
///
/// Propagates any error returned by `init()`; no model is returned in that case.
pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
let model = Model { conn };
// Set up the collection indexes up front; `?` aborts construction on failure.
model.init().await?;
Ok(model)
#[async_trait]
impl UserModel for Model {
/// Creates the indexes of the user collection.
///
/// Issues a single `createIndexes` database command for `COL_NAME`: unique indexes on `userId`
/// and `account`, and plain ascending indexes on the timestamp fields and `name`.
///
/// # Errors
///
/// Returns the driver error if running the command fails.
async fn init(&self) -> Result<(), Box<dyn StdError>> {
let indexes = vec![
// Identity fields: must be unique across the collection.
doc! {"name": "userId_1", "key": {"userId": 1}, "unique": true},
doc! {"name": "account_1", "key": {"account": 1}, "unique": true},
// Non-unique ascending indexes on the remaining indexed fields.
doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
doc! {"name": "verifiedAt_1", "key": {"verifiedAt": 1}},
doc! {"name": "expiredAt_1", "key": {"expiredAt": 1}},
doc! {"name": "disabledAt_1", "key": {"disabledAt": 1}},
doc! {"name": "name_1", "key": {"name": 1}},
];
// Raw database command rather than a typed index helper.
let command = doc! {
"createIndexes": COL_NAME,
"indexes": indexes,
self.conn.run_command(command).await?;
Ok(())
/// Counts the user documents that match the list conditions.
///
/// The filter is built by `get_list_query_filter()`; the count is computed by the database via
/// `count_documents`.
///
/// # Errors
///
/// Returns the driver error if the count query fails.
async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
let filter = get_list_query_filter(cond);
let count = self
.conn
.collection::<Schema>(COL_NAME)
.count_documents(filter)
.await?;
Ok(count)
/// Lists users, optionally continuing from a cursor returned by an earlier call.
///
/// * `opts` - list options. `cond`, `offset`, `limit` and `sort` are applied only when a new
///   query is started; `cursor_max` bounds the number of items returned by this call.
/// * `cursor` - `None` to start a new query, or the cursor returned by a previous call.
///
/// Returns the collected users together with `Some(cursor)` when the call stopped because
/// `cursor_max` items were collected, or `None` when the cursor was drained.
///
/// # Errors
///
/// Returns any error raised while starting the query or while pulling items from the cursor.
async fn list(
&self,
opts: &ListOptions,
cursor: Option<Box<dyn Cursor>>,
) -> Result<(Vec<User>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
let mut cursor = match cursor {
None => {
// Fresh query: build the filter, apply skip/limit/sort, then wrap the driver cursor.
let filter = get_list_query_filter(opts.cond);
Box::new(DbCursor::new(
build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
.await?,
))
// Continuation: reuse the caller's cursor as is; filter/sort options are not consulted.
Some(cursor) => cursor,
let mut count: u64 = 0;
let mut list = Vec::new();
while let Some(item) = cursor.try_next().await? {
list.push(item);
if let Some(cursor_max) = opts.cursor_max {
count += 1;
// Stop once this batch is full and hand the cursor back. This happens even if the
// cursor has no further items, so the next call may return an empty list.
if count >= cursor_max {
return Ok((list, Some(cursor)));
// Cursor drained: nothing left to continue from.
Ok((list, None))
/// Fetches a single user that matches the query condition.
///
/// Builds the filter with `get_query_filter()`, runs a `find`, and converts the first document
/// the cursor yields into a `User`. Returns `Ok(None)` when the cursor yields nothing.
///
/// # Errors
///
/// Returns an error if pulling from the cursor fails or if the stored `info` document cannot be
/// converted with `bson::from_document`.
async fn get(&self, cond: &QueryCond) -> Result<Option<User>, Box<dyn StdError>> {
let filter = get_query_filter(cond);
let mut cursor = self
.find(filter)
// Only the first matching document is consumed.
if let Some(user) = cursor.try_next().await? {
return Ok(Some(User {
user_id: user.user_id,
account: user.account,
// BSON date-times are converted to the model's date-time type via `into()`.
created_at: user.created_at.into(),
modified_at: user.modified_at.into(),
verified_at: match user.verified_at {
None => None,
Some(value) => Some(value.into()),
},
expired_at: match user.expired_at {
disabled_at: match user.disabled_at {
roles: user.roles,
password: user.password,
salt: user.salt,
name: user.name,
// A malformed `info` sub-document fails the whole call rather than being skipped.
info: bson::from_document(user.info)?,
}));
Ok(None)
/// Inserts a new user document.
///
/// Builds a `Schema` value from `user` and inserts it with `insert_one`.
/// NOTE(review): the timestamp fields are not visible in this excerpt; confirm they are set
/// when the `Schema` value is built.
///
/// # Errors
///
/// Returns an error if `user.info` cannot be converted with `bson::to_document`.
/// Presumably driver errors from the insert are propagated too — the `.await?` is not visible
/// in this excerpt, confirm.
async fn add(&self, user: &User) -> Result<(), Box<dyn StdError>> {
let item = Schema {
user_id: user.user_id.clone(),
// Accounts are stored lower-cased; `get_query_filter()` lower-cases lookups to match.
account: user.account.to_lowercase(),
roles: user.roles.clone(),
password: user.password.clone(),
salt: user.salt.clone(),
name: user.name.clone(),
info: bson::to_document(&user.info)?,
self.conn
.insert_one(item)
/// Deletes the user document whose `userId` equals `user_id`.
///
/// Uses `delete_one`, so at most one document is removed.
/// NOTE(review): the code shown does not inspect a deleted count, so a missing user does not
/// appear to be reported as an error — confirm against the full file.
async fn del(&self, user_id: &str) -> Result<(), Box<dyn StdError>> {
let filter = doc! {"userId": user_id};
.delete_one(filter)
/// Applies `updates` to a user document.
///
/// The update document comes from `get_update_doc()`; `update_one` is issued only when that
/// helper returns `Some`, so an empty set of updates sends nothing to the database.
/// NOTE(review): the construction of `filter` is not visible in this excerpt; presumably it
/// selects by `userId` as in `del()` — confirm.
async fn update(&self, user_id: &str, updates: &Updates) -> Result<(), Box<dyn StdError>> {
// The inner `updates` shadows the parameter with the ready-to-send `$set` document.
if let Some(updates) = get_update_doc(updates) {
.update_one(filter, updates)
return Ok(());
impl DbCursor {
/// Creates the cursor instance from a MongoDB collection cursor.
///
/// The item counter reported by `offset()` starts at zero.
pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
DbCursor { cursor, offset: 0 }
impl Cursor for DbCursor {
/// Pulls the next document from the driver cursor and converts it into a `User`.
///
/// Returns an error if the driver fails or if the stored `info` document cannot be converted
/// with `bson::from_document`.
async fn try_next(&mut self) -> Result<Option<User>, Box<dyn StdError>> {
if let Some(item) = self.cursor.try_next().await? {
// Counted as soon as a document is pulled, before the conversion below can fail.
self.offset += 1;
user_id: item.user_id,
account: item.account,
// BSON date-times are converted to the model's date-time type via `into()`.
created_at: item.created_at.into(),
modified_at: item.modified_at.into(),
verified_at: match item.verified_at {
expired_at: match item.expired_at {
disabled_at: match item.disabled_at {
roles: item.roles,
password: item.password,
salt: item.salt,
name: item.name,
info: bson::from_document(item.info)?,
/// Returns how many documents `try_next()` has pulled so far.
fn offset(&self) -> u64 {
self.offset
/// Transforms single-item query conditions into a MongoDB filter document.
///
/// Each condition that is `Some` adds one equality clause; clauses in the same document are
/// combined by MongoDB as a conjunction. With no conditions set the filter is empty.
fn get_query_filter(cond: &QueryCond) -> Document {
let mut filter = Document::new();
if let Some(value) = cond.user_id {
filter.insert("userId", value);
if let Some(value) = cond.account {
// Accounts are stored lower-cased by `add()`, so lower-case the lookup value as well.
filter.insert("account", value.to_lowercase().as_str());
filter
/// Transforms list query conditions into a MongoDB filter document.
///
/// * `account_contains` / `name_contains` become case-insensitive (`"i"`) regex matches.
/// * `verified_at` / `disabled_at` are tri-state: `Some(true)` requires the field to be non-null,
///   `Some(false)` requires it to be null, and `None` adds no clause.
///
/// NOTE(review): the search strings are used directly as regex patterns without escaping, so
/// regex metacharacters in the input are interpreted as regex syntax — confirm this is intended.
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
if let Some(value) = cond.account_contains {
filter.insert(
"account",
Regex {
// NOTE(review): lower-casing a pattern also lower-cases escape classes (e.g. `\S` becomes
// `\s`); harmless only if the input is plain text.
pattern: value.to_lowercase(),
options: "i".to_string(),
);
if let Some(value) = cond.verified_at {
if value {
// Verified: `verifiedAt` holds a value.
filter.insert("verifiedAt", doc! {"$ne": Bson::Null});
} else {
// Not verified: `verifiedAt` is null.
filter.insert("verifiedAt", Bson::Null);
if let Some(value) = cond.disabled_at {
filter.insert("disabledAt", doc! {"$ne": Bson::Null});
filter.insert("disabledAt", Bson::Null);
if let Some(value) = cond.name_contains {
"name",
// Unlike the account pattern, the name pattern is used without lower-casing.
pattern: value.to_string(),
/// Applies the list options (`offset`, `limit`, `sort`) to a MongoDB `find` action.
///
/// * `opts` - list options; fields that are `None` leave the action untouched.
/// * `find` - the `find` action to configure; it is returned with the options applied.
///
/// A `limit` of zero and an empty sort list are both treated as "not specified".
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
where
T: Send + Sync,
{
if let Some(offset) = opts.offset {
find = find.skip(offset);
if let Some(limit) = opts.limit {
// Zero means "no limit" here, so it is not forwarded to the driver.
if limit > 0 {
// NOTE(review): `as i64` wraps silently if `limit` is an unsigned value above `i64::MAX`;
// confirm the type of `limit` and consider `i64::try_from`.
find = find.limit(limit as i64);
if let Some(sort_list) = opts.sort.as_ref() {
// NOTE(review): `!sort_list.is_empty()` is the idiomatic form of this check.
if sort_list.len() > 0 {
let mut sort_opts = Document::new();
// Keys are inserted in list order to build the compound sort document.
for cond in sort_list.iter() {
// Map the model's sort key to the stored (camelCase) field name.
let key = match cond.key {
SortKey::Account => "account",
SortKey::CreatedAt => "createdAt",
SortKey::ModifiedAt => "modifiedAt",
SortKey::VerifiedAt => "verifiedAt",
SortKey::ExpiredAt => "expiredAt",
SortKey::DisabledAt => "disabledAt",
SortKey::Name => "name",
// 1 = ascending, -1 = descending.
if cond.asc {
sort_opts.insert(key.to_string(), 1);
sort_opts.insert(key.to_string(), -1);
find = find.sort(sort_opts);
find
/// Transforms the model's `Updates` object into a MongoDB `$set` update document.
///
/// Each field of `updates` that is `Some` contributes one key to the `$set` document.
/// `expired_at` and `disabled_at` are nested options: an inner `None` writes an explicit null
/// (clearing the stored value), an inner `Some` writes the new timestamp.
///
/// Returns `None` when `count` is still zero at the end, and also when `updates.info` cannot be
/// converted with `bson::to_document`.
///
/// NOTE(review): no increment of `count` is visible in this excerpt; confirm every branch bumps
/// it, otherwise the function would always return `None`.
fn get_update_doc(updates: &Updates) -> Option<Document> {
let mut count = 0;
let mut document = Document::new();
if let Some(value) = updates.modified_at.as_ref() {
document.insert(
"modifiedAt",
// Timestamps are stored as BSON date-times with millisecond precision.
DateTime::from_millis(value.timestamp_millis()),
if let Some(value) = updates.verified_at.as_ref() {
"verifiedAt",
if let Some(value) = updates.expired_at.as_ref() {
match value {
// Inner `None`: clear the stored expiry.
document.insert("expiredAt", Bson::Null);
Some(value) => {
document.insert("expiredAt", DateTime::from_millis(value.timestamp_millis()));
if let Some(value) = updates.disabled_at.as_ref() {
// Inner `None`: clear the stored disabled timestamp.
document.insert("disabledAt", Bson::Null);
"disabledAt",
if let Some(value) = updates.roles {
// The whole `roles` sub-document is replaced, not merged key by key.
let mut doc = Document::new();
for (k, v) in value {
doc.insert(k, v);
document.insert("roles", doc);
if let Some(value) = updates.password.as_ref() {
document.insert("password", value);
if let Some(value) = updates.salt.as_ref() {
document.insert("salt", value);
if let Some(value) = updates.name {
document.insert("name", value);
if let Some(value) = updates.info {
"info",
match bson::to_document(value) {
// NOTE(review): a serialization failure is swallowed here; the caller sees `None`,
// the same result as "nothing to update".
Err(_) => return None,
Ok(doc) => doc,
// Nothing was requested: signal the caller to skip the database round trip.
if count == 0 {
return None;
Some(doc! {"$set": document})