1 概述
本文总结了一些Django数据库查询的实践经验。
- 基本的增删改查
 
- 分类统计:
aggregate和annotate的使用 
- 实现按年/月/日统计
 
- Manager和QuerySet的混合使用
 
- 编写迁移文件
 
根据Django官方文档,本人整理出与数据库相关的话题列表,
- 基本的增删改查
 
- 外键访问 (Accessing related objects)
 
- 管理器和查询集 (Manger & QuerySet)
 
- 原生SQL (raw SQL)
 
- 事务 (Transactions)
 
- 统计、聚合和分组 (Aggregation)
 
- 搜索 (Search)
 
- 自定义字段 (Custom fields)
 
- 多数据库 (Multiple databases )
 
- 查询表达式和自定义查询表达式(Lookup expressions & Custom lookups)
 
- 条件表达式 (Conditional Expressions)
 
- 数据库函数 (Database Functions)
 
- 数据库优化 (Optimize database access)
 
- 数据库迁移 (Migrations)
 
其中一部分是在数据库有对应的内容,另外一部分则是Django框架自有的特性。涉及的代码主要包括以下三个包:
django.db.connections: 底层数据库连接对象操作 
django.db.migrations:  迁移相关 
django.db.models: 模型定义、数据库查询 
2 查询API
2.1 模型描述
以一个设备管理系统的简易系统为例,该项目包含了设备描述和报警记录。
- 设备以序列号唯一确定该设备,可默认为“主键”。
 
longitude  latitude和address表示设备的地理位置,创建后可认为不可更改。 
- 设备含有使用和报警两个状态标识变量。
 
Alarm.catalog表示警报类型,定义在choices上。 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | class Device(models.Model):     serial = models.CharField(verbose_name='序列号', max_length=100, unique=True)     name = models.CharField(verbose_name='名称', max_length=100, null=True, blank=True)     longitude = models.FloatField(verbose_name='经度', null=True,                                   validators=[validators.MaxValueValidator(180), validators.MinValueValidator(-180)])     latitude = models.FloatField(verbose_name='纬度', null=True,                                  validators=[validators.MaxValueValidator(90), validators.MinValueValidator(-90)])     address = models.CharField(verbose_name='地址', max_length=100, null=True)     is_active = models.BooleanField(verbose_name='使用标识', default=True)     is_alarm = models.BooleanField(verbose_name='报警标识', default=False)     latest_alarm_time = models.DateTimeField(verbose_name='最新报警时间', null=True, blank=True)     latest_alarm_remark = models.CharField(verbose_name='最新报警内容', max_length=200, null=True, blank=True)
      def __str__(self):         return self.serial
  class Alarm(models.Model):     ALARM_CATALOG_CHOICES = (           ('low_battery', '低电量'),           ('fail_connection', '通信故障'),           ('location_moved', '位置移动')       )     device = models.ForeignKey(Device, verbose_name='设备')     create_time = models.DateTimeField(verbose_name='创建时间', default=timezone.now)     catalog = models.CharField(max_length=30, null=True, choices=ALARM_CATALOG_CHOICES)     content = models.CharField(verbose_name='内容', max_length=30, null=True, blank=True)     read = models.BooleanField(verbose_name='已读', default=False)
   | 
 
2.2 查询一览表
2.2.1 检索、过滤、外键查询、分页
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   |  try:     device = models.Device.objects.get(serial='0FFFFFFF561C4030') except models.Device.DoesNotExist:     device = None
  <Device:0FFFFFFF561C4030>
 
  >>> device_list = models.Device.objects.filter(address__icontains='小区') <QuerySet [<Device: 0FFFFFFF5BC91F87>, <Device: 0FFFFFFF561C4030>, ...]>
 
  models.Alarm.objects.filter(device__serial='0FFFFFFF5BC91F87')
 
  device_list = models.Device.objects.all()[20:30]
 
  | 
 
2.2.2 更新
1 2 3 4 5 6 7 8 9 10 11 12 13
   |  try:     device = models.Device.objects.get(serial='0FFFFFFF561C4030')     device.is_active = False     device.save() except models.Device.DoesNotExist:     pass
 
  models.Device.objects.filter(serial__in=['0FFFFFFF561C4030', '0FFFFFFF56174CA0']).update(is_active=False)
 
  models.Device.objects.filter(address__isnull=False).update(address=F('address').strip('福建省'))
 
  | 
 
