1、Web application 常见的功能都是接口化的,最常见的即为 REST API,REST 就是一种设计 API 的模式。之前我们的测试页面直接把获取的数据填入 HTML 中,也算是一个简单的 API 接口,但是返回值是个 user 对象。最常用的数据格式是 JSON。由于 JSON 能直接被 JavaScript 读取,所以以 JSON 格式编写的 REST 风格的 API 具有简单、易读、易用的特点。通过 API 操作数据,可以把前端和后端的代码隔离,方便测试和维护。
api测试用例:
www/handler.py增加api_get_users接口,返回一个dict,后续中间件response处理返回json数据
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:LeeYY
# datetime:2019/3/19 23:55
# software: PyCharm
import re
import time
import json
import logging
import hashlib
import base64
import asyncio

from coroweb import get, post
from apis import Page
from models import User, Comment, Blog, next_id


def get_page_index(page_str):
    """Convert a raw ``page`` value into a page number that is at least 1.

    Args:
        page_str: The raw page value, normally a string such as ``'3'``.

    Returns:
        The parsed integer, or 1 when the value is missing (``None``),
        is not a valid integer, or is smaller than 1.
    """
    try:
        page = int(page_str)
    except (TypeError, ValueError):
        # None or a non-numeric string: fall back to the first page.
        return 1
    return max(page, 1)


@get('/')
async def index(request):
    """Render ``test.html`` with every row returned by ``User.findAll()``."""
    users = await User.findAll()
    return {
        '__template__': 'test.html',
        'users': users
    }


@get('/api/users')
async def api_get_users(*, page='1'):
    """Return one page of users, newest first, with passwords masked.

    Args:
        page: Requested page number as a string; normalized by
            ``get_page_index``.

    Returns:
        A dict with ``page`` (a ``Page``) and ``users``. Per the article,
        a response middleware later serializes this dict to JSON.
    """
    page_index = get_page_index(page)
    num = await User.findNumber('id')  # presumably the total row count
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    for u in users:
        # Never expose the stored password hash through the API.
        u.passwd = '******'
    return dict(page=p, users=users)

# Next in the article: www/apis.py gains the Page class.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# software: PyCharm


class Page(object):
    """Pagination state derived from an item count and a requested page.

    Attributes set by ``__init__``: ``item_count``, ``page_size``,
    ``page_count``, ``page_index``, ``offset``, ``limit``, ``has_next``
    and ``has_previous``.
    """

    def __init__(self, item_count, page_index=1, page_size=8):
        """Compute the paging window.

        Args:
            item_count: Total number of items.
            page_index: Requested 1-based page number.
            page_size: Number of items per page.

        When there are no items, or the requested page lies past the last
        page, ``offset`` and ``limit`` are both 0 and ``page_index`` is
        reset to 1.
        """
        self.item_count = item_count
        self.page_size = page_size
        # Ceiling division: a partial last page still counts as a page.
        self.page_count = item_count // page_size + (1 if item_count % page_size > 0 else 0)
        if (item_count == 0) or (page_index > self.page_count):
            self.offset = 0
            self.limit = 0
            self.page_index = 1
        else:
            self.page_index = page_index
            self.offset = self.page_size * (page_index - 1)
            self.limit = self.page_size
        self.has_next = self.page_index < self.page_count
        self.has_previous = self.page_index > 1

    def __str__(self):
        return 'item_count: %s, page_count: %s, page_index: %s, page_size: %s, offset: %s, limit: %s' % (
            self.item_count, self.page_count, self.page_index,
            self.page_size, self.offset, self.limit)

    __repr__ = __str__


class APIError(Exception):
    """Base API error carrying ``error``, ``data`` and ``message``."""

    def __init__(self, error, data='', message=''):
        super(APIError, self).__init__(message)
        self.error = error
        self.data = data
        self.message = message


class APIValueError(APIError):
    """Error ``'value:invalid'``; ``filed`` is stored as ``data``."""

    # NOTE: the parameter name ``filed`` (sic) is kept so that existing
    # keyword callers keep working.
    def __init__(self, filed, message=''):
        super(APIValueError, self).__init__('value:invalid', filed, message)


class APIResourceNotFoundError(APIError):
    """Error ``'value:notfound'``; ``filed`` is stored as ``data``."""

    def __init__(self, filed, message=''):
        super(APIResourceNotFoundError, self).__init__('value:notfound', filed, message)


class APIPermissionError(APIError):
    """Error ``'permission:forbidden'`` with ``data`` set to ``'permission'``."""

    def __init__(self, message=''):
        super(APIPermissionError, self).__init__('permission:forbidden', 'permission', message)


# Backward-compatible alias: the class was originally published under this
# misspelled name, while handler.py imports ``APIPermissionError``.
APIPressionError = APIPermissionError

