import logging
from urllib.parse import quote_plus

import pymongo
# Connection settings for each MongoDB instance this module can reach,
# keyed by the alias that callers pass as ``db`` / ``dbname``.
# NOTE(review): credentials are hard-coded here; consider loading them from
# the environment or a secrets store instead.
db_info = dict(
    dba=dict(
        user='dba',
        pwd='dba',
        ip='192.168.1.100',
        port='24000',
        dbname='dba',
    ),
    dbb=dict(
        user='dbb',
        pwd='dbb',
        ip='192.168.1.101',
        port='24000',
        dbname='dbb',
    ),
)
# Clients already opened by mongodb_cursor(), keyed by connection URI, so
# repeated calls reuse one client instead of opening a new one every time.
_mongo_clients = {}


def mongodb_cursor(db, tabname):
    """
    Return the collection ``tabname`` of the database configured under ``db``.

    Despite the function name, the returned object is a collection
    (``client[dbname][tabname]``), not a query cursor.

    :param db: alias of an entry in ``db_info``
    :param tabname: name of the collection to return
    :return: the collection object, or ``False`` when ``db`` has no
        (or an empty) entry in ``db_info``
    """
    d = db_info.get(db)
    if not d:
        return False
    # Percent-escape the credentials: characters such as '@', ':', '/' or
    # '%' in a user name or password would otherwise corrupt the URI.
    user = quote_plus(str(d.get('user')))
    pwd = quote_plus(str(d.get('pwd')))
    ip = d.get('ip')
    port = d.get('port')
    dbname = d.get('dbname')
    uri = f"mongodb://{user}:{pwd}@{ip}:{port}/{dbname}"
    # Reuse the client for this URI if one exists; the original code built
    # a fresh MongoClient per call and never closed it.
    conn = _mongo_clients.get(uri)
    if conn is None:
        conn = _mongo_clients[uri] = pymongo.MongoClient(uri)
    return conn[dbname][tabname]
def m_findall(dbname, tabname):
    """
    Fetch every document stored in one collection.

    :param dbname: database alias, passed through to ``mongodb_cursor``
    :param tabname: name of the collection to read
    :return: list of all documents produced by an unfiltered ``find()``
    """
    collection = mongodb_cursor(dbname, tabname)
    cursor = collection.find()
    return list(cursor)
def m_insert(dbname, tabname, data):
    """
    Insert one document (``dict``) or several documents (``list``).

    :param dbname: database alias, passed through to ``mongodb_cursor``
    :param tabname: name of the collection to insert into
    :param data: a ``dict`` for a single document, or a ``list`` of documents
    :return: ``'1'`` on success; ``'0'`` when ``data`` is neither a dict nor
        a list, or when the insert raises
    """
    # Reject unsupported payloads before touching the database at all.
    if not isinstance(data, (dict, list)):
        print('data格式错误')
        return '0'
    tab = mongodb_cursor(dbname, tabname)
    try:
        if isinstance(data, dict):
            tab.insert_one(data)
        else:
            tab.insert_many(data)
    except Exception:
        # Keep the '0' return contract, but record why the insert failed
        # instead of discarding the exception silently.
        logging.getLogger(__name__).exception(
            "insert into %s.%s failed", dbname, tabname)
        return '0'
    return '1'
def m_findone(dbname, tabname, where: dict=None):
    """
    Look up a single document, optionally restricted by a filter.

    :param dbname: database alias, passed through to ``mongodb_cursor``
    :param tabname: name of the collection to query
    :param where: query filter; when missing or empty the lookup is unfiltered
    :return: whatever the collection's ``find_one`` returns
    """
    collection = mongodb_cursor(dbname, tabname)
    # A falsy filter (None or {}) takes the argument-less call.
    return collection.find_one(where) if where else collection.find_one()
def m_search(dbname, tabname, field, where: dict=None):
    """
    Return one field of the first document that matches a filter.

    :param dbname: database alias, passed through to ``m_findone``
    :param tabname: name of the collection to query
    :param field: key to read from the matched document
    :param where: query filter, e.g. ``{ "name": { "$gt": "H" } }`` or
        ``{ "name": "xxx" }``
    :return: the value stored under ``field``; ``None`` when no document
        matches or the matched document lacks ``field``
    """
    row = m_findone(dbname, tabname, where)
    # m_findone yields None when nothing matches; calling .get() on that
    # would raise AttributeError, so report "no value" instead.
    if row is None:
        return None
    return row.get(field)
def m_update(dbname, tabname, newvalues: dict, where: dict, many: bool = False):
    """
    Apply an update to the document(s) matching a filter.

    By default only the first matching document is updated (``update_one``).
    Pass ``many=True`` to update every matching document (``update_many``).

    :param dbname: database alias, passed through to ``mongodb_cursor``
    :param tabname: name of the collection to update
    :param newvalues: update specification, e.g. ``{ "$set": { "alexa": "123" } }``
    :param where: query filter selecting the document(s) to update
    :param many: when true, update all matches instead of only the first
    :return: number of modified documents (>= 0), or the string ``'error'``
        when the update raises
    """
    tab = mongodb_cursor(dbname, tabname)
    try:
        apply_update = tab.update_many if many else tab.update_one
        result = apply_update(where, newvalues)
        return result.modified_count
    except Exception:
        # Keep the 'error' return contract, but record the cause instead of
        # discarding the exception silently.
        logging.getLogger(__name__).exception(
            "update on %s.%s failed", dbname, tabname)
        return 'error'
# Script entry point: running this file directly currently does nothing;
# the module is meant to be imported for its helper functions.
if __name__ == '__main__':
    pass
# Source attribution (please credit the original when reposting): https://ipadbbs.8miu.com/read-24733.html