2.2.3 删除
1 2 3 4 5 6 7 8 9 10 11 12
   |  try:     device = models.Device.objects.get(serial='0FFFFFFF561C4030')     device.delete() except models.Device.DoesNotExist:     pass
 
  models.Alarm.objects.filter(serial='0FFFFFFF561C4030').delete()
  models.Alarm.objects.all().delete()  models.Alarm.objects.delete() 
 
  | 
 
2.2.4 基础统计:数据、最值和平均值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | # 计算设备0FFFFFFF561C4030所有的报警数目。 models.Alarm.objects.filter(device__serial='0FFFFFFF561C4030').count() # 165
  # 计算每个设备的报警数目。 >>> device_list = models.Device.objects.annotate(num_alarms=Count('alarm')) >>> device_list <QuerySet [<Device: 0FFFFFFF561C4021>, <Device: 0FFFFFFF561C4030>, ...]> >>> device_list [0].num_alarms 34
  # 查询2016年报警次数最多的前5个设备
  models.Alarm.objects.filter(create_time__year=2016).values('serial').annotate(num_alarms=models.Count('serial')).order_by('-num_alarms')[:5]
  [     {'serial':'0FFFFFFF9FFC15F9', 'num_alarms':38},     {'serial':'0FFFFFFF71281152', 'num_alarms':32},     {'serial':'0FFFFFFF5992B723', 'num_alarms':27},     {'serial':'0FFFFFFF05E20356', 'num_alarms':21},     {'serial':'0FFFFFFF66DDF14D', 'num_alarms':12}, ]
   | 
 