# Next in the article: start the app, then open (or send a GET request to)
# http://127.0.0.1:9000/api/users to exercise the endpoint:
2、下面是这个web application 需要添加的接口,需要在handler.py里面添加相应的处理函数
以下是整个网站所需要的后端 API 和前端页面 URL 的列表:
后端API包括:
- 获取日志:GET /api/blogs
- 创建日志:POST /api/blogs
- 修改日志:POST /api/blogs/:blog_id
- 删除日志:POST /api/blogs/:blog_id/delete
- 获取评论:GET /api/comments
- 创建评论:POST /api/blogs/:blog_id/comments
- 删除评论:POST /api/comments/:comment_id/delete
- 创建新用户:POST /api/users
- 获取用户:GET /api/users

管理页面包括:
- 评论列表页:GET /manage/comments
- 日志列表页:GET /manage/blogs
- 创建日志页:GET /manage/blogs/create
- 修改日志页:GET /manage/blogs/edit
- 用户列表页:GET /manage/users

用户浏览页面包括:
- 注册页:GET /register
- 登录页:GET /signin
- 注销页:GET /signout
- 首页:GET /
- 日志详情页:GET /blog/:blog_id

添加代码如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# software: PyCharm
import re
import time
import json
import logging
import hashlib
import base64
import asyncio
import html

import markdown2
from aiohttp import web

from coroweb import get, post
from apis import Page, APIError, APIValueError, APIResourceNotFoundError, APIPermissionError
from models import User, Comment, Blog, next_id
from config import configs

COOKIE_NAME = 'awesession'
_COOKIE_KEY = configs.session.secret

# Accepted formats for e-mail addresses and for SHA1 hex digests.
_RE_EMAIL = re.compile(r'^[a-z0-9\.\-\_]+\@[a-z0-9\-\_]+(\.[a-z0-9\-\_]+){1,4}$')
_RE_SHA1 = re.compile(r'^[0-9a-f]{40}$')


def check_admin(request):
    """Raise ``APIPermissionError`` unless ``request.__user__`` is an admin.

    Where ``request.__user__`` gets populated is not shown in this listing.
    """
    user = request.__user__
    if user is None or not user.admin:
        raise APIPermissionError()


def get_page_index(page_str):
    """Convert a raw ``page`` value into a page number that is at least 1.

    Returns 1 when the value is ``None``, is not a valid integer, or is
    smaller than 1.
    """
    try:
        page = int(page_str)
    except (TypeError, ValueError):
        # None or a non-numeric string: fall back to the first page.
        return 1
    return max(page, 1)


def user2cookie(user, max_age):
    """Build the signed session cookie value ``id-expires-sha1``.

    Args:
        user: Object exposing ``id`` and ``passwd``.
        max_age: Cookie lifetime in seconds, added to the current time.

    Returns:
        ``'<id>-<expires>-<sha1>'`` where the digest covers
        ``'<id>-<passwd>-<expires>-<secret key>'``.
    """
    expires = str(int(time.time() + max_age))
    payload = '%s-%s-%s-%s' % (user.id, user.passwd, expires, _COOKIE_KEY)
    parts = [user.id, expires, hashlib.sha1(payload.encode('utf-8')).hexdigest()]
    return '-'.join(parts)


async def cookie2user(cookie_str):
    """Validate a session cookie and return its user, or ``None``.

    Returns ``None`` when the cookie is empty, malformed, expired, refers
    to an unknown user, or carries a digest that does not match. The
    returned user has its ``passwd`` masked.
    """
    if not cookie_str:
        return None
    try:
        parts = cookie_str.split('-')
        if len(parts) != 3:
            return None
        uid, expires, sha1 = parts
        if int(expires) < time.time():
            return None
        user = await User.find(uid)
        if user is None:
            return None
        payload = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
        if sha1 != hashlib.sha1(payload.encode('utf-8')).hexdigest():
            logging.info('invalid sha1')
            return None
        user.passwd = '******'
        return user
    except Exception as e:
        # Deliberate best effort: any failure means "not signed in".
        logging.exception(e)
        return None


def text2html(text):
    """Turn plain text into HTML paragraphs, one per non-blank line.

    ``&``, ``<`` and ``>`` are escaped so the text cannot inject markup.
    """
    return ''.join(
        '<p>%s</p>' % html.escape(line, quote=False)
        for line in text.split('\n')
        if line.strip() != ''
    )


@get('/')
async def index(*, page='1'):
    """Render the home page ``blogs.html`` with one page of blogs, newest first."""
    page_index = get_page_index(page)
    num = await Blog.findNumber('id')
    p = Page(num, page_index)
    if num == 0:
        blogs = []
    else:
        blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    return {
        '__template__': 'blogs.html',
        'page': p,
        'blogs': blogs
    }


