db_helper.py是数据库操作包,主要有两个函数,分别是read()数据库读操作函数和write()数据库写操作函数。这个包的代码是从改造过来的。
1 #!/usr/bin/env python 2 # coding=utf-8 3 4 import psycopg2 5 from common import log_helper 6 from config import const 7 8 # 初始化数据库参数 9 db_name = const.DB_NAME10 db_host = const.DB_HOST11 db_port = const.DB_PORT12 db_user = const.DB_USER13 db_pass = const.DB_PASS14 15 16 def read(sql):17 """18 连接pg数据库并进行数据查询19 如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false20 如果所有执行正常,则返回查询到的数据,这个数据是经过转换的,转成字典格式,方便模板调用,其中字典的key是数据表里的字段名21 """22 try:23 # 连接数据库24 conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)25 # 获取游标26 cursor = conn.cursor()27 except Exception as e:28 print(e.args)29 log_helper.error('连接数据库失败:' + str(e.args))30 return False31 try:32 # 执行查询操作33 cursor.execute(sql)34 # 将返回的结果转换成字典格式35 data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()]36 except Exception as e:37 print(e.args)38 log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))39 return False40 finally:41 # 关闭游标和数据库链接42 cursor.close()43 conn.close()44 # 返回结果(字典格式)45 return data46 47 48 def write(sql, vars):49 """50 连接pg数据库并进行写的操作51 如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false,如果所有执行正常,则返回true52 """53 try:54 # 连接数据库55 conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)56 # 获取游标57 cursor = conn.cursor()58 except Exception as e:59 print(e.args)60 log_helper.error('连接数据库失败:' + str(e.args))61 return False62 try:63 # 执行sql语句64 cursor.execute(sql, vars)65 # 提交事务66 conn.commit()67 except Exception as e:68 print(e.args)69 # 如果出错,则事务回滚70 conn.rollback()71 log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))72 return False73 else:74 # 获取数据75 try:76 data = [dict((cursor.description[i][0], value) for i, value in enumerate(row))77 for row in cursor.fetchall()]78 except Exception as e:79 # 没有设置returning或执行修改或删除语句时,记录不存在80 data = None81 finally:82 # 关闭游标和数据库链接83 cursor.close()84 conn.close()85 86 # 如果写入数据后,将数据库返回的数据返回给调用者87 return data
read(sql)是用来执行数据库查询操作,里面没有事务提交,所以用它来执行增删改操作时,虽然能提交成功,但执行后数据库记录也不会有什么变化,所以只能用它来执行select语句
write(sql, data)是用来执行数据库写操作的,write函数执行后会返回下面几种状态:
1.False状态(数据库链接失败、sql语句不正确、链接数据库操时等执行数据库出现异常时返回这个状态)
2.None状态(sql语句没有添加RETURNING id代码指定sql语句执行结束后返回指定字段值时出现;你如果修改代码第80行,data = None为data = True,执行成功时则会返回True状态)
3.[] (sql语句添加了returning函数,且执行修改或删除时,记录不存在)
4.{'id': 1,}(sql语句添加了returning函数,执行成功后返回我们指定的字段值)
PS:我们在执行新增的时候,如果想要获取新增的id,postgresql有一个非常好用的函数returning,只需要在增删改语句的后面添加returning id或returning id,name或returning *等你想要返回的字段名称,语句执行成功以后都会返回这些指定的字段值,大家可以尝试修改测试用例代码,看看返回的值是什么。
#!/usr/bin/evn python# coding=utf-8import unittestfrom common import db_helperclass DbHelperTest(unittest.TestCase): """数据库操作包测试类""" def setUp(self): """初始化测试环境""" print('------ini------') def tearDown(self): """清理测试环境""" print('------clear------') def test(self): # 新增记录,不带return参数 sql = """ INSERT INTO product_class( name, is_enable) VALUES (%s, %s) """ data = ('糖果', 1) result = db_helper.write(sql, data) print(result) # 新增记录,使用return参数返回新增id sql = """ INSERT INTO product_class( name, is_enable) VALUES (%s, %s) RETURNING id; """ data = ('饼干', 1) result = db_helper.write(sql, data) print(result) # 修改不存在的记录 sql = """ UPDATE product_class SET name=%s, is_enable=%s WHERE id=10000 RETURNING id; """ data = ('糖果', 1) result = db_helper.write(sql, data) print(result) # 查询记录 sql = """ SELECT * FROM product_class """ result = db_helper.read(sql) print(result)if __name__ == '__main__': unittest.main()
执行结果
------ini------None[{ 'id': 2}][][{ 'id': 1, 'name': '糖果', 'add_time': datetime.datetime(2017, 10, 16, 14, 51, 49), 'is_enable': 1}, { 'id': 2, 'name': '饼干', 'add_time': datetime.datetime(2017, 10, 16, 15, 30, 50), 'is_enable': 1}]------clear------
encrypt_helper.py是加密操作包,目前只有md5加密函数,其他加密函数以后有需要再添加进来
#!/usr/bin/evn python# coding=utf-8import hashlibdef md5(text): """md5加密函数""" md5 = hashlib.md5() if not isinstance(text, bytes): text = str(text).encode('utf-8') md5.update(text) return md5.hexdigest()
md5()参数类型支持各种类型,如果参数为非bytes类型时会自动转换为str类型来进行操作,例如以下测试用例,大家也可以尝试用元组、字典或列表类型测试看看结果
#!/usr/bin/evn python# coding=utf-8import unittestfrom common import encrypt_helperclass DbHelperTest(unittest.TestCase): """数据库操作包测试类""" def setUp(self): """初始化测试环境""" print('------ini------') def tearDown(self): """清理测试环境""" print('------clear------') def test(self): result = encrypt_helper.md5(1) print(result) self.assertEqual(result, 'c4ca4238a0b923820dcc509a6f75849b') result = encrypt_helper.md5('1') print(result) self.assertEqual(result, 'c4ca4238a0b923820dcc509a6f75849b') result = encrypt_helper.md5(b'1') print(result) self.assertEqual(result, 'c4ca4238a0b923820dcc509a6f75849b')if __name__ == '__main__': unittest.main()
执行结果
------ini------c4ca4238a0b923820dcc509a6f75849bc4ca4238a0b923820dcc509a6f75849bc4ca4238a0b923820dcc509a6f75849b------clear------
except_helper.py包主要功能是获取代码当前位置的堆栈信息,它被log_helper.py包的error()错误日志记录函数调用,输出发生错误时的堆栈信息内容,方便开发人员分析代码异常。
#!/usr/bin/evn python# coding=utf-8import osimport sysdef detailtrace(): """获取程序当前运行的堆栈信息""" retStr = "" f = sys._getframe() f = f.f_back # first frame is detailtrace, ignore it while hasattr(f, "f_code"): co = f.f_code retStr = "%s(%s:%s)->"%(os.path.basename(co.co_filename), co.co_name, f.f_lineno) + retStr f = f.f_back return retStr
json_helper.py包里只有一个日期格式化类。
#!/usr/bin/evn python# coding=utf-8import jsonimport datetimeclass CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj)
python的json将时间类型转换为字符串时,它会处理不了出现异常,需要使用这个自定义类进行格式化处理
比如我们如果直接这样对时间类型进行转换时,就会出现异常:
def test(self): js = { 'test5': datetime.datetime.now(), } print(js) result = json.dumps(js) print(result)
执行结果:
------ini------{ 'test5': datetime.datetime(2017, 10, 16, 16, 56, 58, 654832)}------clear------ErrorTraceback (most recent call last): File "E:\Python\simple\code\test\json_helper_test.py", line 26, in test result = json.dumps(js) File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\__init__.py", line 230, in dumps return _default_encoder.encode(obj) File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\encoder.py", line 198, in encode chunks = self.iterencode(o, _one_shot=True) File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\encoder.py", line 256, in iterencode return _iterencode(o, 0) File "C:\Users\Empty\AppData\Local\Programs\Python\Python35-32\lib\json\encoder.py", line 179, in default raise TypeError(repr(o) + " is not JSON serializable")TypeError: datetime.datetime(2017, 10, 16, 16, 56, 58, 654832) is not JSON serializable
改成下面代码的话就正常了
def test(self): js = { 'test5': datetime.datetime.now(), } print(js) result = json.dumps(js, cls=json_helper.CJsonEncoder) print(result)
执行结果
------ini------{ 'test5': datetime.datetime(2017, 10, 16, 16, 59, 40, 756103)}{ "test5": "2017-10-16 16:59:40"}------clear------
版权声明:本文原创发表于 ,作者为 本文欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则视为侵权。
python开发QQ群:669058475(本群已满)、733466321(可以加2群) 作者博客:http://www.cnblogs.com/EmptyFS/