2.2.5 分类统计
分类统计有以下两种方法。
aggregate + 条件表达式Case,返回一个字典形式的结果,未出现的分类值默认为None,需要使用Coalesce函数设置默认值 
annotate + 分组GROUP BY,返回一个列表形式的结果,未出现的分类值不会出现在最后的结果中 
下面是两种方式查询最近30天中每个报警类型的报警数目为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | latest_week_qs = models.Alarm.objects.filter(create_time__gt=timezone.now()-timedelta(days=30).
 
  latest_week_qs.aggregate(     fail_connection=Coalesce(Sum(         Case(When(catalog='fail_connection', then=1), output_field=models.IntegerField()),     ), 0),     low_battery=Coalesce(Sum(         Case(When(catalog='low_battery', then=1), output_field=models.IntegerField()),     ), 0),     location_moved=Coalesce(Sum(         Case(When(catalog='location_moved', then=1), output_field=models.IntegerField()),     ), 0) )
  {‘fail_connection’:12, 'low_battery':34, 'location_moved': 0}
 
  latest_week_qs.values('catalog').annotate(count=Count('catalog'))
  [{'catalog':'low_battery', 'count':34},{'catalog':'fail_connection', 'count': 12}]
   | 
 
2.2.6 日期统计
实现按年、月、日统计通常有两种方法:
- 数据库函数 
django.db.connection.ops.date_trunc_sql 
- 对第一种的封装类DateExtra,仅Django 1.10+可用
 
以上两种结果中日期类型不一样,第一种返回时datetime对象,第二种只返回其中的分类字段,为整数类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
   |  models.Alarm.objects.filter(serial='0FFFFFFF561C4030', create_time__gt=timezone.now()-timedelta(days=7)).extra(     select={'dt': connection.ops.date_trunc_sql('day', 'create_time')} ).values('dt').annotate(count=models.Count('create_time')).order_by('dt')
  [     {'count':4, 'dt':datetime.datetime(2016, 11, 08, 0, 0, 0,0)},     {'count':2, 'dt':datetime.datetime(2016, 11, 11, 0, 0, 0,0)},     {'count':1, 'dt':datetime.datetime(2016, 11, 12, 0, 0, 0,0)} ]
  models.Alarm.objects.filter(serial='0FFFFFFF561C4030', create_time__gt=timezone.now()-timedelta(days=7))     annotate(day=ExtractDay('create_time'))
  {   'count':4, 'day': 8,   'count':2, 'day': 11,   'count':1, 'day': 12 }
 
  | 
 
2.3 数据库函数
django.db.models.Q: 与、或、非条件组合查询 
django.db.models.F: F()表示数据库中相应字段的值,用于计数器更新等。 
django.db.models.Functions.Coalesce:接收一组参数,返回第一个不为None的数据, 
更多函数可参考Database Functions。
3 管理器和查询集
3.1 管理器与模型的关系
管理器是Django数据库查询的接口。查询语法 models.XxModels.objects.filter(*kwargs) 。
- 一个模型可以拥有一个或多个管理器。
 
- 默认情况下,每个模型都有名为objects的管理器,默认返回数据表中所有记录。
 
- 管理器来源于默认管理器、外键管理器和自定义管理器。
 
3.2 自定义管理器
当一些查询逻辑复杂而且经常使用时,往往是在管理器上添加自定义函数封装相关查询逻辑,一方面减少重复代码,另一方面对view层透明,有利于MVC职责分工。
自定义管理器有三种方法
3.2.1 继承 models.Manager
这是默认出现的方式,以下 period_date函数封装了日期时间段查询函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
   | class AlarmManager(models.Manager):     def period_date(self, field, start_date=None, end_date=None, fmt='%Y-%m-%d'):         """封装日期开始结束时间段查询"""         def to_datetime(val):             if isinstance(val, (datetime, date)):                 return val             else:                 try:                     return datetime.strptime(val, fmt)                 except (TypeError, ValueError):                     pass
          kvs = {}         start_date = to_datetime(start_date)         end_date = to_datetime(end_date)         if start_date:             kvs[field + '__gte'] = start_date         if end_date:             kvs[field + '__lte'] = end_date + timedelta(days=1)           return self.filter(**kvs)         def has_location(self):         return self.filter(longitude__isnull=False, latitude__isnull=False)     def unread(self):         return self.filter(read=False)
 
 
  alarm_list = models.Alarm.objects.unread()
  alarm_list = models.Alarm.objects.period_date(field='create_time', start_time='2016-09-12', end_time='2016-09-21')
  alarm_list = models.Alarm.objects.period_date(field='create_time', end_time='2016-09-26')
  alarm_list  = models.Device.objects.period_date(field='create_time', end_time='2016-09-26').unread() AttributeError: '_QuerySet' object has no attribute 'unread'
   | 
 
在最后一个查询中出现异常,因为这两个方法定义在models.Manager上,返回的却是models.QuerySet实例。这时非常希望自定义的方法能够级联调用,下面的几种方法可以解决这个问题。
3.2.2 使用QuerySet的方法
使用查询集上的 as_manager()函数创建新的管理器
将自定义的方法定义从models.Manager移到models.QuerySet
1 2 3 4 5 6 7 8 9 10 11 12
   | class AlarmQuerySet(models.QuerySet):     def period_date(self, field, start_date=None, end_date=None, fmt='%Y-%m-%d'):                  pass     def has_location(self):         return self.filter(longitude__isnull=False, latitude__isnull=False)     def unread(self):         return self.filter(read=False)
  class Alarm(models.Model):     ...    objects = AlarmQuerySet.as_manager()
   | 
 
这时代码alarm_list  = models.Device.objects.period_date(field='create_time', end_time='2016-09-26').unread()就能够返回正确的结果。
3.2.3 继承Manager和QuerySet
使用管理器上的from_queryset(queryset_class)函数创建新的管理器
在使用django认证用户上一方面需要继承 django.contrib.auth.models.BaseUserManager,另一方面又希望能够自定义函数,这时可以使用这种方式。
1 2 3 4 5 6 7 8 9 10 11 12
   | class UserQuerySet(models.QuerySet):     def no_login_in_days(self, days):         start_time = timezone.now() - timedelta(days=days)         return self.filter(last_login_time__ge=start_time)     def no_activity_in_days(self, days):         start_time = timezone.now() - timedelta(days=days)         return self.filter(last_activity_time__ge=start_time)
  MyUserManager = BaseUserManager.from_queryset(UserQuerySet)
  class MyUser(AbstractBaseUser):     objects = MyUserManager()
   | 
 
 当直接调用django.db.models.Manager.from_queryset方法,其等效于第二种方法,即以下两行等效。
1 2 3 4
   | # 使用as_manager函数 objects = AlarmQuerySet.as_manager() # 使用from_queryset函数 objects = models.Manager.from_queryset(AlarmQuerySet)
   | 
 
3.3 managers模块实践
随着业务逻辑越来越复杂,需要编写更多的自定义管理器,通常的做法是单独创建一个名称为managers的模块,封装所有数据操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | from django.db import models from django.contrib.auth.models import BaseUserManager
  __all__ = ['AxxManager', 'BxxManager', 'UserManager'] 
 
  class BaseQuerySet(models.QuerySet):     def common_method_for_all_models(self):         pass
  class AxxQuerySet(BaseQuerySet):     pass
  AxxManager = models.Manager().from_queryset(AxxQuerySet)
  class BxxQuerySet(BaseQuerySet):     pass
  BxxManager = models.Manager().from_queryset(BxxQuerySet)
  class UserQuerySet(BaseQuerySet):     pass
  class UserManager(BaseUserManager.from_queryset(UserQuerySet)):     def create_user(self, username, password=None, **kwargs):         pass
   | 
 
为避免模块循环导入的问题
- 需要使用
django.apps.apps.get_model函数获取模型类对象,不能直接使用 from models import Xxxx 
managers模块一般只能被models模块引用,其他模块应当不能引用 
3.4 其他
覆盖get函数异常
在获取对象函数会抛出ObjectDoesNotExist异常,在这种情况下我们需要使用try-catch捕捉异常,会出现大量重复的代码。这时我们可以用下的代码实现封装。
1 2 3 4 5 6 7 8 9
   | class BaseManager(models.Manager):     def get_object(self, **kwargs):         try:             return self.get(**kwargs)         except models.ObjectDoesNotExist:             return None
 
  device = models.objects.get_object(serial='0FFFFFFF66DDF13F')
   | 
 
访问request变量
按照MVC分离的实践,不应该直接访问request,只能通过参数传递方式。
4 迁移
4.1 开发流程
迁移是将模型代码的变化应用到数据库,可以认为是一个数据库模式的版本管理系统。
在Django1.7之前的版本第三方库South提供了类似的功能。
迁移通常可以按照下列步骤循环进行。
- 1 编写 
models 模块代码 
- 2 模型迁移:执行 
python manage.py makemigrations,在APP.migrations包生成迁移模块文件。 
- 3 数据迁移:如果需要数据迁移,按照一定的格式编写迁移文件。
 
- 3 应用迁移:执行 
python manage.py migrate,将2、3步迁移文件所实现的数据库变化应用到数据库。 
Django Migration分为模式迁移(Schema migration)和数据迁移(Data Migration)。
- 模式迁移:包括表结构修改,对应于 SQL的 
CREATE TABLE ALTER TABLE和 DROP TABLE,可以由Django自动生成。 
- 数据迁移:包括数据记录修改,对应于SQL的 
INSERT TO DELETE和 UPDATE等语句,需要开发者自己编写。 
无论是模式迁移还是数据迁移,迁移模块都具有几个特点:
- 每个迁移文件是一个Python模块,位于应用目录migrations包下,代表了一次迁移
 
- 每个迁移包含一个名为Migration的迁移类,该类继承自 
django.db.migrations.Migration 
dependencies 属性表示需要依赖的迁移模块名称 
operations 属性表示一系列依次进行的迁移操作,这些都定义在 django.db.migrations.operations模块中。 
1 2 3 4 5
   | from django.db import migrations
  class Migration(migrations.Migration):     dependencies = []     operations = []
   | 
 
4.2 数据迁移
下面的例子实现了将设备的longitude latitude和address三个字段复制到报警记录表中,以便查询位置时无需外接操作。
所有的操作被操作 migrations.RunPython 类中,要注意的是需要 django.apps.get_model 函数引用模型类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
   | from __future__ import unicode_literals
  from django.db import migrations, models
  def create_address_for_alarm(apps, scheme_editor):     AlarmClass = apps.get_model('hdc', 'Alarm')     for alarm in AlarmClass.objects.all():         alarm.longitude = alarm.device.longitude         alarm.latitude = alarm.device.latitude         alarm.address = alarm.device.address         alarm.save()
  class Migration(migrations.Migration):     dependencies = [         ('hdc', '0002_alarm_address'),     ]
      operations = [          migrations.RunPython(create_address_for_alarm),     ]
   | 
 
5 参考资料