@get('/blog/{id}')
async def get_blog(id):
    """Render ``blog.html`` for one blog together with its comments.

    Raises:
        web.HTTPNotFound: If no blog has the given id.
    """
    blog = await Blog.find(id)
    if blog is None:
        # Without this guard a missing blog fails later with AttributeError.
        raise web.HTTPNotFound()
    comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
    for c in comments:
        # Comments are untrusted user input: escape raw HTML while rendering.
        c.html_content = markdown2.markdown(c.content, safe_mode='escape')
    blog.html_content = markdown2.markdown(blog.content)
    return {
        '__template__': 'blog.html',
        'blog': blog,
        'comments': comments
    }


@get('/register')
def register():
    """Render the registration page."""
    return {
        '__template__': 'register.html'
    }


@get('/signin')
def signin():
    """Render the sign-in page."""
    return {
        '__template__': 'signin.html'
    }


@post('/api/authenticate')
async def authenticate(*, email, passwd):
    """Check the credentials and answer with the user as JSON plus a session cookie.

    The stored hash is compared against ``sha1('<user id>:<passwd>')``.

    Raises:
        APIValueError: If the e-mail or password is empty, the e-mail is
            unknown, or the password does not match.
    """
    if not email:
        raise APIValueError('email', 'Invalid email.')
    if not passwd:
        raise APIValueError('passwd', 'Invalid password.')
    users = await User.findAll('email=?', [email])
    if not users:
        raise APIValueError('email', 'Email not exist.')
    user = users[0]
    # Recompute the salted hash exactly as registration stores it.
    sha1 = hashlib.sha1()
    sha1.update(user.id.encode('utf-8'))
    sha1.update(b':')
    sha1.update(passwd.encode('utf-8'))
    if user.passwd != sha1.hexdigest():
        raise APIValueError('passwd', 'Invalid password.')
    # Authentication succeeded: set the session cookie.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r


@get('/signout')
def signout(request):
    """Expire the session cookie and redirect to the referer (or ``/``)."""
    referer = request.headers.get('Referer')
    r = web.HTTPFound(referer or '/')
    r.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('user signed out.')
    return r


@get('/manage/')
def manage():
    """Redirect the management root to the comment list."""
    return 'redirect:/manage/comments'


@get('/manage/comments')
def manage_comments(*, page='1'):
    """Render the comment management page."""
    return {
        '__template__': 'manage_comments.html',
        'page_index': get_page_index(page)
    }


@get('/manage/blogs')
def manage_blogs(*, page='1'):
    """Render the blog management page."""
    return {
        '__template__': 'manage_blogs.html',
        'page_index': get_page_index(page)
    }


@get('/manage/blogs/create')
def manage_create_blog():
    """Render the blog editor in "create" mode (posts to ``/api/blogs``)."""
    return {
        '__template__': 'manage_blog_edit.html',
        'id': '',
        'action': '/api/blogs'
    }


@get('/manage/blogs/edit')
def manage_edit_blog(*, id):
    """Render the blog editor for an existing blog (posts to ``/api/blogs/<id>``)."""
    return {
        '__template__': 'manage_blog_edit.html',
        'id': id,
        'action': '/api/blogs/%s' % id
    }


@get('/manage/users')
def manage_users(*, page='1'):
    """Render the user management page."""
    return {
        '__template__': 'manage_users.html',
        'page_index': get_page_index(page)
    }


@get('/api/comments')
async def api_comments(*, page='1'):
    """Return one page of comments, newest first."""
    page_index = get_page_index(page)
    num = await Comment.findNumber('id')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, comments=())
    comments = await Comment.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    return dict(page=p, comments=comments)


@post('/api/blogs/{id}/comments')
async def api_create_comment(id, request, *, content):
    """Create a comment on blog ``id`` for the signed-in user.

    Raises:
        APIPermissionError: If nobody is signed in.
        APIValueError: If ``content`` is empty or whitespace only.
        APIResourceNotFoundError: If the blog does not exist.
    """
    user = request.__user__
    if user is None:
        raise APIPermissionError('Please signin first.')
    if not content or not content.strip():
        raise APIValueError('content')
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    comment = Comment(blog_id=blog.id, user_id=user.id, user_name=user.name,
                      user_image=user.image, content=content.strip())
    await comment.save()
    return comment


@post('/api/comments/{id}/delete')
async def api_delete_comments(id, request):
    """Delete comment ``id`` (admin only).

    Raises:
        APIPermissionError: If the caller is not an admin.
        APIResourceNotFoundError: If the comment does not exist.
    """
    check_admin(request)
    c = await Comment.find(id)
    if c is None:
        raise APIResourceNotFoundError('Comment')
    await c.remove()
    return dict(id=id)


