Python 操作 MongoDB

    技术2022-07-13  70

    import logging
    import os
    from functools import lru_cache
    from typing import Optional
    from urllib.parse import quote_plus

    import pymongo

    logger = logging.getLogger(__name__)

    # Connection settings, keyed by the short name that callers pass as the
    # ``db`` / ``dbname`` argument of the helpers below.
    db_info = {
        # Test environment
        # 1
        'dba': {
            'user': 'dba',
            'pwd': 'dba',
            'ip': '192.168.1.100',
            'port': '24000',
            'dbname': 'dba'
        },
        # 2
        'dbb': {
            'user': 'dbb',
            'pwd': 'dbb',
            'ip': '192.168.1.101',
            'port': '24000',
            'dbname': 'dbb'
        }
    }


    @lru_cache(maxsize=None)
    def _client_for(db, pid):
        """
        Return the MongoClient for one ``db_info`` entry, creating it on first use.

        The result is cached so that repeated helper calls share a single client
        instead of opening (and never closing) a new one per call.

        :param db: key into ``db_info``; must exist
        :param pid: current process id; used only as part of the cache key so a
            forked child builds its own client rather than reusing the parent's
        :return: a ``pymongo.MongoClient``
        :raises KeyError: if the entry lacks one of the expected settings
        """
        cfg = db_info[db]
        # Reserved characters (':', '@', '/', ...) in credentials must be
        # percent-escaped before being embedded in a MongoDB URI.
        user = quote_plus(str(cfg['user']))
        pwd = quote_plus(str(cfg['pwd']))
        uri = f"mongodb://{user}:{pwd}@{cfg['ip']}:{cfg['port']}/{cfg['dbname']}"
        return pymongo.MongoClient(uri)


    def mongodb_cursor(db, tabname):
        """
        Return the collection ``tabname`` of the database configured under ``db``.

        :param db: key into ``db_info`` (not necessarily the real database name)
        :param tabname: collection name
        :return: the collection object, or ``False`` when ``db`` has no (non-empty)
            entry in ``db_info``
        """
        cfg = db_info.get(db)
        if not cfg:
            return False
        client = _client_for(db, os.getpid())
        return client[cfg['dbname']][tabname]


    def _collection(dbname, tabname):
        """
        Like ``mongodb_cursor`` but raise instead of returning ``False``.

        :raises ValueError: if ``dbname`` is not configured in ``db_info``
        """
        tab = mongodb_cursor(dbname, tabname)
        # Identity check on purpose: PyMongo collections do not support truth
        # testing, so ``if not tab`` would raise for a valid collection.
        if tab is False:
            raise ValueError(f'unknown database key: {dbname!r}')
        return tab


    def m_findall(dbname, tabname):
        """
        Return every document of a collection as a list.

        :param dbname: key into ``db_info``
        :param tabname: collection name
        :return: list of documents
        :raises ValueError: if ``dbname`` is not configured in ``db_info``
        """
        return list(_collection(dbname, tabname).find())


    def m_insert(dbname, tabname, data):
        """
        Insert one document (``dict``) or several documents (``list``).

        :param dbname: key into ``db_info``
        :param tabname: collection name
        :param data: a dict for a single insert, or a list for a bulk insert
        :return: ``'1'`` on success; ``'0'`` if ``dbname`` is unknown, ``data`` has
            an unsupported type, or the insert raised
        """
        tab = mongodb_cursor(dbname, tabname)
        if tab is False:
            logger.error('m_insert: unknown database key %r', dbname)
            return '0'
        try:
            if isinstance(data, dict):
                tab.insert_one(data)
            elif isinstance(data, list):
                tab.insert_many(data)
            else:
                print('data格式错误')
                return '0'
            return '1'
        except Exception:
            # Best-effort API: callers only get the '0' sentinel, so record the
            # cause here instead of discarding it.
            logger.exception('m_insert into %s.%s failed', dbname, tabname)
            return '0'


    def m_findone(dbname, tabname, where: Optional[dict] = None):
        """
        Return a single document, optionally restricted by a filter.

        :param dbname: key into ``db_info``
        :param tabname: collection name
        :param where: query filter; when empty or ``None`` no filter is applied
        :return: the document found, or ``None`` when nothing matches
        :raises ValueError: if ``dbname`` is not configured in ``db_info``
        """
        tab = _collection(dbname, tabname)
        return tab.find_one(where) if where else tab.find_one()


    def m_search(dbname, tabname, field, where: Optional[dict] = None):
        """
        Return one field of the single document selected by ``where``.

        :param dbname: key into ``db_info``
        :param tabname: collection name
        :param field: name of the field to read from the document
        :param where: query filter, e.g. ``{"name": {"$gt": "H"}}`` or
            ``{"name": "xxx"}``
        :return: the field value; ``None`` when no document matches or the
            document has no such field
        :raises ValueError: if ``dbname`` is not configured in ``db_info``
        """
        row = m_findone(dbname, tabname, where)
        if row is None:
            # No match: there is nothing to read the field from.
            return None
        return row.get(field)


    def m_update(dbname, tabname, newvalues: dict, where: dict):
        """
        Apply an update to a document matching ``where``.

        Uses ``update_one``, so at most one document is modified per call.

        :param dbname: key into ``db_info``
        :param tabname: collection name
        :param newvalues: update document, e.g. ``{"$set": {"alexa": "123"}}``
        :param where: query filter selecting the document to update
        :return: number of modified documents (>= 0), or ``'error'`` if
            ``dbname`` is unknown or the update raised
        """
        tab = mongodb_cursor(dbname, tabname)
        if tab is False:
            logger.error('m_update: unknown database key %r', dbname)
            return 'error'
        try:
            result = tab.update_one(where, newvalues)
            return result.modified_count
        except Exception:
            # Best-effort API: callers only get the 'error' sentinel, so record
            # the cause here instead of discarding it.
            logger.exception('m_update on %s.%s failed', dbname, tabname)
            return 'error'


    if __name__ == '__main__':
        pass
    Processed: 0.039, SQL: 9