@get('/api/users')
async def api_get_users(*, page='1'):
    """Return one page of users, newest first, with passwords masked."""
    page_index = get_page_index(page)
    num = await User.findNumber('id')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    for u in users:
        u.passwd = '******'
    return dict(page=p, users=users)


@post('/api/users')
async def api_register_user(*, email, name, passwd):
    """Register a user and answer with the user as JSON plus a session cookie.

    ``passwd`` must already be a 40-character lowercase hex string (it is
    checked against ``_RE_SHA1``); it is stored as
    ``sha1('<new id>:<passwd>')``.

    Raises:
        APIValueError: If the name, e-mail or password fails validation.
        APIError: ``register:failed`` if the e-mail is already in use.
    """
    if not name or not name.strip():
        raise APIValueError('name')
    if not email or not _RE_EMAIL.match(email):
        raise APIValueError('email')
    if not passwd or not _RE_SHA1.match(passwd):
        raise APIValueError('passwd')
    users = await User.findAll('email=?', [email])
    if users:
        raise APIError('register:failed', 'email', 'Email is already in use.')
    uid = next_id()
    sha1_passwd = '%s:%s' % (uid, passwd)
    user = User(id=uid, name=name.strip(), email=email,
                passwd=hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest(),
                image='http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest())
    await user.save()
    # Sign the new user in right away.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r


@get('/api/blogs')
async def api_blogs(*, page='1'):
    """Return one page of blogs, newest first."""
    page_index = get_page_index(page)
    num = await Blog.findNumber('id')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, blogs=())
    blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    return dict(page=p, blogs=blogs)


@get('/api/blogs/{id}')
async def api_get_blog(*, id):
    """Return the blog with the given id (whatever ``Blog.find`` yields)."""
    blog = await Blog.find(id)
    return blog


@post('/api/blogs')
async def api_create_blog(request, *, name, summary, content):
    """Create a blog owned by the signed-in admin.

    Raises:
        APIPermissionError: If the caller is not an admin.
        APIValueError: If name, summary or content is empty.
    """
    check_admin(request)
    if not name or not name.strip():
        raise APIValueError('name', 'name cannot be empty.')
    if not summary or not summary.strip():
        raise APIValueError('summary', 'summary cannot be empty.')
    if not content or not content.strip():
        raise APIValueError('content', 'content cannot be empty.')
    blog = Blog(user_id=request.__user__.id, user_name=request.__user__.name,
                user_image=request.__user__.image, name=name.strip(),
                summary=summary.strip(), content=content.strip())
    await blog.save()
    return blog


@post('/api/blogs/{id}')
async def api_update_blog(id, request, *, name, summary, content):
    """Update the name, summary and content of blog ``id`` (admin only).

    Raises:
        APIPermissionError: If the caller is not an admin.
        APIResourceNotFoundError: If the blog does not exist.
        APIValueError: If name, summary or content is empty.
    """
    check_admin(request)
    blog = await Blog.find(id)
    if blog is None:
        # Without this guard a missing blog fails later with AttributeError.
        raise APIResourceNotFoundError('Blog')
    if not name or not name.strip():
        raise APIValueError('name', 'name cannot be empty.')
    if not summary or not summary.strip():
        raise APIValueError('summary', 'summary cannot be empty.')
    if not content or not content.strip():
        raise APIValueError('content', 'content cannot be empty.')
    blog.name = name.strip()
    blog.summary = summary.strip()
    blog.content = content.strip()
    await blog.update()
    return blog


@post('/api/blogs/{id}/delete')
async def api_delete_blog(request, *, id):
    """Delete blog ``id`` (admin only).

    Raises:
        APIPermissionError: If the caller is not an admin.
        APIResourceNotFoundError: If the blog does not exist.
    """
    check_admin(request)
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    await blog.remove()
    return dict(id=id)


@post('/api/users/{id}/delete')
async def api_delete_users(id, request):
    """Delete user ``id`` (admin only) and tag that user's comments.

    Every comment written by the user gets a "user deleted" suffix
    appended to its ``user_name``.

    Raises:
        APIPermissionError: If the caller is not an admin.
        APIResourceNotFoundError: If the user does not exist.
    """
    check_admin(request)
    user = await User.find(id)
    if user is None:
        raise APIResourceNotFoundError('User')
    await user.remove()
    # Mark the comments left behind by the deleted user. The rows returned
    # by findAll are updated directly instead of being fetched a second time.
    comments = await Comment.findAll('user_id=?', [id])
    for comment in comments or ():
        comment.user_name = comment.user_name + ' (该用户已被删除)'
        await comment.update()
    return dict(id=id)

# End of the listing: at this point all the required API endpoints are implemented.