start-pack
This commit is contained in:
commit
3e1fa59b3d
5723 changed files with 757971 additions and 0 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,668 @@
|
|||
"""
|
||||
Oracle database backend for Django.
|
||||
|
||||
Requires oracledb: https://oracle.github.io/python-oracledb/
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import decimal
|
||||
import os
|
||||
import platform
|
||||
from contextlib import contextmanager
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.db import IntegrityError
|
||||
from django.db.backends.base.base import BaseDatabaseWrapper
|
||||
from django.db.backends.oracle.oracledb_any import is_oracledb
|
||||
from django.db.backends.utils import debug_transaction
|
||||
from django.utils.asyncio import async_unsafe
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.version import get_version_tuple
|
||||
|
||||
try:
|
||||
from django.db.backends.oracle.oracledb_any import oracledb as Database
|
||||
except ImportError as e:
|
||||
raise ImproperlyConfigured(f"Error loading oracledb module: {e}")
|
||||
|
||||
|
||||
def _setup_environment(environ):
    """Apply (name, value) environment pairs so the Oracle client sees them.

    On most platforms this is a plain ``os.environ`` update. Under Cygwin,
    changes to ``os.environ`` are not visible to the Oracle client
    libraries, so the variables are set through the Win32 API instead.
    """
    if not platform.system().upper().startswith("CYGWIN"):
        os.environ.update(environ)
        return
    try:
        import ctypes
    except ImportError as e:
        raise ImproperlyConfigured(
            "Error loading ctypes: %s; "
            "the Oracle backend requires ctypes to "
            "operate correctly under Cygwin." % e
        )
    win_kernel = ctypes.CDLL("kernel32")
    for name, value in environ:
        win_kernel.SetEnvironmentVariableA(name, value)
|
||||
|
||||
|
||||
# Configure the Oracle client environment at import time, before any
# connection is created.
_setup_environment(
    [
        # Oracle takes client-side character set encoding from the environment.
        ("NLS_LANG", ".AL32UTF8"),
        # This prevents Unicode from getting mangled by getting encoded into the
        # potentially non-Unicode database character set.
        ("ORA_NCHAR_LITERAL_REPLACE", "TRUE"),
    ]
)
|
||||
|
||||
|
||||
# Some of these import oracledb, so import them after checking if it's
|
||||
# installed.
|
||||
from .client import DatabaseClient # NOQA
|
||||
from .creation import DatabaseCreation # NOQA
|
||||
from .features import DatabaseFeatures # NOQA
|
||||
from .introspection import DatabaseIntrospection # NOQA
|
||||
from .operations import DatabaseOperations # NOQA
|
||||
from .schema import DatabaseSchemaEditor # NOQA
|
||||
from .utils import Oracle_datetime, dsn # NOQA
|
||||
from .validation import DatabaseValidation # NOQA
|
||||
|
||||
|
||||
@contextmanager
def wrap_oracle_errors():
    """Context manager mapping driver integrity failures to IntegrityError.

    Re-raises any other ``Database.DatabaseError`` unchanged so callers can
    still catch driver-specific errors.
    """
    try:
        yield
    except Database.DatabaseError as e:
        # oracledb raises a oracledb.DatabaseError exception with the
        # following attributes and values:
        # code = 2091
        # message = 'ORA-02091: transaction rolled back
        # 'ORA-02291: integrity constraint (TEST_DJANGOTEST.SYS
        # _C00102056) violated - parent key not found'
        # or:
        # 'ORA-00001: unique constraint (DJANGOTEST.DEFERRABLE_
        # PINK_CONSTRAINT) violated
        # Convert that case to Django's IntegrityError exception.
        x = e.args[0]
        # The first arg is the driver's error object; only translate the
        # specific rolled-back-transaction + constraint-violation combination.
        if (
            hasattr(x, "code")
            and hasattr(x, "message")
            and x.code == 2091
            and ("ORA-02291" in x.message or "ORA-00001" in x.message)
        ):
            raise IntegrityError(*tuple(e.args))
        raise
|
||||
|
||||
|
||||
class _UninitializedOperatorsDescriptor:
    """Lazily populate ``connection.operators`` on first access.

    ``operators`` is normally filled in when a connection initializes. If it
    is looked up earlier, this descriptor transparently opens (and closes) a
    cursor to trigger that initialization instead of raising AttributeError.
    """

    def __get__(self, instance, cls=None):
        if instance is None:
            raise AttributeError("operators not available as class attribute")
        # Opening a cursor runs connection initialization, which stores
        # "operators" in the instance dict; close it again immediately.
        init_cursor = instance.cursor()
        init_cursor.close()
        return instance.__dict__["operators"]
|
||||
|
||||
|
||||
class DatabaseWrapper(BaseDatabaseWrapper):
    """Database wrapper for Oracle, backed by the oracledb driver."""

    vendor = "oracle"
    display_name = "Oracle"
    # This dictionary maps Field objects to their associated Oracle column
    # types, as strings. Column-type strings can contain format strings; they'll
    # be interpolated against the values of Field.__dict__ before being output.
    # If a column type is set to None, it won't be included in the output.
    #
    # Any format strings starting with "qn_" are quoted before being used in the
    # output (the "qn_" prefix is stripped before the lookup is performed.
    data_types = {
        "AutoField": "NUMBER(11) GENERATED BY DEFAULT ON NULL AS IDENTITY",
        "BigAutoField": "NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY",
        "BinaryField": "BLOB",
        "BooleanField": "NUMBER(1)",
        "CharField": "NVARCHAR2(%(max_length)s)",
        "DateField": "DATE",
        "DateTimeField": "TIMESTAMP",
        "DecimalField": "NUMBER(%(max_digits)s, %(decimal_places)s)",
        "DurationField": "INTERVAL DAY(9) TO SECOND(6)",
        "FileField": "NVARCHAR2(%(max_length)s)",
        "FilePathField": "NVARCHAR2(%(max_length)s)",
        "FloatField": "DOUBLE PRECISION",
        "IntegerField": "NUMBER(11)",
        "JSONField": "NCLOB",
        "BigIntegerField": "NUMBER(19)",
        "IPAddressField": "VARCHAR2(15)",
        "GenericIPAddressField": "VARCHAR2(39)",
        "OneToOneField": "NUMBER(11)",
        "PositiveBigIntegerField": "NUMBER(19)",
        "PositiveIntegerField": "NUMBER(11)",
        "PositiveSmallIntegerField": "NUMBER(11)",
        "SlugField": "NVARCHAR2(%(max_length)s)",
        "SmallAutoField": "NUMBER(5) GENERATED BY DEFAULT ON NULL AS IDENTITY",
        "SmallIntegerField": "NUMBER(11)",
        "TextField": "NCLOB",
        "TimeField": "TIMESTAMP",
        "URLField": "VARCHAR2(%(max_length)s)",
        "UUIDField": "VARCHAR2(32)",
    }
    # CHECK-constraint templates applied to columns of the given field types.
    data_type_check_constraints = {
        "BooleanField": "%(qn_column)s IN (0,1)",
        "JSONField": "%(qn_column)s IS JSON",
        "PositiveBigIntegerField": "%(qn_column)s >= 0",
        "PositiveIntegerField": "%(qn_column)s >= 0",
        "PositiveSmallIntegerField": "%(qn_column)s >= 0",
    }

    # Oracle doesn't support a database index on these columns.
    _limited_data_types = ("clob", "nclob", "blob")

    # Lazily initialized on first access; see _UninitializedOperatorsDescriptor.
    operators = _UninitializedOperatorsDescriptor()

    # Default lookup-operator SQL; init_connection_state() may swap these for
    # the LIKEC variants below if the LIKE form doesn't work on the server.
    _standard_operators = {
        "exact": "= %s",
        "iexact": "= UPPER(%s)",
        "contains": (
            "LIKE TRANSLATE(%s USING NCHAR_CS) ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        ),
        "icontains": (
            "LIKE UPPER(TRANSLATE(%s USING NCHAR_CS)) "
            "ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        ),
        "gt": "> %s",
        "gte": ">= %s",
        "lt": "< %s",
        "lte": "<= %s",
        "startswith": (
            "LIKE TRANSLATE(%s USING NCHAR_CS) ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        ),
        "endswith": (
            "LIKE TRANSLATE(%s USING NCHAR_CS) ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        ),
        "istartswith": (
            "LIKE UPPER(TRANSLATE(%s USING NCHAR_CS)) "
            "ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        ),
        "iendswith": (
            "LIKE UPPER(TRANSLATE(%s USING NCHAR_CS)) "
            "ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        ),
    }

    # Fallback operator set using LIKEC, for servers where the standard
    # TRANSLATE-based LIKE form fails (checked in init_connection_state()).
    _likec_operators = {
        **_standard_operators,
        "contains": "LIKEC %s ESCAPE '\\'",
        "icontains": "LIKEC UPPER(%s) ESCAPE '\\'",
        "startswith": "LIKEC %s ESCAPE '\\'",
        "endswith": "LIKEC %s ESCAPE '\\'",
        "istartswith": "LIKEC UPPER(%s) ESCAPE '\\'",
        "iendswith": "LIKEC UPPER(%s) ESCAPE '\\'",
    }

    # The patterns below are used to generate SQL pattern lookup clauses when
    # the right-hand side of the lookup isn't a raw string (it might be an expression
    # or the result of a bilateral transformation).
    # In those cases, special characters for LIKE operators (e.g. \, %, _)
    # should be escaped on the database side.
    #
    # Note: we use str.format() here for readability as '%' is used as a wildcard for
    # the LIKE operator.
    pattern_esc = r"REPLACE(REPLACE(REPLACE({}, '\', '\\'), '%%', '\%%'), '_', '\_')"
    _pattern_ops = {
        "contains": "'%%' || {} || '%%'",
        "icontains": "'%%' || UPPER({}) || '%%'",
        "startswith": "{} || '%%'",
        "istartswith": "UPPER({}) || '%%'",
        "endswith": "'%%' || {}",
        "iendswith": "'%%' || UPPER({})",
    }

    # Pattern-op variants matching the two operator sets above.
    _standard_pattern_ops = {
        k: "LIKE TRANSLATE( " + v + " USING NCHAR_CS)"
        " ESCAPE TRANSLATE('\\' USING NCHAR_CS)"
        for k, v in _pattern_ops.items()
    }
    _likec_pattern_ops = {
        k: "LIKEC " + v + " ESCAPE '\\'" for k, v in _pattern_ops.items()
    }

    Database = Database
    SchemaEditorClass = DatabaseSchemaEditor
    # Classes instantiated in __init__().
    client_class = DatabaseClient
    creation_class = DatabaseCreation
    features_class = DatabaseFeatures
    introspection_class = DatabaseIntrospection
    ops_class = DatabaseOperations
    validation_class = DatabaseValidation
    # Shared across instances: maps (alias, user) -> driver connection pool.
    _connection_pools = {}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # "use_returning_into" (default True) controls whether INSERTs use
        # RETURNING ... INTO to fetch generated column values.
        use_returning_into = self.settings_dict["OPTIONS"].get(
            "use_returning_into", True
        )
        self.features.can_return_columns_from_insert = use_returning_into

    @property
    def is_pool(self):
        # True when the "pool" OPTIONS key is set (connection pooling enabled).
        return self.settings_dict["OPTIONS"].get("pool", False)

    @property
    def pool(self):
        """Return (creating if needed) the driver pool for this alias/user.

        Returns None when pooling is disabled. Raises ImproperlyConfigured
        when pooling is combined with persistent connections (CONN_MAX_AGE).
        """
        if not self.is_pool:
            return None

        if self.settings_dict.get("CONN_MAX_AGE", 0) != 0:
            raise ImproperlyConfigured(
                "Pooling doesn't support persistent connections."
            )

        pool_key = (self.alias, self.settings_dict["USER"])
        if pool_key not in self._connection_pools:
            connect_kwargs = self.get_connection_params()
            # "pool" may be True (defaults) or a dict of pool options.
            pool_options = connect_kwargs.pop("pool")
            if pool_options is not True:
                connect_kwargs.update(pool_options)

            pool = Database.create_pool(
                user=self.settings_dict["USER"],
                password=self.settings_dict["PASSWORD"],
                dsn=dsn(self.settings_dict),
                **connect_kwargs,
            )
            # setdefault() keeps the first pool if another thread won a race.
            self._connection_pools.setdefault(pool_key, pool)

        return self._connection_pools[pool_key]

    def close_pool(self):
        # Force-close the pool for this alias/user and drop the cache entry.
        if self.pool:
            self.pool.close(force=True)
            pool_key = (self.alias, self.settings_dict["USER"])
            del self._connection_pools[pool_key]

    def get_database_version(self):
        # Server version as a tuple of ints; see oracle_version below.
        return self.oracle_version

    def get_connection_params(self):
        """Return OPTIONS filtered for passing to Database.connect()."""
        # Pooling feature is only supported for oracledb.
        if self.is_pool and not is_oracledb:
            raise ImproperlyConfigured(
                "Pooling isn't supported by cx_Oracle. Use python-oracledb instead."
            )
        conn_params = self.settings_dict["OPTIONS"].copy()
        # Backend-only option; the driver doesn't understand it.
        if "use_returning_into" in conn_params:
            del conn_params["use_returning_into"]
        return conn_params

    @async_unsafe
    def get_new_connection(self, conn_params):
        # With pooling enabled, borrow a connection from the pool instead of
        # opening a fresh one.
        if self.pool:
            return self.pool.acquire()
        return Database.connect(
            user=self.settings_dict["USER"],
            password=self.settings_dict["PASSWORD"],
            dsn=dsn(self.settings_dict),
            **conn_params,
        )

    def init_connection_state(self):
        """Configure session NLS settings and pick the LIKE operator set."""
        super().init_connection_state()
        cursor = self.create_cursor()
        # Set the territory first. The territory overrides NLS_DATE_FORMAT
        # and NLS_TIMESTAMP_FORMAT to the territory default. When all of
        # these are set in single statement it isn't clear what is supposed
        # to happen.
        cursor.execute("ALTER SESSION SET NLS_TERRITORY = 'AMERICA'")
        # Set Oracle date to ANSI date format. This only needs to execute
        # once when we create a new connection. We also set the Territory
        # to 'AMERICA' which forces Sunday to evaluate to a '1' in
        # TO_CHAR().
        cursor.execute(
            "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'"
            " NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'"
            + (" TIME_ZONE = 'UTC'" if settings.USE_TZ else "")
        )
        cursor.close()
        if "operators" not in self.__dict__:
            # Ticket #14149: Check whether our LIKE implementation will
            # work for this connection or we need to fall back on LIKEC.
            # This check is performed only once per DatabaseWrapper
            # instance per thread, since subsequent connections will use
            # the same settings.
            cursor = self.create_cursor()
            try:
                cursor.execute(
                    "SELECT 1 FROM DUAL WHERE DUMMY %s"
                    % self._standard_operators["contains"],
                    ["X"],
                )
            except Database.DatabaseError:
                self.operators = self._likec_operators
                self.pattern_ops = self._likec_pattern_ops
            else:
                self.operators = self._standard_operators
                self.pattern_ops = self._standard_pattern_ops
            cursor.close()
        self.connection.stmtcachesize = 20
        # Ensure all changes are preserved even when AUTOCOMMIT is False.
        if not self.get_autocommit():
            self.commit()

    @async_unsafe
    def create_cursor(self, name=None):
        # "name" is accepted for API compatibility; it isn't used here.
        return FormatStylePlaceholderCursor(self.connection, self)

    def _commit(self):
        if self.connection is not None:
            # wrap_oracle_errors() converts constraint violations raised at
            # commit time (deferred constraints) into IntegrityError.
            with debug_transaction(self, "COMMIT"), wrap_oracle_errors():
                return self.connection.commit()

    # Oracle doesn't support releasing savepoints. But we fake them when query
    # logging is enabled to keep query counts consistent with other backends.
    def _savepoint_commit(self, sid):
        if self.queries_logged:
            self.queries_log.append(
                {
                    "sql": "-- RELEASE SAVEPOINT %s (faked)" % self.ops.quote_name(sid),
                    "time": "0.000",
                }
            )

    def _set_autocommit(self, autocommit):
        with self.wrap_database_errors:
            self.connection.autocommit = autocommit

    def check_constraints(self, table_names=None):
        """
        Check constraints by setting them to immediate. Return them to deferred
        afterward.
        """
        # Note: table_names is accepted for API compatibility; Oracle checks
        # all deferred constraints at once.
        with self.cursor() as cursor:
            cursor.execute("SET CONSTRAINTS ALL IMMEDIATE")
            cursor.execute("SET CONSTRAINTS ALL DEFERRED")

    def is_usable(self):
        # Ping the server; any driver error means the connection is dead.
        try:
            self.connection.ping()
        except Database.Error:
            return False
        else:
            return True

    def close_if_health_check_failed(self):
        if self.pool:
            # The pool only returns healthy connections.
            return
        return super().close_if_health_check_failed()

    @cached_property
    def oracle_version(self):
        # Server version, e.g. (19, 0, 0, 0, 0); cached per wrapper instance.
        with self.temporary_connection():
            return tuple(int(x) for x in self.connection.version.split("."))

    @cached_property
    def oracledb_version(self):
        # Version tuple of the installed driver module itself.
        return get_version_tuple(Database.__version__)
|
||||
|
||||
|
||||
class OracleParam:
    """
    Wrapper object for formatting parameters for Oracle. If the string
    representation of the value is large enough (greater than 4000 characters)
    the input size needs to be set as CLOB. Alternatively, if the parameter
    has an `input_size` attribute, then the value of the `input_size` attribute
    will be used instead. Otherwise, no input size will be set for the
    parameter when executing the query.
    """

    def __init__(self, param, cursor, strings_only=False):
        # With raw SQL queries, datetimes can reach this function
        # without being converted by DateTimeField.get_db_prep_value.
        if settings.USE_TZ and (
            isinstance(param, datetime.datetime)
            and not isinstance(param, Oracle_datetime)
        ):
            param = Oracle_datetime.from_datetime(param)

        string_size = 0
        has_boolean_data_type = (
            cursor.database.features.supports_boolean_expr_in_select_clause
        )
        if not has_boolean_data_type:
            # Oracle < 23c doesn't recognize True and False correctly.
            if param is True:
                param = 1
            elif param is False:
                param = 0
        if hasattr(param, "bind_parameter"):
            # VariableWrapper (and similar) objects supply the bound value
            # themselves, bypassing string conversion.
            self.force_bytes = param.bind_parameter(cursor)
        elif isinstance(param, (Database.Binary, datetime.timedelta)):
            # Pass binary data and intervals through unchanged.
            self.force_bytes = param
        else:
            # To transmit to the database, we need Unicode if supported
            # To get size right, we must consider bytes.
            self.force_bytes = force_str(param, cursor.charset, strings_only)
            if isinstance(self.force_bytes, str):
                # We could optimize by only converting up to 4000 bytes here
                string_size = len(force_bytes(param, cursor.charset, strings_only))
        # Decide the bind input size; order matters: an explicit input_size
        # wins, then the CLOB threshold, then type-based defaults.
        if hasattr(param, "input_size"):
            # If parameter has `input_size` attribute, use that.
            self.input_size = param.input_size
        elif string_size > 4000:
            # Mark any string param greater than 4000 characters as a CLOB.
            self.input_size = Database.DB_TYPE_CLOB
        elif isinstance(param, datetime.datetime):
            self.input_size = Database.DB_TYPE_TIMESTAMP
        elif has_boolean_data_type and isinstance(param, bool):
            self.input_size = Database.DB_TYPE_BOOLEAN
        else:
            self.input_size = None
|
||||
|
||||
|
||||
class VariableWrapper:
    """
    Adapter around a cursor variable that stops OracleParam from coercing
    the wrapped object into a string. Works for any object that should be
    handed to Cursor.execute untouched.
    """

    def __init__(self, var):
        self.var = var

    def bind_parameter(self, cursor):
        # Hand the raw variable straight to the driver.
        return self.var

    def __getattr__(self, key):
        # Delegate unknown attribute reads to the wrapped variable.
        return getattr(self.var, key)

    def __setattr__(self, key, value):
        # Writes to "var" rebind the wrapper itself; everything else is
        # forwarded to the wrapped variable.
        if key != "var":
            setattr(self.var, key, value)
        else:
            self.__dict__[key] = value
|
||||
|
||||
|
||||
class FormatStylePlaceholderCursor:
    """
    Django uses "format" (e.g. '%s') style placeholders, but Oracle uses ":var"
    style. This fixes it -- but note that if you want to use a literal "%s" in
    a query, you'll need to use "%%s".
    """

    # Encoding used when measuring/encoding string parameters.
    charset = "utf-8"

    def __init__(self, connection, database):
        # "database" is the owning DatabaseWrapper; OracleParam reads
        # feature flags from it via cursor.database.features.
        self.cursor = connection.cursor()
        self.cursor.outputtypehandler = self._output_type_handler
        self.database = database

    @staticmethod
    def _output_number_converter(value):
        # Guess int vs Decimal from the string form of the fetched number.
        return decimal.Decimal(value) if "." in value else int(value)

    @staticmethod
    def _get_decimal_converter(precision, scale):
        # Build a converter matching a NUMBER(p,s) column; scale 0 means
        # the column holds integers.
        if scale == 0:
            return int
        context = decimal.Context(prec=precision)
        quantize_value = decimal.Decimal(1).scaleb(-scale)
        return lambda v: decimal.Decimal(v).quantize(quantize_value, context=context)

    @staticmethod
    def _output_type_handler(cursor, name, defaultType, length, precision, scale):
        """
        Called for each db column fetched from cursors. Return numbers as the
        appropriate Python type, and NCLOB with JSON as strings.
        """
        if defaultType == Database.NUMBER:
            if scale == -127:
                if precision == 0:
                    # NUMBER column: decimal-precision floating point.
                    # This will normally be an integer from a sequence,
                    # but it could be a decimal value.
                    outconverter = FormatStylePlaceholderCursor._output_number_converter
                else:
                    # FLOAT column: binary-precision floating point.
                    # This comes from FloatField columns.
                    outconverter = float
            elif precision > 0:
                # NUMBER(p,s) column: decimal-precision fixed point.
                # This comes from IntegerField and DecimalField columns.
                outconverter = FormatStylePlaceholderCursor._get_decimal_converter(
                    precision, scale
                )
            else:
                # No type information. This normally comes from a
                # mathematical expression in the SELECT list. Guess int
                # or Decimal based on whether it has a decimal point.
                outconverter = FormatStylePlaceholderCursor._output_number_converter
            return cursor.var(
                Database.STRING,
                size=255,
                arraysize=cursor.arraysize,
                outconverter=outconverter,
            )
        # oracledb 2.0.0+ returns NLOB columns with IS JSON constraints as
        # dicts. Use a no-op converter to avoid this.
        elif defaultType == Database.DB_TYPE_NCLOB:
            return cursor.var(Database.DB_TYPE_NCLOB, arraysize=cursor.arraysize)

    def _format_params(self, params):
        # Wrap each parameter in OracleParam, preserving dict vs sequence
        # shape (EAFP: a missing .items() means it's a sequence).
        try:
            return {k: OracleParam(v, self, True) for k, v in params.items()}
        except AttributeError:
            return tuple(OracleParam(p, self, True) for p in params)

    def _guess_input_sizes(self, params_list):
        """Call setinputsizes() from the OracleParam input_size hints."""
        # Try dict handling; if that fails, treat as sequence
        if hasattr(params_list[0], "keys"):
            sizes = {}
            for params in params_list:
                for k, value in params.items():
                    if value.input_size:
                        sizes[k] = value.input_size
            if sizes:
                self.setinputsizes(**sizes)
        else:
            # It's not a list of dicts; it's a list of sequences
            sizes = [None] * len(params_list[0])
            for params in params_list:
                for i, value in enumerate(params):
                    if value.input_size:
                        sizes[i] = value.input_size
            if sizes:
                self.setinputsizes(*sizes)

    def _param_generator(self, params):
        # Unwrap OracleParam objects into the actual bind values.
        # Try dict handling; if that fails, treat as sequence
        if hasattr(params, "items"):
            return {k: v.force_bytes for k, v in params.items()}
        else:
            return [p.force_bytes for p in params]

    def _fix_for_params(self, query, params, unify_by_values=False):
        """Rewrite '%s' placeholders to ':argN'/':name' style and wrap params."""
        # oracledb wants no trailing ';' for SQL statements. For PL/SQL, it
        # it does want a trailing ';' but not a trailing '/'. However, these
        # characters must be included in the original query in case the query
        # is being passed to SQL*Plus.
        if query.endswith(";") or query.endswith("/"):
            query = query[:-1]
        if params is None:
            params = []
        elif hasattr(params, "keys"):
            # Handle params as dict
            args = {k: ":%s" % k for k in params}
            query %= args
        elif unify_by_values and params:
            # Handle params as a dict with unified query parameters by their
            # values. It can be used only in single query execute() because
            # executemany() shares the formatted query with each of the params
            # list. e.g. for input params = [0.75, 2, 0.75, 'sth', 0.75]
            # params_dict = {
            #     (float, 0.75): ':arg0',
            #     (int, 2): ':arg1',
            #     (str, 'sth'): ':arg2',
            # }
            # args = [':arg0', ':arg1', ':arg0', ':arg2', ':arg0']
            # params = {':arg0': 0.75, ':arg1': 2, ':arg2': 'sth'}
            # The type of parameters in param_types keys is necessary to avoid
            # unifying 0/1 with False/True.
            param_types = [(type(param), param) for param in params]
            params_dict = {
                param_type: ":arg%d" % i
                for i, param_type in enumerate(dict.fromkeys(param_types))
            }
            args = [params_dict[param_type] for param_type in param_types]
            params = {
                placeholder: param for (_, param), placeholder in params_dict.items()
            }
            query %= tuple(args)
        else:
            # Handle params as sequence
            args = [(":arg%d" % i) for i in range(len(params))]
            query %= tuple(args)
        return query, self._format_params(params)

    def execute(self, query, params=None):
        query, params = self._fix_for_params(query, params, unify_by_values=True)
        self._guess_input_sizes([params])
        with wrap_oracle_errors():
            return self.cursor.execute(query, self._param_generator(params))

    def executemany(self, query, params=None):
        if not params:
            # No params given, nothing to do
            return None
        # uniform treatment for sequences and iterables
        params_iter = iter(params)
        query, firstparams = self._fix_for_params(query, next(params_iter))
        # we build a list of formatted params; as we're going to traverse it
        # more than once, we can't make it lazy by using a generator
        formatted = [firstparams] + [self._format_params(p) for p in params_iter]
        self._guess_input_sizes(formatted)
        with wrap_oracle_errors():
            return self.cursor.executemany(
                query, [self._param_generator(p) for p in formatted]
            )

    def close(self):
        try:
            self.cursor.close()
        except Database.InterfaceError:
            # already closed
            pass

    def var(self, *args):
        # Wrap so OracleParam passes the variable through untouched.
        return VariableWrapper(self.cursor.var(*args))

    def arrayvar(self, *args):
        return VariableWrapper(self.cursor.arrayvar(*args))

    def __getattr__(self, attr):
        # Everything not overridden here is delegated to the real cursor.
        return getattr(self.cursor, attr)

    def __iter__(self):
        return iter(self.cursor)
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
import shutil
|
||||
|
||||
from django.db.backends.base.client import BaseDatabaseClient
|
||||
|
||||
|
||||
class DatabaseClient(BaseDatabaseClient):
    """Build the ``dbshell`` command line for sqlplus (optionally rlwrap-ed)."""

    executable_name = "sqlplus"
    wrapper_name = "rlwrap"

    @staticmethod
    def connect_string(settings_dict):
        # Imported here because the oracledb driver may not be importable at
        # module load time in every context this client is used.
        from django.db.backends.oracle.utils import dsn

        user = settings_dict["USER"]
        password = settings_dict["PASSWORD"]
        return '%s/"%s"@%s' % (user, password, dsn(settings_dict))

    @classmethod
    def settings_to_cmd_args_env(cls, settings_dict, parameters):
        # Prefix with rlwrap when available to get readline-style editing.
        command = [cls.executable_name, "-L", cls.connect_string(settings_dict)]
        wrapper_path = shutil.which(cls.wrapper_name)
        if wrapper_path:
            command = [wrapper_path, *command]
        # No extra environment is needed; return None for env.
        return [*command, *parameters], None
|
||||
|
|
@ -0,0 +1,467 @@
|
|||
import sys
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import DatabaseError
|
||||
from django.db.backends.base.creation import BaseDatabaseCreation
|
||||
from django.utils.crypto import get_random_string
|
||||
from django.utils.functional import cached_property
|
||||
|
||||
TEST_DATABASE_PREFIX = "test_"
|
||||
|
||||
|
||||
class DatabaseCreation(BaseDatabaseCreation):
|
||||
    @cached_property
    def _maindb_connection(self):
        """
        This is analogous to other backends' `_nodb_connection` property,
        which allows access to an "administrative" connection which can
        be used to manage the test databases.
        For Oracle, the only connection that can be used for that purpose
        is the main (non-test) connection.
        """
        settings_dict = settings.DATABASES[self.connection.alias]
        # Prefer the saved (pre-test) credentials when present; they're
        # stashed by _switch_to_test_user() after the test user takes over.
        user = settings_dict.get("SAVED_USER") or settings_dict["USER"]
        password = settings_dict.get("SAVED_PASSWORD") or settings_dict["PASSWORD"]
        settings_dict = {**settings_dict, "USER": user, "PASSWORD": password}
        # Instantiate the same wrapper class as the current connection.
        DatabaseWrapper = type(self.connection)
        return DatabaseWrapper(settings_dict, alias=self.connection.alias)
|
||||
|
||||
    def _create_test_db(self, verbosity=1, autoclobber=False, keepdb=False):
        """Create the test tablespace and user, prompting (or clobbering)
        when either already exists, then switch the connection to the test
        user. Returns the test database NAME.
        """
        parameters = self._get_test_db_params()
        with self._maindb_connection.cursor() as cursor:
            if self._test_database_create():
                try:
                    self._execute_test_db_creation(
                        cursor, parameters, verbosity, keepdb
                    )
                except Exception as e:
                    if "ORA-01543" not in str(e):
                        # All errors except "tablespace already exists" cancel tests
                        self.log("Got an error creating the test database: %s" % e)
                        sys.exit(2)
                    if not autoclobber:
                        confirm = input(
                            "It appears the test database, %s, already exists. "
                            "Type 'yes' to delete it, or 'no' to cancel: "
                            % parameters["user"]
                        )
                    if autoclobber or confirm == "yes":
                        if verbosity >= 1:
                            self.log(
                                "Destroying old test database for alias '%s'..."
                                % self.connection.alias
                            )
                        try:
                            self._execute_test_db_destruction(
                                cursor, parameters, verbosity
                            )
                        except DatabaseError as e:
                            if "ORA-29857" in str(e):
                                # Tablespace has leftover objects; try to
                                # resolve (possibly by dropping the test user).
                                self._handle_objects_preventing_db_destruction(
                                    cursor, parameters, verbosity, autoclobber
                                )
                            else:
                                # Ran into a database error that isn't about
                                # leftover objects in the tablespace.
                                self.log(
                                    "Got an error destroying the old test database: %s"
                                    % e
                                )
                                sys.exit(2)
                        except Exception as e:
                            self.log(
                                "Got an error destroying the old test database: %s" % e
                            )
                            sys.exit(2)
                        try:
                            self._execute_test_db_creation(
                                cursor, parameters, verbosity, keepdb
                            )
                        except Exception as e:
                            self.log(
                                "Got an error recreating the test database: %s" % e
                            )
                            sys.exit(2)
                    else:
                        self.log("Tests cancelled.")
                        sys.exit(1)

            if self._test_user_create():
                if verbosity >= 1:
                    self.log("Creating test user...")
                try:
                    self._create_test_user(cursor, parameters, verbosity, keepdb)
                except Exception as e:
                    if "ORA-01920" not in str(e):
                        # All errors except "user already exists" cancel tests
                        self.log("Got an error creating the test user: %s" % e)
                        sys.exit(2)
                    if not autoclobber:
                        confirm = input(
                            "It appears the test user, %s, already exists. Type "
                            "'yes' to delete it, or 'no' to cancel: "
                            % parameters["user"]
                        )
                    if autoclobber or confirm == "yes":
                        try:
                            if verbosity >= 1:
                                self.log("Destroying old test user...")
                            self._destroy_test_user(cursor, parameters, verbosity)
                            if verbosity >= 1:
                                self.log("Creating test user...")
                            self._create_test_user(
                                cursor, parameters, verbosity, keepdb
                            )
                        except Exception as e:
                            self.log("Got an error recreating the test user: %s" % e)
                            sys.exit(2)
                    else:
                        self.log("Tests cancelled.")
                        sys.exit(1)
        # Done with main user -- test user and tablespaces created.
        self._maindb_connection.close()
        self._switch_to_test_user(parameters)
        return self.connection.settings_dict["NAME"]
|
||||
|
||||
    def _switch_to_test_user(self, parameters):
        """
        Switch to the user that's used for creating the test database.

        Oracle doesn't have the concept of separate databases under the same
        user, so a separate user is used; see _create_test_db(). The main user
        is also needed for cleanup when testing is completed, so save its
        credentials in the SAVED_USER/SAVED_PASSWORD key in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        # Stash the main credentials in both the real settings and this
        # connection's settings dict (read back by _maindb_connection).
        real_settings["SAVED_USER"] = self.connection.settings_dict["SAVED_USER"] = (
            self.connection.settings_dict["USER"]
        )
        real_settings["SAVED_PASSWORD"] = self.connection.settings_dict[
            "SAVED_PASSWORD"
        ] = self.connection.settings_dict["PASSWORD"]
        real_test_settings = real_settings["TEST"]
        test_settings = self.connection.settings_dict["TEST"]
        # Point every settings view (real, TEST, and connection) at the test
        # user's credentials in one chained assignment each.
        real_test_settings["USER"] = real_settings["USER"] = test_settings["USER"] = (
            self.connection.settings_dict["USER"]
        ) = parameters["user"]
        real_settings["PASSWORD"] = self.connection.settings_dict["PASSWORD"] = (
            parameters["password"]
        )
|
||||
|
||||
def set_as_test_mirror(self, primary_settings_dict):
    """
    Set this database up to be used in testing as a mirror of a primary
    database whose settings are given.
    """
    # Copy the primary's credentials; on Oracle the test "database" is a
    # user, so sharing USER/PASSWORD makes this alias a mirror.
    for key in ("USER", "PASSWORD"):
        self.connection.settings_dict[key] = primary_settings_dict[key]
|
||||
|
||||
def _handle_objects_preventing_db_destruction(
    self, cursor, parameters, verbosity, autoclobber
):
    """
    Handle leftover objects that block dropping the test tablespace.

    If a dedicated test user is in use, offer to drop that user (which
    drops its objects) and then retry destroying the tablespaces; exits
    the process on refusal or on unrecoverable errors.
    """
    # There are objects in the test tablespace which prevent dropping it
    # The easy fix is to drop the test user -- but are we allowed to do so?
    self.log(
        "There are objects in the old test database which prevent its destruction."
        "\nIf they belong to the test user, deleting the user will allow the test "
        "database to be recreated.\n"
        "Otherwise, you will need to find and remove each of these objects, "
        "or use a different tablespace.\n"
    )
    if self._test_user_create():
        # Ask for confirmation unless autoclobber forces the deletion.
        if not autoclobber:
            confirm = input("Type 'yes' to delete user %s: " % parameters["user"])
        if autoclobber or confirm == "yes":
            try:
                if verbosity >= 1:
                    self.log("Destroying old test user...")
                self._destroy_test_user(cursor, parameters, verbosity)
            except Exception as e:
                self.log("Got an error destroying the test user: %s" % e)
                sys.exit(2)
            try:
                if verbosity >= 1:
                    self.log(
                        "Destroying old test database for alias '%s'..."
                        % self.connection.alias
                    )
                # With the blocking user gone, the tablespaces can be dropped.
                self._execute_test_db_destruction(cursor, parameters, verbosity)
            except Exception as e:
                self.log("Got an error destroying the test database: %s" % e)
                sys.exit(2)
        else:
            self.log("Tests cancelled -- test database cannot be recreated.")
            sys.exit(1)
    else:
        # A pre-existing user was configured; refusing to delete it means
        # the blocking objects can't be removed automatically.
        self.log(
            "Django is configured to use pre-existing test user '%s',"
            " and will not attempt to delete it." % parameters["user"]
        )
        self.log("Tests cancelled -- test database cannot be recreated.")
        sys.exit(1)
|
||||
|
||||
def _destroy_test_db(self, test_database_name, verbosity=1):
    """
    Destroy the test user and tablespaces, restoring the main user's
    saved credentials on this connection first.
    """
    if not self.connection.is_pool:
        # Restore the credentials saved by _switch_to_test_user() so the
        # settings dict no longer points at the (soon to be dropped) test
        # user.
        self.connection.settings_dict["USER"] = self.connection.settings_dict[
            "SAVED_USER"
        ]
        self.connection.settings_dict["PASSWORD"] = self.connection.settings_dict[
            "SAVED_PASSWORD"
        ]
    self.connection.close()
    self.connection.close_pool()
    parameters = self._get_test_db_params()
    # Destruction happens over the main-user connection: the test user
    # can't drop itself.
    with self._maindb_connection.cursor() as cursor:
        if self._test_user_create():
            if verbosity >= 1:
                self.log("Destroying test user...")
            self._destroy_test_user(cursor, parameters, verbosity)
        if self._test_database_create():
            if verbosity >= 1:
                self.log("Destroying test database tables...")
            self._execute_test_db_destruction(cursor, parameters, verbosity)
    self._maindb_connection.close()
    self._maindb_connection.close_pool()
|
||||
|
||||
def _execute_test_db_creation(self, cursor, parameters, verbosity, keepdb=False):
    """
    Create the test data tablespace and temporary tablespace.

    With keepdb, an already-existing tablespace (ORA-01543) is tolerated
    so the run can reuse it.
    """
    if verbosity >= 2:
        self.log("_create_test_db(): dbname = %s" % parameters["user"])
    if self._test_database_oracle_managed_files():
        # Oracle-managed files: no explicit datafile paths are given.
        statements = [
            """
            CREATE TABLESPACE %(tblspace)s
            DATAFILE SIZE %(size)s
            AUTOEXTEND ON NEXT %(extsize)s MAXSIZE %(maxsize)s
            """,
            """
            CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
            TEMPFILE SIZE %(size_tmp)s
            AUTOEXTEND ON NEXT %(extsize_tmp)s MAXSIZE %(maxsize_tmp)s
            """,
        ]
    else:
        # Explicit datafile names; REUSE allows overwriting leftovers from
        # a previous run.
        statements = [
            """
            CREATE TABLESPACE %(tblspace)s
            DATAFILE '%(datafile)s' SIZE %(size)s REUSE
            AUTOEXTEND ON NEXT %(extsize)s MAXSIZE %(maxsize)s
            """,
            """
            CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
            TEMPFILE '%(datafile_tmp)s' SIZE %(size_tmp)s REUSE
            AUTOEXTEND ON NEXT %(extsize_tmp)s MAXSIZE %(maxsize_tmp)s
            """,
        ]
    # Ignore "tablespace already exists" error when keepdb is on.
    acceptable_ora_err = "ORA-01543" if keepdb else None
    self._execute_allow_fail_statements(
        cursor, statements, parameters, verbosity, acceptable_ora_err
    )
|
||||
|
||||
def _create_test_user(self, cursor, parameters, verbosity, keepdb=False):
    """
    Create the test user and grant the privileges the test suite needs.

    With keepdb, an already-existing user (ORA-01920) is tolerated; in
    that case the password is re-set if it was randomly generated.
    """
    if verbosity >= 2:
        self.log("_create_test_user(): username = %s" % parameters["user"])
    statements = [
        """CREATE USER %(user)s
           IDENTIFIED BY "%(password)s"
           DEFAULT TABLESPACE %(tblspace)s
           TEMPORARY TABLESPACE %(tblspace_temp)s
           QUOTA UNLIMITED ON %(tblspace)s
        """,
        """GRANT CREATE SESSION,
                 CREATE TABLE,
                 CREATE SEQUENCE,
                 CREATE PROCEDURE,
                 CREATE TRIGGER
           TO %(user)s""",
    ]
    # Ignore "user already exists" error when keepdb is on
    acceptable_ora_err = "ORA-01920" if keepdb else None
    success = self._execute_allow_fail_statements(
        cursor, statements, parameters, verbosity, acceptable_ora_err
    )
    # If the password was randomly generated, change the user accordingly.
    if not success and self._test_settings_get("PASSWORD") is None:
        set_password = 'ALTER USER %(user)s IDENTIFIED BY "%(password)s"'
        self._execute_statements(cursor, [set_password], parameters, verbosity)
    # Most test suites can be run without "create view" and
    # "create materialized view" privileges. But some need it.
    for object_type in ("VIEW", "MATERIALIZED VIEW"):
        extra = "GRANT CREATE %(object_type)s TO %(user)s"
        parameters["object_type"] = object_type
        # ORA-01031 (insufficient privileges) is tolerated: the grant is a
        # nice-to-have, not a requirement.
        success = self._execute_allow_fail_statements(
            cursor, [extra], parameters, verbosity, "ORA-01031"
        )
        if not success and verbosity >= 2:
            self.log(
                "Failed to grant CREATE %s permission to test user. This may be ok."
                % object_type
            )
|
||||
|
||||
def _execute_test_db_destruction(self, cursor, parameters, verbosity):
    """Drop the test data and temporary tablespaces with their datafiles."""
    if verbosity >= 2:
        self.log("_execute_test_db_destruction(): dbname=%s" % parameters["user"])
    drop_template = (
        "DROP TABLESPACE %%(%s)s "
        "INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS"
    )
    statements = [drop_template % key for key in ("tblspace", "tblspace_temp")]
    self._execute_statements(cursor, statements, parameters, verbosity)
|
||||
|
||||
def _destroy_test_user(self, cursor, parameters, verbosity):
    """Drop the test user and, via CASCADE, everything the user owns."""
    if verbosity >= 2:
        self.log("_destroy_test_user(): user=%s" % parameters["user"])
        self.log("Be patient. This can take some time...")
    self._execute_statements(
        cursor, ["DROP USER %(user)s CASCADE"], parameters, verbosity
    )
|
||||
|
||||
def _execute_statements(
    self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
):
    """
    Interpolate `parameters` into each statement template and execute it.

    Exceptions always propagate; the failure is logged unless
    `allow_quiet_fail` is set and verbosity is below 2.
    """
    for template in statements:
        sql = template % parameters
        if verbosity >= 2:
            print(sql)
        try:
            cursor.execute(sql)
        except Exception as err:
            quiet = allow_quiet_fail and verbosity < 2
            if not quiet:
                self.log("Failed (%s)" % (err,))
            raise
||||
|
||||
def _execute_allow_fail_statements(
    self, cursor, statements, parameters, verbosity, acceptable_ora_err
):
    """
    Execute statements which are allowed to fail silently if the Oracle
    error code given by `acceptable_ora_err` is raised. Return True if the
    statements execute without an exception, or False otherwise.
    """
    try:
        # Failures may be logged quietly only when an acceptable error
        # code was supplied.
        self._execute_statements(
            cursor,
            statements,
            parameters,
            verbosity,
            allow_quiet_fail=bool(acceptable_ora_err),
        )
    except DatabaseError as err:
        # Re-raise anything other than the tolerated error code.
        if acceptable_ora_err is None or acceptable_ora_err not in str(err):
            raise
        return False
    return True
|
||||
|
||||
def _get_test_db_params(self):
    """Collect the name/size parameters used by the test-DB SQL templates."""
    accessors = {
        "dbname": self._test_database_name,
        "user": self._test_database_user,
        "password": self._test_database_passwd,
        "tblspace": self._test_database_tblspace,
        "tblspace_temp": self._test_database_tblspace_tmp,
        "datafile": self._test_database_tblspace_datafile,
        "datafile_tmp": self._test_database_tblspace_tmp_datafile,
        "maxsize": self._test_database_tblspace_maxsize,
        "maxsize_tmp": self._test_database_tblspace_tmp_maxsize,
        "size": self._test_database_tblspace_size,
        "size_tmp": self._test_database_tblspace_tmp_size,
        "extsize": self._test_database_tblspace_extsize,
        "extsize_tmp": self._test_database_tblspace_tmp_extsize,
    }
    return {key: fetch() for key, fetch in accessors.items()}
|
||||
|
||||
def _test_settings_get(self, key, default=None, prefixed=None):
    """
    Return a value from the test settings dict, or a given default, or a
    prefixed entry from the main settings dict.
    """
    settings_dict = self.connection.settings_dict
    value = settings_dict["TEST"].get(key, default)
    if value is not None or not prefixed:
        return value
    # Fall back to the main settings entry with the test prefix applied.
    return TEST_DATABASE_PREFIX + settings_dict[prefixed]
|
||||
|
||||
def _test_database_name(self):
    # TEST["NAME"], or the main NAME prefixed with TEST_DATABASE_PREFIX.
    return self._test_settings_get("NAME", prefixed="NAME")

def _test_database_create(self):
    # Whether to create the tablespaces (TEST["CREATE_DB"], default True).
    return self._test_settings_get("CREATE_DB", default=True)

def _test_user_create(self):
    # Whether to create a dedicated test user (TEST["CREATE_USER"],
    # default True).
    return self._test_settings_get("CREATE_USER", default=True)

def _test_database_user(self):
    # TEST["USER"], or the main USER with the test prefix.
    return self._test_settings_get("USER", prefixed="USER")

def _test_database_passwd(self):
    # TEST["PASSWORD"], or a random one when a test user will be created.
    password = self._test_settings_get("PASSWORD")
    if password is None and self._test_user_create():
        # Oracle passwords are limited to 30 chars and can't contain symbols.
        password = get_random_string(30)
    return password

def _test_database_tblspace(self):
    # TEST["TBLSPACE"], or the main USER with the test prefix.
    return self._test_settings_get("TBLSPACE", prefixed="USER")

def _test_database_tblspace_tmp(self):
    # The temporary tablespace has no prefixed fallback; it's derived from
    # the main USER plus a "_temp" suffix.
    settings_dict = self.connection.settings_dict
    return settings_dict["TEST"].get(
        "TBLSPACE_TMP", TEST_DATABASE_PREFIX + settings_dict["USER"] + "_temp"
    )

def _test_database_tblspace_datafile(self):
    # Default datafile name: "<tablespace>.dbf".
    tblspace = "%s.dbf" % self._test_database_tblspace()
    return self._test_settings_get("DATAFILE", default=tblspace)

def _test_database_tblspace_tmp_datafile(self):
    # Default temp datafile name: "<temp tablespace>.dbf".
    tblspace = "%s.dbf" % self._test_database_tblspace_tmp()
    return self._test_settings_get("DATAFILE_TMP", default=tblspace)

def _test_database_tblspace_maxsize(self):
    return self._test_settings_get("DATAFILE_MAXSIZE", default="500M")

def _test_database_tblspace_tmp_maxsize(self):
    return self._test_settings_get("DATAFILE_TMP_MAXSIZE", default="500M")

def _test_database_tblspace_size(self):
    return self._test_settings_get("DATAFILE_SIZE", default="50M")

def _test_database_tblspace_tmp_size(self):
    return self._test_settings_get("DATAFILE_TMP_SIZE", default="50M")

def _test_database_tblspace_extsize(self):
    return self._test_settings_get("DATAFILE_EXTSIZE", default="25M")

def _test_database_tblspace_tmp_extsize(self):
    return self._test_settings_get("DATAFILE_TMP_EXTSIZE", default="25M")

def _test_database_oracle_managed_files(self):
    # When True, no explicit datafile names are used in the CREATE
    # TABLESPACE statements; see _execute_test_db_creation().
    return self._test_settings_get("ORACLE_MANAGED_FILES", default=False)
|
||||
|
||||
def _get_test_db_name(self):
    """
    Return the 'production' DB name to get the test DB creation machinery
    to work. This isn't a great deal in this case because DB names as
    handled by Django don't have real counterparts in Oracle.
    """
    # The real test "database" is a user/tablespace; see _create_test_db().
    return self.connection.settings_dict["NAME"]
|
||||
|
||||
def test_db_signature(self):
|
||||
settings_dict = self.connection.settings_dict
|
||||
return (
|
||||
settings_dict["HOST"],
|
||||
settings_dict["PORT"],
|
||||
settings_dict["ENGINE"],
|
||||
settings_dict["NAME"],
|
||||
self._test_database_user(),
|
||||
)
|
||||
|
|
@ -0,0 +1,230 @@
|
|||
from django.db import DatabaseError, InterfaceError
|
||||
from django.db.backends.base.features import BaseDatabaseFeatures
|
||||
from django.db.backends.oracle.oracledb_any import is_oracledb
|
||||
from django.utils.functional import cached_property
|
||||
|
||||
|
||||
class DatabaseFeatures(BaseDatabaseFeatures):
    """Feature flags and test skips for the Oracle backend."""

    minimum_database_version = (19,)
    # Oracle crashes with "ORA-00932: inconsistent datatypes: expected - got
    # BLOB" when grouping by LOBs (#24096).
    allows_group_by_lob = False
    # Although GROUP BY select index is supported by Oracle 23c+, it requires
    # GROUP_BY_POSITION_ENABLED to be enabled to avoid backward compatibility
    # issues. Introspection of this setting is not straightforward.
    allows_group_by_select_index = False
    interprets_empty_strings_as_nulls = True
    has_select_for_update = True
    has_select_for_update_nowait = True
    has_select_for_update_skip_locked = True
    has_select_for_update_of = True
    select_for_update_of_column = True
    can_return_columns_from_insert = True
    supports_subqueries_in_group_by = False
    ignores_unnecessary_order_by_in_subqueries = False
    supports_transactions = True
    supports_timezones = False
    has_native_duration_field = True
    can_defer_constraint_checks = True
    supports_partially_nullable_unique_constraints = False
    supports_deferrable_unique_constraints = True
    truncates_names = True
    supports_comments = True
    supports_tablespaces = True
    supports_sequence_reset = False
    can_introspect_materialized_views = True
    atomic_transactions = False
    nulls_order_largest = True
    requires_literal_defaults = True
    supports_default_keyword_in_bulk_insert = False
    closed_cursor_error_class = InterfaceError
    # Select for update with limit can be achieved on Oracle, but not with the
    # current backend.
    supports_select_for_update_with_limit = False
    supports_temporal_subtraction = True
    # Oracle doesn't ignore quoted identifiers case but the current backend
    # does by uppercasing all identifiers.
    ignores_table_name_case = True
    supports_index_on_text_field = False
    create_test_procedure_without_params_sql = """
        CREATE PROCEDURE "TEST_PROCEDURE" AS
            V_I INTEGER;
        BEGIN
            V_I := 1;
        END;
    """
    create_test_procedure_with_int_param_sql = """
        CREATE PROCEDURE "TEST_PROCEDURE" (P_I INTEGER) AS
            V_I INTEGER;
        BEGIN
            V_I := P_I;
        END;
    """
    create_test_table_with_composite_primary_key = """
        CREATE TABLE test_table_composite_pk (
            column_1 NUMBER(11) NOT NULL,
            column_2 NUMBER(11) NOT NULL,
            PRIMARY KEY (column_1, column_2)
        )
    """
    supports_callproc_kwargs = True
    supports_over_clause = True
    supports_frame_range_fixed_distance = True
    supports_ignore_conflicts = False
    max_query_params = 2**16 - 1
    supports_partial_indexes = False
    supports_stored_generated_columns = False
    supports_virtual_generated_columns = True
    can_rename_index = True
    supports_slicing_ordering_in_compound = True
    requires_compound_order_by_subquery = True
    allows_multiple_constraints_on_same_fields = False
    supports_json_field_contains = False
    supports_collation_on_textfield = False
    supports_tuple_lookups = False
    test_now_utc_template = "CURRENT_TIMESTAMP AT TIME ZONE 'UTC'"
    django_test_expected_failures = {
        # A bug in Django/oracledb with respect to string handling (#23843).
        "annotations.tests.NonAggregateAnnotationTestCase.test_custom_functions",
        "annotations.tests.NonAggregateAnnotationTestCase."
        "test_custom_functions_can_ref_other_functions",
    }
    insert_test_table_with_defaults = (
        "INSERT INTO {} VALUES (DEFAULT, DEFAULT, DEFAULT)"
    )

    @cached_property
    def django_test_skips(self):
        # Built lazily because some skips depend on the connected server
        # version, on connection pooling, and on the driver version.
        skips = {
            "Oracle doesn't support SHA224.": {
                "db_functions.text.test_sha224.SHA224Tests.test_basic",
                "db_functions.text.test_sha224.SHA224Tests.test_transform",
            },
            "Oracle doesn't correctly calculate ISO 8601 week numbering before "
            "1583 (the Gregorian calendar was introduced in 1582).": {
                "db_functions.datetime.test_extract_trunc.DateFunctionTests."
                "test_trunc_week_before_1000",
                "db_functions.datetime.test_extract_trunc."
                "DateFunctionWithTimeZoneTests.test_trunc_week_before_1000",
            },
            "Oracle doesn't support bitwise XOR.": {
                "expressions.tests.ExpressionOperatorTests.test_lefthand_bitwise_xor",
                "expressions.tests.ExpressionOperatorTests."
                "test_lefthand_bitwise_xor_null",
                "expressions.tests.ExpressionOperatorTests."
                "test_lefthand_bitwise_xor_right_null",
            },
            "Oracle requires ORDER BY in row_number, ANSI:SQL doesn't.": {
                "expressions_window.tests.WindowFunctionTests."
                "test_row_number_no_ordering",
                "prefetch_related.tests.PrefetchLimitTests.test_empty_order",
            },
            "Oracle doesn't support changing collations on indexed columns (#33671).": {
                "migrations.test_operations.OperationTests."
                "test_alter_field_pk_fk_db_collation",
            },
            "Oracle doesn't support comparing NCLOB to NUMBER.": {
                "generic_relations_regress.tests.GenericRelationTests."
                "test_textlink_filter",
            },
            "Oracle doesn't support casting filters to NUMBER.": {
                "lookup.tests.LookupQueryingTests.test_aggregate_combined_lookup",
            },
        }
        if self.connection.oracle_version < (23,):
            skips.update(
                {
                    "Raises ORA-00600 on Oracle < 23c: internal error code.": {
                        "model_fields.test_jsonfield.TestQuerying."
                        "test_usage_in_subquery",
                    },
                }
            )
        if self.connection.is_pool:
            skips.update(
                {
                    "Pooling does not support persistent connections": {
                        "backends.base.test_base.ConnectionHealthChecksTests."
                        "test_health_checks_enabled",
                        "backends.base.test_base.ConnectionHealthChecksTests."
                        "test_health_checks_enabled_errors_occurred",
                        "backends.base.test_base.ConnectionHealthChecksTests."
                        "test_health_checks_disabled",
                        "backends.base.test_base.ConnectionHealthChecksTests."
                        "test_set_autocommit_health_checks_enabled",
                        "servers.tests.LiveServerTestCloseConnectionTest."
                        "test_closes_connections",
                        "backends.oracle.tests.TransactionalTests."
                        "test_password_with_at_sign",
                    },
                }
            )
        if is_oracledb and self.connection.oracledb_version >= (2, 1, 2):
            skips.update(
                {
                    "python-oracledb 2.1.2+ no longer hides 'ORA-1403: no data found' "
                    "exceptions raised in database triggers.": {
                        "backends.oracle.tests.TransactionalTests."
                        "test_hidden_no_data_found_exception"
                    },
                },
            )
        return skips

    @cached_property
    def introspected_field_types(self):
        # Oracle has no distinct small/positive integer or time-only column
        # types, so several Django fields introspect as wider ones.
        return {
            **super().introspected_field_types,
            "GenericIPAddressField": "CharField",
            "PositiveBigIntegerField": "BigIntegerField",
            "PositiveIntegerField": "IntegerField",
            "PositiveSmallIntegerField": "IntegerField",
            "SmallIntegerField": "IntegerField",
            "TimeField": "DateTimeField",
        }

    @cached_property
    def test_collations(self):
        return {
            "ci": "BINARY_CI",
            "cs": "BINARY",
            "non_default": "SWEDISH_CI",
            "swedish_ci": "SWEDISH_CI",
            "virtual": "SWEDISH_CI" if self.supports_collation_on_charfield else None,
        }

    @cached_property
    def supports_collation_on_charfield(self):
        # Probe the server: error code 910 ("specified length too long")
        # for VARCHAR2(4001) is treated as no collation support on
        # CharField.
        sql = "SELECT CAST('a' AS VARCHAR2(4001))" + self.bare_select_suffix
        with self.connection.cursor() as cursor:
            try:
                cursor.execute(sql)
            except DatabaseError as e:
                if e.args[0].code == 910:
                    return False
                raise
            return True

    @cached_property
    def supports_primitives_in_json_field(self):
        return self.connection.oracle_version >= (21,)

    @cached_property
    def supports_frame_exclusion(self):
        return self.connection.oracle_version >= (21,)

    @cached_property
    def supports_boolean_expr_in_select_clause(self):
        return self.connection.oracle_version >= (23,)

    @cached_property
    def supports_comparing_boolean_expr(self):
        return self.connection.oracle_version >= (23,)

    @cached_property
    def supports_aggregation_over_interval_types(self):
        return self.connection.oracle_version >= (23,)

    @cached_property
    def bare_select_suffix(self):
        # Oracle 23c+ allows SELECT without FROM; older versions need DUAL.
        return "" if self.connection.oracle_version >= (23,) else " FROM DUAL"
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
from django.db.models import DecimalField, DurationField, Func
|
||||
|
||||
|
||||
class IntervalToSeconds(Func):
    """
    SQL expression converting an INTERVAL DAY TO SECOND value to a number
    of seconds by summing its extracted components.
    """

    function = ""
    template = """
    EXTRACT(day from %(expressions)s) * 86400 +
    EXTRACT(hour from %(expressions)s) * 3600 +
    EXTRACT(minute from %(expressions)s) * 60 +
    EXTRACT(second from %(expressions)s)
    """

    def __init__(self, expression, *, output_field=None, **extra):
        # Default the output to DecimalField when none is supplied.
        resolved_output = output_field or DecimalField()
        super().__init__(expression, output_field=resolved_output, **extra)
|
||||
|
||||
|
||||
class SecondsToInterval(Func):
    """
    SQL expression converting a number of seconds to an INTERVAL DAY TO
    SECOND value via Oracle's NUMTODSINTERVAL.
    """

    function = "NUMTODSINTERVAL"
    template = "%(function)s(%(expressions)s, 'SECOND')"

    def __init__(self, expression, *, output_field=None, **extra):
        # Default the output to DurationField when none is supplied.
        resolved_output = output_field or DurationField()
        super().__init__(expression, output_field=resolved_output, **extra)
|
||||
|
|
@ -0,0 +1,414 @@
|
|||
from collections import namedtuple
|
||||
|
||||
from django.db import models
|
||||
from django.db.backends.base.introspection import BaseDatabaseIntrospection
|
||||
from django.db.backends.base.introspection import FieldInfo as BaseFieldInfo
|
||||
from django.db.backends.base.introspection import TableInfo as BaseTableInfo
|
||||
from django.db.backends.oracle.oracledb_any import oracledb
|
||||
|
||||
# Extend the base introspection structures with Oracle-specific metadata:
# identity-column and JSON detection plus table/column comments.
FieldInfo = namedtuple(
    "FieldInfo", BaseFieldInfo._fields + ("is_autofield", "is_json", "comment")
)
TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
|
||||
|
||||
|
||||
class DatabaseIntrospection(BaseDatabaseIntrospection):
    # Incremented on every get_table_description() call and embedded in a
    # throwaway predicate there; see that method.
    cache_bust_counter = 1

    # Maps type objects to Django Field types.
    data_types_reverse = {
        oracledb.DB_TYPE_DATE: "DateField",
        oracledb.DB_TYPE_BINARY_DOUBLE: "FloatField",
        oracledb.DB_TYPE_BLOB: "BinaryField",
        oracledb.DB_TYPE_CHAR: "CharField",
        oracledb.DB_TYPE_CLOB: "TextField",
        oracledb.DB_TYPE_INTERVAL_DS: "DurationField",
        oracledb.DB_TYPE_NCHAR: "CharField",
        oracledb.DB_TYPE_NCLOB: "TextField",
        oracledb.DB_TYPE_NVARCHAR: "CharField",
        oracledb.DB_TYPE_NUMBER: "DecimalField",
        oracledb.DB_TYPE_TIMESTAMP: "DateTimeField",
        oracledb.DB_TYPE_VARCHAR: "CharField",
    }
|
||||
|
||||
def get_field_type(self, data_type, description):
    """
    Map an introspected column to a Django field name, refining NUMBER
    columns by precision/scale and NCLOB columns marked as JSON.
    """
    if data_type == oracledb.NUMBER:
        precision, scale = description[4:6]
        if scale == 0:
            # Integral NUMBER: pick width/auto variant from precision and
            # the identity-column flag.
            if precision > 11:
                return (
                    "BigAutoField"
                    if description.is_autofield
                    else "BigIntegerField"
                )
            elif 1 < precision < 6 and description.is_autofield:
                return "SmallAutoField"
            elif precision == 1:
                return "BooleanField"
            elif description.is_autofield:
                return "AutoField"
            else:
                return "IntegerField"
        elif scale == -127:
            # Treated as a floating point column; other scales fall
            # through to the default mapping below.
            return "FloatField"
    elif data_type == oracledb.NCLOB and description.is_json:
        return "JSONField"

    return super().get_field_type(data_type, description)
|
||||
|
||||
def get_table_list(self, cursor):
    """Return a list of table and view names in the current database."""
    # Plain tables (excluding materialized views, which also appear in
    # user_tables) are tagged 't' and carry their comment; views and
    # materialized views are tagged 'v' with no comment.
    cursor.execute(
        """
        SELECT
            user_tables.table_name,
            't',
            user_tab_comments.comments
        FROM user_tables
        LEFT OUTER JOIN
            user_tab_comments
            ON user_tab_comments.table_name = user_tables.table_name
        WHERE
            NOT EXISTS (
                SELECT 1
                FROM user_mviews
                WHERE user_mviews.mview_name = user_tables.table_name
            )
        UNION ALL
        SELECT view_name, 'v', NULL FROM user_views
        UNION ALL
        SELECT mview_name, 'v', NULL FROM user_mviews
        """
    )
    return [
        TableInfo(self.identifier_converter(row[0]), row[1], row[2])
        for row in cursor.fetchall()
    ]
|
||||
|
||||
def get_table_description(self, cursor, table_name):
    """
    Return a description of the table with the DB-API cursor.description
    interface.
    """
    # A default collation for the given table/view/materialized view.
    cursor.execute(
        """
        SELECT user_tables.default_collation
        FROM user_tables
        WHERE
            user_tables.table_name = UPPER(%s) AND
            NOT EXISTS (
                SELECT 1
                FROM user_mviews
                WHERE user_mviews.mview_name = user_tables.table_name
            )
        UNION ALL
        SELECT user_views.default_collation
        FROM user_views
        WHERE user_views.view_name = UPPER(%s)
        UNION ALL
        SELECT user_mviews.default_collation
        FROM user_mviews
        WHERE user_mviews.mview_name = UPPER(%s)
        """,
        [table_name, table_name, table_name],
    )
    row = cursor.fetchone()
    # With an empty fallback, the CASE in the next query never matches, so
    # every column's collation is reported verbatim.
    default_table_collation = row[0] if row else ""
    # user_tab_columns gives data default for columns
    cursor.execute(
        """
        SELECT
            user_tab_cols.column_name,
            user_tab_cols.data_default,
            CASE
                WHEN user_tab_cols.collation = %s
                THEN NULL
                ELSE user_tab_cols.collation
            END collation,
            CASE
                WHEN user_tab_cols.char_used IS NULL
                THEN user_tab_cols.data_length
                ELSE user_tab_cols.char_length
            END as display_size,
            CASE
                WHEN user_tab_cols.identity_column = 'YES' THEN 1
                ELSE 0
            END as is_autofield,
            CASE
                WHEN EXISTS (
                    SELECT 1
                    FROM user_json_columns
                    WHERE
                        user_json_columns.table_name = user_tab_cols.table_name AND
                        user_json_columns.column_name = user_tab_cols.column_name
                )
                THEN 1
                ELSE 0
            END as is_json,
            user_col_comments.comments as col_comment
        FROM user_tab_cols
        LEFT OUTER JOIN
            user_col_comments ON
            user_col_comments.column_name = user_tab_cols.column_name AND
            user_col_comments.table_name = user_tab_cols.table_name
        WHERE user_tab_cols.table_name = UPPER(%s)
        """,
        [default_table_collation, table_name],
    )
    field_map = {
        column: (
            display_size,
            # Literal "NULL" defaults are normalized to None.
            default.rstrip() if default and default != "NULL" else None,
            collation,
            is_autofield,
            is_json,
            comment,
        )
        for (
            column,
            default,
            collation,
            display_size,
            is_autofield,
            is_json,
            comment,
        ) in cursor.fetchall()
    }
    # Embed an ever-changing predicate so the SELECT below is textually
    # unique per call (counter name says: cache busting) before reading
    # cursor.description.
    self.cache_bust_counter += 1
    cursor.execute(
        "SELECT * FROM {} WHERE ROWNUM < 2 AND {} > 0".format(
            self.connection.ops.quote_name(table_name), self.cache_bust_counter
        )
    )
    description = []
    for desc in cursor.description:
        name = desc[0]
        (
            display_size,
            default,
            collation,
            is_autofield,
            is_json,
            comment,
        ) = field_map[name]
        name %= {}  # oracledb, for some reason, doubles percent signs.
        description.append(
            FieldInfo(
                self.identifier_converter(name),
                desc[1],
                display_size,
                desc[3],
                desc[4] or 0,
                desc[5] or 0,
                *desc[6:],
                default,
                collation,
                is_autofield,
                is_json,
                comment,
            )
        )
    return description
|
||||
|
||||
def identifier_converter(self, name):
    """Identifier comparison is case insensitive under Oracle."""
    # Normalize to lowercase so comparisons on the Python side agree.
    return name.lower()
|
||||
|
||||
def get_sequences(self, cursor, table_name, table_fields=()):
    """
    Return the sequence backing the table's identity primary-key column,
    or fall back to an AutoField from `table_fields` if there is none.
    """
    cursor.execute(
        """
        SELECT
            user_tab_identity_cols.sequence_name,
            user_tab_identity_cols.column_name
        FROM
            user_tab_identity_cols,
            user_constraints,
            user_cons_columns cols
        WHERE
            user_constraints.constraint_name = cols.constraint_name
            AND user_constraints.table_name = user_tab_identity_cols.table_name
            AND cols.column_name = user_tab_identity_cols.column_name
            AND user_constraints.constraint_type = 'P'
            AND user_tab_identity_cols.table_name = UPPER(%s)
        """,
        [table_name],
    )
    # Oracle allows only one identity column per table.
    row = cursor.fetchone()
    if row:
        return [
            {
                "name": self.identifier_converter(row[0]),
                "table": self.identifier_converter(table_name),
                "column": self.identifier_converter(row[1]),
            }
        ]
    # To keep backward compatibility for AutoFields that aren't Oracle
    # identity columns.
    for f in table_fields:
        if isinstance(f, models.AutoField):
            return [{"table": table_name, "column": f.column}]
    return []
|
||||
|
||||
def get_relations(self, cursor, table_name):
    """
    Return a dictionary of {field_name: (field_name_other_table, other_table)}
    representing all foreign keys in the given table.
    """
    # The data dictionary stores names uppercase; match accordingly.
    table_name = table_name.upper()
    cursor.execute(
        """
        SELECT ca.column_name, cb.table_name, cb.column_name
        FROM user_constraints, USER_CONS_COLUMNS ca, USER_CONS_COLUMNS cb
        WHERE user_constraints.table_name = %s AND
              user_constraints.constraint_name = ca.constraint_name AND
              user_constraints.r_constraint_name = cb.constraint_name AND
              ca.position = cb.position""",
        [table_name],
    )

    return {
        self.identifier_converter(field_name): (
            self.identifier_converter(rel_field_name),
            self.identifier_converter(rel_table_name),
        )
        for field_name, rel_table_name, rel_field_name in cursor.fetchall()
    }
|
||||
|
||||
def get_primary_key_columns(self, cursor, table_name):
    """Return the primary-key column names of `table_name`, in key order."""
    cursor.execute(
        """
        SELECT
            cols.column_name
        FROM
            user_constraints,
            user_cons_columns cols
        WHERE
            user_constraints.constraint_name = cols.constraint_name AND
            user_constraints.constraint_type = 'P' AND
            user_constraints.table_name = UPPER(%s)
        ORDER BY
            cols.position
        """,
        [table_name],
    )
    return [self.identifier_converter(row[0]) for row in cursor.fetchall()]
|
||||
|
||||
def get_constraints(self, cursor, table_name):
    """
    Retrieve any constraints or keys (unique, pk, fk, check, index) across
    one or more columns.

    Returns a dict mapping constraint name to a dict with keys: columns,
    primary_key, unique, foreign_key, check, index (plus type/orders for
    plain indexes).
    """
    constraints = {}
    # Loop over the constraints, getting PKs, uniques, and checks.
    # LISTAGG collapses the per-column rows into one comma-separated
    # column list per constraint, ordered by column position.
    cursor.execute(
        """
        SELECT
            user_constraints.constraint_name,
            LISTAGG(LOWER(cols.column_name), ',')
                WITHIN GROUP (ORDER BY cols.position),
            CASE user_constraints.constraint_type
                WHEN 'P' THEN 1
                ELSE 0
            END AS is_primary_key,
            CASE
                WHEN user_constraints.constraint_type IN ('P', 'U') THEN 1
                ELSE 0
            END AS is_unique,
            CASE user_constraints.constraint_type
                WHEN 'C' THEN 1
                ELSE 0
            END AS is_check_constraint
        FROM
            user_constraints
        LEFT OUTER JOIN
            user_cons_columns cols
            ON user_constraints.constraint_name = cols.constraint_name
        WHERE
            user_constraints.constraint_type = ANY('P', 'U', 'C')
            AND user_constraints.table_name = UPPER(%s)
        GROUP BY user_constraints.constraint_name, user_constraints.constraint_type
        """,
        [table_name],
    )
    for constraint, columns, pk, unique, check in cursor.fetchall():
        constraint = self.identifier_converter(constraint)
        constraints[constraint] = {
            "columns": columns.split(","),
            "primary_key": pk,
            "unique": unique,
            "foreign_key": None,
            "check": check,
            "index": unique,  # All uniques come with an index
        }
    # Foreign key constraints. rcols.position = 1 picks a single row for
    # the referenced (table, column) pair.
    cursor.execute(
        """
        SELECT
            cons.constraint_name,
            LISTAGG(LOWER(cols.column_name), ',')
                WITHIN GROUP (ORDER BY cols.position),
            LOWER(rcols.table_name),
            LOWER(rcols.column_name)
        FROM
            user_constraints cons
        INNER JOIN
            user_cons_columns rcols
            ON rcols.constraint_name = cons.r_constraint_name AND rcols.position = 1
        LEFT OUTER JOIN
            user_cons_columns cols
            ON cons.constraint_name = cols.constraint_name
        WHERE
            cons.constraint_type = 'R' AND
            cons.table_name = UPPER(%s)
        GROUP BY cons.constraint_name, rcols.table_name, rcols.column_name
        """,
        [table_name],
    )
    for constraint, columns, other_table, other_column in cursor.fetchall():
        constraint = self.identifier_converter(constraint)
        constraints[constraint] = {
            "primary_key": False,
            "unique": False,
            "foreign_key": (other_table, other_column),
            "check": False,
            "index": False,
            "columns": columns.split(","),
        }
    # Now get indexes. The NOT EXISTS subquery filters out indexes that
    # back a constraint (those were already reported above).
    cursor.execute(
        """
        SELECT
            ind.index_name,
            LOWER(ind.index_type),
            LOWER(ind.uniqueness),
            LISTAGG(LOWER(cols.column_name), ',')
                WITHIN GROUP (ORDER BY cols.column_position),
            LISTAGG(cols.descend, ',') WITHIN GROUP (ORDER BY cols.column_position)
        FROM
            user_ind_columns cols, user_indexes ind
        WHERE
            cols.table_name = UPPER(%s) AND
            NOT EXISTS (
                SELECT 1
                FROM user_constraints cons
                WHERE ind.index_name = cons.index_name
            ) AND cols.index_name = ind.index_name
        GROUP BY ind.index_name, ind.index_type, ind.uniqueness
        """,
        [table_name],
    )
    for constraint, type_, unique, columns, orders in cursor.fetchall():
        constraint = self.identifier_converter(constraint)
        constraints[constraint] = {
            "primary_key": False,
            "unique": unique == "unique",
            "foreign_key": None,
            "check": False,
            "index": True,
            # "normal" is Oracle's name for a plain B-tree index.
            "type": "idx" if type_ == "normal" else type_,
            "columns": columns.split(","),
            "orders": orders.split(","),
        }
    return constraints
||||
|
|
@ -0,0 +1,741 @@
|
|||
import datetime
|
||||
import uuid
|
||||
from functools import lru_cache
|
||||
from itertools import chain
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import DatabaseError, NotSupportedError
|
||||
from django.db.backends.base.operations import BaseDatabaseOperations
|
||||
from django.db.backends.utils import split_tzname_delta, strip_quotes, truncate_name
|
||||
from django.db.models import (
|
||||
AutoField,
|
||||
CompositePrimaryKey,
|
||||
Exists,
|
||||
ExpressionWrapper,
|
||||
Lookup,
|
||||
)
|
||||
from django.db.models.expressions import RawSQL
|
||||
from django.db.models.sql.where import WhereNode
|
||||
from django.utils import timezone
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.regex_helper import _lazy_re_compile
|
||||
|
||||
from .base import Database
|
||||
from .utils import BulkInsertMapper, InsertVar, Oracle_datetime
|
||||
|
||||
|
||||
class DatabaseOperations(BaseDatabaseOperations):
    # Oracle uses NUMBER(5), NUMBER(11), and NUMBER(19) for integer fields.
    # SmallIntegerField uses NUMBER(11) instead of NUMBER(5), which is used by
    # SmallAutoField, to preserve backward compatibility.
    integer_field_ranges = {
        "SmallIntegerField": (-99999999999, 99999999999),
        "IntegerField": (-99999999999, 99999999999),
        "BigIntegerField": (-9999999999999999999, 9999999999999999999),
        "PositiveBigIntegerField": (0, 9999999999999999999),
        "PositiveSmallIntegerField": (0, 99999999999),
        "PositiveIntegerField": (0, 99999999999),
        "SmallAutoField": (-99999, 99999),
        "AutoField": (-99999999999, 99999999999),
        "BigAutoField": (-9999999999999999999, 9999999999999999999),
    }
    # Oracle spells the SQL set-difference operator "MINUS".
    set_operators = {**BaseDatabaseOperations.set_operators, "difference": "MINUS"}

    # TODO: colorize this SQL code with style.SQL_KEYWORD(), etc.
    # PL/SQL block used by the sequence_reset_* methods: advance the
    # sequence backing %(column)s until it catches up with MAX(%(column)s).
    # %% escapes are collapsed when the template is %-formatted.
    _sequence_reset_sql = """
DECLARE
    table_value integer;
    seq_value integer;
    seq_name user_tab_identity_cols.sequence_name%%TYPE;
BEGIN
    BEGIN
        SELECT sequence_name INTO seq_name FROM user_tab_identity_cols
        WHERE table_name = '%(table_name)s' AND
              column_name = '%(column_name)s';
        EXCEPTION WHEN NO_DATA_FOUND THEN
            seq_name := '%(no_autofield_sequence_name)s';
    END;

    SELECT NVL(MAX(%(column)s), 0) INTO table_value FROM %(table)s;
    SELECT NVL(last_number - cache_size, 0) INTO seq_value FROM user_sequences
        WHERE sequence_name = seq_name;
    WHILE table_value > seq_value LOOP
        EXECUTE IMMEDIATE 'SELECT "'||seq_name||'".nextval%(suffix)s'
        INTO seq_value;
    END LOOP;
END;
/"""

    # Oracle doesn't support string without precision; use the max string size.
    cast_char_field_without_max_length = "NVARCHAR2(2000)"
    cast_data_types = {
        "AutoField": "NUMBER(11)",
        "BigAutoField": "NUMBER(19)",
        "SmallAutoField": "NUMBER(5)",
        "TextField": cast_char_field_without_max_length,
    }
||||
|
||||
def cache_key_culling_sql(self):
    """
    Return the SQL template used to find the cull boundary key in the
    database cache table (the table name is interpolated in later, hence
    the escaped %%s placeholder).
    """
    quoted_key = self.quote_name("cache_key")
    parts = [
        f"SELECT {quoted_key}",
        "FROM %s",
        f"ORDER BY {quoted_key} OFFSET %%s ROWS FETCH FIRST 1 ROWS ONLY",
    ]
    return " ".join(parts)
||||
|
||||
# EXTRACT format cannot be passed in parameters.
# Whitelist validating the field name interpolated into EXTRACT() by
# date_extract_sql's fallback branch.
_extract_format_re = _lazy_re_compile(r"[A-Z_]+")
||||
|
||||
def date_extract_sql(self, lookup_type, sql, params):
    """
    Return (sql, params) extracting the named date part from ``sql``.

    Common parts are mapped to TO_CHAR() format codes bound as a query
    parameter; anything else falls through to the native EXTRACT()
    syntax, whose field name cannot be parameterized and is therefore
    validated against ``_extract_format_re`` before interpolation.

    Fix: corrected the "loookup" typo in the ValueError message.
    """
    extract_sql = f"TO_CHAR({sql}, %s)"
    extract_param = None
    if lookup_type == "week_day":
        # TO_CHAR(field, 'D') returns an integer from 1-7, where 1=Sunday.
        extract_param = "D"
    elif lookup_type == "iso_week_day":
        # Shift by one day so the 'D' numbering lines up with ISO weekdays.
        extract_sql = f"TO_CHAR({sql} - 1, %s)"
        extract_param = "D"
    elif lookup_type == "week":
        # IW = ISO week number
        extract_param = "IW"
    elif lookup_type == "quarter":
        extract_param = "Q"
    elif lookup_type == "iso_year":
        extract_param = "IYYY"
    else:
        lookup_type = lookup_type.upper()
        if not self._extract_format_re.fullmatch(lookup_type):
            raise ValueError(f"Invalid lookup type: {lookup_type!r}")
        # https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/EXTRACT-datetime.html
        return f"EXTRACT({lookup_type} FROM {sql})", params
    return extract_sql, (*params, extract_param)
||||
|
||||
def date_trunc_sql(self, lookup_type, sql, params, tzname=None):
    """Truncate a date expression to ``lookup_type`` precision via TRUNC()."""
    sql, params = self._convert_sql_to_tz(sql, params, tzname)
    # https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/ROUND-and-TRUNC-Date-Functions.html
    trunc_formats = {
        "year": "YEAR",
        "month": "MONTH",
        "quarter": "Q",
        "week": "IW",
    }
    trunc_param = trunc_formats.get(lookup_type)
    if trunc_param is None:
        # Day-level (and finer) truncation is TRUNC()'s default behavior.
        return f"TRUNC({sql})", params
    return f"TRUNC({sql}, %s)", (*params, trunc_param)
||||
|
||||
# Oracle crashes with "ORA-03113: end-of-file on communication channel"
# if the time zone name is passed in parameter. Use interpolation instead.
# https://groups.google.com/forum/#!msg/django-developers/zwQju7hbG78/9l934yelwfsJ
# This regexp matches all time zone names from the zoneinfo database.
_tzname_re = _lazy_re_compile(r"^[\w/:+-]+$")

def _prepare_tzname_delta(self, tzname):
    """
    Reduce a tzname that carries an explicit offset to just the signed
    offset; names without an offset pass through unchanged.
    """
    tzname, sign, offset = split_tzname_delta(tzname)
    return f"{sign}{offset}" if offset else tzname
||||
|
||||
def _convert_sql_to_tz(self, sql, params, tzname):
    """
    Rebase a timestamp expression from the connection's time zone to
    ``tzname``. No-op when USE_TZ is off, tzname is empty, or the
    connection already uses ``tzname``.
    """
    if not (settings.USE_TZ and tzname):
        return sql, params
    # tzname is interpolated into the SQL below (it can't be bound as a
    # parameter — see _tzname_re comment), so validate it first.
    if not self._tzname_re.match(tzname):
        raise ValueError("Invalid time zone name: %s" % tzname)
    # Convert from connection timezone to the local time, returning
    # TIMESTAMP WITH TIME ZONE and cast it back to TIMESTAMP to strip the
    # TIME ZONE details.
    if self.connection.timezone_name != tzname:
        from_timezone_name = self.connection.timezone_name
        to_timezone_name = self._prepare_tzname_delta(tzname)
        return (
            f"CAST((FROM_TZ({sql}, '{from_timezone_name}') AT TIME ZONE "
            f"'{to_timezone_name}') AS TIMESTAMP)",
            params,
        )
    return sql, params
||||
|
||||
def datetime_cast_date_sql(self, sql, params, tzname):
    """Cast a datetime expression to a date by truncating its time part."""
    converted_sql, converted_params = self._convert_sql_to_tz(sql, params, tzname)
    return f"TRUNC({converted_sql})", converted_params
||||
|
||||
def datetime_cast_time_sql(self, sql, params, tzname):
    """
    Extract the time portion of a datetime expression, re-based onto the
    dummy date 1900-01-01 (with a NULL guard around the conversion).
    """
    # Since `TimeField` values are stored as TIMESTAMP change to the
    # default date and convert the field to the specified timezone.
    sql, params = self._convert_sql_to_tz(sql, params, tzname)
    convert_datetime_sql = (
        f"TO_TIMESTAMP(CONCAT('1900-01-01 ', TO_CHAR({sql}, 'HH24:MI:SS.FF')), "
        f"'YYYY-MM-DD HH24:MI:SS.FF')"
    )
    return (
        # `sql` is interpolated twice (NULL check + conversion), so its
        # parameters must be supplied twice as well.
        f"CASE WHEN {sql} IS NOT NULL THEN {convert_datetime_sql} ELSE NULL END",
        (*params, *params),
    )
||||
|
||||
def datetime_extract_sql(self, lookup_type, sql, params, tzname):
    """Extract a named part from a timezone-adjusted datetime expression."""
    sql, params = self._convert_sql_to_tz(sql, params, tzname)
    if lookup_type != "second":
        return self.date_extract_sql(lookup_type, sql, params)
    # Truncate fractional seconds.
    return f"FLOOR(EXTRACT(SECOND FROM {sql}))", params
||||
|
||||
def datetime_trunc_sql(self, lookup_type, sql, params, tzname):
    """Truncate a datetime expression to ``lookup_type`` precision."""
    sql, params = self._convert_sql_to_tz(sql, params, tzname)
    # https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/ROUND-and-TRUNC-Date-Functions.html
    trunc_formats = {
        "year": "YEAR",
        "month": "MONTH",
        "quarter": "Q",
        "week": "IW",
        "hour": "HH24",
        "minute": "MI",
    }
    if lookup_type in trunc_formats:
        return f"TRUNC({sql}, %s)", (*params, trunc_formats[lookup_type])
    if lookup_type == "day":
        return f"TRUNC({sql})", params
    # Cast to DATE removes sub-second precision.
    return f"CAST({sql} AS DATE)", params
||||
|
||||
def time_extract_sql(self, lookup_type, sql, params):
    """Extract a named part from a time expression."""
    if lookup_type != "second":
        return self.date_extract_sql(lookup_type, sql, params)
    # Truncate fractional seconds.
    return f"FLOOR(EXTRACT(SECOND FROM {sql}))", params
||||
|
||||
def time_trunc_sql(self, lookup_type, sql, params, tzname=None):
    """
    Truncate a time expression. The implementation mirrors
    `datetime_trunc_sql` because both `DateTimeField` and `TimeField` are
    stored as TIMESTAMP; the date part of the latter is ignored.
    """
    sql, params = self._convert_sql_to_tz(sql, params, tzname)
    if lookup_type == "second":
        # Cast to DATE removes sub-second precision.
        return f"CAST({sql} AS DATE)", params
    trunc_param = {"hour": "HH24", "minute": "MI"}.get(lookup_type)
    return f"TRUNC({sql}, %s)", (*params, trunc_param)
||||
|
||||
def get_db_converters(self, expression):
    """
    Return the converters applied to values read from the database for
    ``expression``, based on its output field's internal type.
    """
    converters = super().get_db_converters(expression)
    internal_type = expression.output_field.get_internal_type()
    if internal_type in ["JSONField", "TextField"]:
        converters.append(self.convert_textfield_value)
    elif internal_type == "BinaryField":
        converters.append(self.convert_binaryfield_value)
    elif internal_type == "BooleanField":
        converters.append(self.convert_booleanfield_value)
    elif internal_type == "DateTimeField":
        # Aware datetimes are only needed when USE_TZ is on.
        if settings.USE_TZ:
            converters.append(self.convert_datetimefield_value)
    elif internal_type == "DateField":
        converters.append(self.convert_datefield_value)
    elif internal_type == "TimeField":
        converters.append(self.convert_timefield_value)
    elif internal_type == "UUIDField":
        converters.append(self.convert_uuidfield_value)
    # Oracle stores empty strings as null. If the field accepts the empty
    # string, undo this to adhere to the Django convention of using
    # the empty string instead of null.
    if expression.output_field.empty_strings_allowed:
        converters.append(
            self.convert_empty_bytes
            if internal_type == "BinaryField"
            else self.convert_empty_string
        )
    return converters
||||
|
||||
def convert_textfield_value(self, value, expression, connection):
    """Materialize LOB handles returned for text columns as strings."""
    return value.read() if isinstance(value, Database.LOB) else value
||||
|
||||
def convert_binaryfield_value(self, value, expression, connection):
    """Materialize LOB handles returned for binary columns as bytes."""
    if not isinstance(value, Database.LOB):
        return value
    return force_bytes(value.read())
||||
|
||||
def convert_booleanfield_value(self, value, expression, connection):
    """Map the 0/1 integers Oracle returns for booleans to False/True."""
    return bool(value) if value in (0, 1) else value
||||
|
||||
# oracledb always returns datetime.datetime objects for
# DATE and TIMESTAMP columns, but Django wants to see a
# python datetime.date, .time, or .datetime.

def convert_datetimefield_value(self, value, expression, connection):
    """Attach the connection's time zone to naive datetimes from the DB."""
    if value is not None:
        value = timezone.make_aware(value, self.connection.timezone)
    return value
||||
|
||||
def convert_datefield_value(self, value, expression, connection):
    """Trim the driver's timestamp down to its date component."""
    if not isinstance(value, Database.Timestamp):
        return value
    return value.date()
||||
|
||||
def convert_timefield_value(self, value, expression, connection):
    """Trim the driver's timestamp down to its time component."""
    if not isinstance(value, Database.Timestamp):
        return value
    return value.time()
||||
|
||||
def convert_uuidfield_value(self, value, expression, connection):
    """Parse the stored string representation back into a uuid.UUID."""
    return None if value is None else uuid.UUID(value)
||||
|
||||
@staticmethod
|
||||
def convert_empty_string(value, expression, connection):
|
||||
return "" if value is None else value
|
||||
|
||||
@staticmethod
|
||||
def convert_empty_bytes(value, expression, connection):
|
||||
return b"" if value is None else value
|
||||
|
||||
def deferrable_sql(self):
    """SQL appended to a constraint definition to make it deferrable."""
    return " DEFERRABLE INITIALLY DEFERRED"
||||
|
||||
def fetch_returned_insert_columns(self, cursor, returning_params):
    """
    Collect the values bound into an INSERT's RETURNING ... INTO clause,
    one per bind variable in ``returning_params``.
    """
    columns = []
    for param in returning_params:
        value = param.get_value()
        # Can be removed when cx_Oracle is no longer supported and
        # python-oracle 2.1.2 becomes the minimum supported version.
        if value == []:
            raise DatabaseError(
                "The database did not return a new row id. Probably "
                '"ORA-1403: no data found" was raised internally but was '
                "hidden by the Oracle OCI library (see "
                "https://code.djangoproject.com/ticket/28859)."
            )
        columns.append(value[0])
    return tuple(columns)
||||
|
||||
def no_limit_value(self):
    """Oracle has no sentinel value meaning "no limit"; return None."""
    return None
||||
|
||||
def limit_offset_sql(self, low_mark, high_mark):
    """Build the OFFSET/FETCH clause for a sliced query."""
    fetch, offset = self._get_limit_offset_params(low_mark, high_mark)
    clauses = []
    if offset:
        clauses.append("OFFSET %d ROWS" % offset)
    if fetch:
        clauses.append("FETCH FIRST %d ROWS ONLY" % fetch)
    return " ".join(clauses)
||||
|
||||
def last_executed_query(self, cursor, sql, params):
    """
    Best-effort reconstruction of the last statement run on ``cursor``
    with its parameters substituted in (for debugging/logging only).
    """
    # https://python-oracledb.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.statement
    # The DB API definition does not define this attribute.
    statement = cursor.statement
    # Unlike Psycopg's `query` and MySQLdb`'s `_executed`, oracledb's
    # `statement` doesn't contain the query parameters. Substitute
    # parameters manually.
    if statement and params:
        if isinstance(params, (tuple, list)):
            # Positional binds are mapped to generated :argN names;
            # dict.fromkeys de-duplicates repeated parameter values.
            params = {
                f":arg{i}": param for i, param in enumerate(dict.fromkeys(params))
            }
        elif isinstance(params, dict):
            params = {f":{key}": val for (key, val) in params.items()}
        # Replace longest keys first so ":arg1" doesn't clobber ":arg11".
        for key in sorted(params, key=len, reverse=True):
            statement = statement.replace(
                key, force_str(params[key], errors="replace")
            )
    return statement
||||
|
||||
def last_insert_id(self, cursor, table_name, pk_name):
    """
    Return the most recent value assigned to ``pk_name`` by reading the
    backing sequence's CURRVAL on this session.
    """
    sq_name = self._get_sequence_name(cursor, strip_quotes(table_name), pk_name)
    # Fix: the statement previously lacked the SELECT keyword
    # ('"%s".currval') and was not executable SQL. bare_select_suffix
    # appends " FROM DUAL" on databases that still require a FROM clause.
    cursor.execute(
        'SELECT "%s".currval%s'
        % (sq_name, self.connection.features.bare_select_suffix)
    )
    return cursor.fetchone()[0]
||||
|
||||
def lookup_cast(self, lookup_type, internal_type=None):
    """Wrap the lookup's lhs so Oracle can compare it."""
    # Case-insensitive lookups compare uppercased values.
    if lookup_type in ("iexact", "icontains", "istartswith", "iendswith"):
        return "UPPER(%s)"
    # LOB columns must go through DBMS_LOB.SUBSTR for everything except
    # NULL checks.
    if internal_type in ("BinaryField", "TextField") and lookup_type != "isnull":
        return "DBMS_LOB.SUBSTR(%s)"
    return "%s"
||||
|
||||
def max_in_list_size(self):
    """Maximum number of elements Oracle accepts in an IN (...) list."""
    return 1000

def max_name_length(self):
    """Maximum identifier length enforced by this backend."""
    return 30

def pk_default_value(self):
    """Placeholder used when inserting a row without an explicit pk."""
    return "NULL"

def prep_for_iexact_query(self, x):
    """No value transformation needed; lookup_cast() handles case folding."""
    return x

def process_clob(self, value):
    """Materialize a CLOB handle as a string, mapping NULL to ''."""
    return value.read() if value is not None else ""
||||
|
||||
def quote_name(self, name):
    """
    Quote and uppercase an identifier.

    SQL92 requires delimited (quoted) names to be case-sensitive. When
    not quoted, Oracle has case-insensitive behavior for identifiers, but
    always defaults to uppercase, so identifiers are normalized to quoted
    uppercase here.
    """
    already_quoted = name.startswith('"') or name.endswith('"')
    if not already_quoted:
        name = '"%s"' % truncate_name(name, self.max_name_length())
    # Oracle puts the query text into a (query % args) construct, so % signs
    # in names need to be escaped. The '%%' will be collapsed back to '%' at
    # that stage so we aren't really making the name longer here.
    return name.replace("%", "%%").upper()
||||
|
||||
def regex_lookup(self, lookup_type):
    """Return the REGEXP_LIKE template: 'c' = case-sensitive, 'i' = not."""
    match_option = "'c'" if lookup_type == "regex" else "'i'"
    return "REGEXP_LIKE(%%s, %%s, %s)" % match_option
||||
|
||||
def return_insert_columns(self, fields):
    """
    Build the RETURNING ... INTO clause for an INSERT along with the
    matching InsertVar bind parameters (one per returned column).
    """
    if not fields:
        return "", ()
    column_refs = []
    params = []
    for field in fields:
        table = self.quote_name(field.model._meta.db_table)
        column = self.quote_name(field.column)
        column_refs.append("%s.%s" % (table, column))
        params.append(InsertVar(field))
    clause = "RETURNING %s INTO %s" % (
        ", ".join(column_refs),
        ", ".join(["%s"] * len(params)),
    )
    return clause, tuple(params)
||||
|
||||
def __foreign_key_constraints(self, table_name, recursive):
    """
    Return (table_name, constraint_name) rows for foreign keys referencing
    ``table_name``. With ``recursive=True``, walk the dependency graph so
    indirect dependents are included too, ordered by descending dependency
    depth (ORDER BY MAX(level) DESC).
    """
    with self.connection.cursor() as cursor:
        if recursive:
            cursor.execute(
                """
                SELECT
                    user_tables.table_name, rcons.constraint_name
                FROM
                    user_tables
                JOIN
                    user_constraints cons
                    ON (user_tables.table_name = cons.table_name
                    AND cons.constraint_type = ANY('P', 'U'))
                LEFT JOIN
                    user_constraints rcons
                    ON (user_tables.table_name = rcons.table_name
                    AND rcons.constraint_type = 'R')
                START WITH user_tables.table_name = UPPER(%s)
                CONNECT BY
                    NOCYCLE PRIOR cons.constraint_name = rcons.r_constraint_name
                GROUP BY
                    user_tables.table_name, rcons.constraint_name
                HAVING user_tables.table_name != UPPER(%s)
                ORDER BY MAX(level) DESC
                """,
                (table_name, table_name),
            )
        else:
            # Non-recursive: just the FK constraints declared on the table.
            cursor.execute(
                """
                SELECT
                    cons.table_name, cons.constraint_name
                FROM
                    user_constraints cons
                WHERE
                    cons.constraint_type = 'R'
                    AND cons.table_name = UPPER(%s)
                """,
                (table_name,),
            )
        return cursor.fetchall()
||||
|
||||
@cached_property
def _foreign_key_constraints(self):
    # Memoized view of __foreign_key_constraints. Using cached_property
    # keeps the lru_cache per-instance instead of per-class, avoiding the
    # instance-leak of decorating the method directly.
    # 512 is large enough to fit the ~330 tables (as of this writing) in
    # Django's test suite.
    return lru_cache(maxsize=512)(self.__foreign_key_constraints)
||||
|
||||
def sql_flush(self, style, tables, *, reset_sequences=False, allow_cascade=False):
    """
    Return statements emptying ``tables``: disable each referencing FK
    constraint, TRUNCATE the tables, re-enable the constraints, and
    optionally reset the backing sequences.
    """
    if not tables:
        return []

    truncated_tables = {table.upper() for table in tables}
    constraints = set()
    # Oracle's TRUNCATE CASCADE only works with ON DELETE CASCADE foreign
    # keys which Django doesn't define. Emulate the PostgreSQL behavior
    # which truncates all dependent tables by manually retrieving all
    # foreign key constraints and resolving dependencies.
    for table in tables:
        for foreign_table, constraint in self._foreign_key_constraints(
            table, recursive=allow_cascade
        ):
            if allow_cascade:
                truncated_tables.add(foreign_table)
            constraints.add((foreign_table, constraint))
    sql = (
        # 1) Disable every referencing FK (keeping its index) so the
        #    truncations don't violate referential integrity.
        [
            "%s %s %s %s %s %s %s %s;"
            % (
                style.SQL_KEYWORD("ALTER"),
                style.SQL_KEYWORD("TABLE"),
                style.SQL_FIELD(self.quote_name(table)),
                style.SQL_KEYWORD("DISABLE"),
                style.SQL_KEYWORD("CONSTRAINT"),
                style.SQL_FIELD(self.quote_name(constraint)),
                style.SQL_KEYWORD("KEEP"),
                style.SQL_KEYWORD("INDEX"),
            )
            for table, constraint in constraints
        ]
        # 2) Truncate the requested tables (plus cascaded dependents).
        + [
            "%s %s %s;"
            % (
                style.SQL_KEYWORD("TRUNCATE"),
                style.SQL_KEYWORD("TABLE"),
                style.SQL_FIELD(self.quote_name(table)),
            )
            for table in truncated_tables
        ]
        # 3) Re-enable the constraints disabled in step 1.
        + [
            "%s %s %s %s %s %s;"
            % (
                style.SQL_KEYWORD("ALTER"),
                style.SQL_KEYWORD("TABLE"),
                style.SQL_FIELD(self.quote_name(table)),
                style.SQL_KEYWORD("ENABLE"),
                style.SQL_KEYWORD("CONSTRAINT"),
                style.SQL_FIELD(self.quote_name(constraint)),
            )
            for table, constraint in constraints
        ]
    )
    if reset_sequences:
        sequences = [
            sequence
            for sequence in self.connection.introspection.sequence_list()
            if sequence["table"].upper() in truncated_tables
        ]
        # Since we've just deleted all the rows, running our sequence ALTER
        # code will reset the sequence to 0.
        sql.extend(self.sequence_reset_by_name_sql(style, sequences))
    return sql
||||
|
||||
def sequence_reset_by_name_sql(self, style, sequences):
    """
    Return one _sequence_reset_sql PL/SQL block per sequence, resyncing
    each with the current MAX of its column.
    """
    statements = []
    for sequence_info in sequences:
        table = self.quote_name(sequence_info["table"])
        column = self.quote_name(sequence_info["column"] or "id")
        statements.append(
            self._sequence_reset_sql
            % {
                "no_autofield_sequence_name": self._get_no_autofield_sequence_name(
                    sequence_info["table"]
                ),
                "table": table,
                "column": column,
                "table_name": strip_quotes(table),
                "column_name": strip_quotes(column),
                "suffix": self.connection.features.bare_select_suffix,
            }
        )
    return statements
||||
|
||||
def sequence_reset_sql(self, style, model_list):
    """
    Return PL/SQL statements resyncing each model's AutoField sequence
    with the current MAX of its column.
    """
    output = []
    query = self._sequence_reset_sql
    for model in model_list:
        for f in model._meta.local_fields:
            if isinstance(f, AutoField):
                no_autofield_sequence_name = self._get_no_autofield_sequence_name(
                    model._meta.db_table
                )
                table = self.quote_name(model._meta.db_table)
                column = self.quote_name(f.column)
                output.append(
                    query
                    % {
                        "no_autofield_sequence_name": no_autofield_sequence_name,
                        "table": table,
                        "column": column,
                        "table_name": strip_quotes(table),
                        "column_name": strip_quotes(column),
                        "suffix": self.connection.features.bare_select_suffix,
                    }
                )
                # Only one AutoField is allowed per model, so don't
                # continue to loop
                break
    return output
||||
|
||||
def start_transaction_sql(self):
    """Return '' — no explicit statement is issued to begin a transaction."""
    return ""
||||
|
||||
def tablespace_sql(self, tablespace, inline=False):
    """
    Return the clause placing an object in ``tablespace``; with
    ``inline=True`` the form used for an index on a constraint.
    """
    template = "USING INDEX TABLESPACE %s" if inline else "TABLESPACE %s"
    return template % self.quote_name(tablespace)
||||
|
||||
def adapt_datefield_value(self, value):
    """
    Transform a date value to an object compatible with what is expected
    by the backend driver for date columns.

    The default implementation transforms the date to text, but that is not
    necessary for Oracle.
    """
    return value
||||
|
||||
def adapt_datetimefield_value(self, value):
    """
    Transform a datetime value to an object compatible with what is expected
    by the backend driver for datetime columns.

    If naive datetime is passed assumes that is in UTC. Normally Django
    models.DateTimeField makes sure that if USE_TZ is True passed datetime
    is timezone aware.
    """

    if value is None:
        return None

    # oracledb doesn't support tz-aware datetimes
    if timezone.is_aware(value):
        if settings.USE_TZ:
            # Normalize to the connection's time zone, then strip tzinfo.
            value = timezone.make_naive(value, self.connection.timezone)
        else:
            raise ValueError(
                "Oracle backend does not support timezone-aware datetimes when "
                "USE_TZ is False."
            )

    return Oracle_datetime.from_datetime(value)
||||
|
||||
def adapt_timefield_value(self, value):
    """
    Transform a time value into what the driver expects: an Oracle_datetime
    anchored on the dummy date 1900-01-01.
    """
    if value is None:
        return None

    if isinstance(value, str):
        # NOTE(review): string input yields a plain datetime rather than an
        # Oracle_datetime — presumably acceptable to the driver; confirm.
        return datetime.datetime.strptime(value, "%H:%M:%S")

    # Oracle doesn't support tz-aware times
    if timezone.is_aware(value):
        raise ValueError("Oracle backend does not support timezone-aware times.")

    return Oracle_datetime(
        1900, 1, 1, value.hour, value.minute, value.second, value.microsecond
    )
||||
|
||||
def combine_expression(self, connector, sub_expressions):
    """
    Translate arithmetic/bitwise connectors that have no direct Oracle SQL
    operator into the equivalent function-call expressions.
    """
    lhs, rhs = sub_expressions
    if connector == "#":
        raise NotSupportedError("Bitwise XOR is not supported in Oracle.")
    # Connectors rendered as FUNC(lhs,rhs).
    function_templates = {
        "%%": "MOD(%s)",
        "&": "BITAND(%s)",
        "^": "POWER(%s)",
    }
    if connector in function_templates:
        return function_templates[connector] % ",".join(sub_expressions)
    # Connectors emulated with arithmetic identities.
    binary_templates = {
        "|": "BITAND(-%(lhs)s-1,%(rhs)s)+%(lhs)s",
        "<<": "(%(lhs)s * POWER(2, %(rhs)s))",
        ">>": "FLOOR(%(lhs)s / POWER(2, %(rhs)s))",
    }
    if connector in binary_templates:
        return binary_templates[connector] % {"lhs": lhs, "rhs": rhs}
    return super().combine_expression(connector, sub_expressions)
||||
|
||||
def _get_no_autofield_sequence_name(self, table):
    """
    Manually created sequence name to keep backward compatibility for
    AutoFields that aren't Oracle identity columns.
    """
    # Reserve three characters for the "_SQ" suffix.
    name_length = self.max_name_length() - 3
    return "%s_SQ" % truncate_name(strip_quotes(table), name_length).upper()
||||
|
||||
def _get_sequence_name(self, cursor, table, pk_name):
    """
    Return the identity-column sequence backing (table, pk_name), falling
    back to the legacy manually-named sequence when none exists.
    """
    cursor.execute(
        """
        SELECT sequence_name
        FROM user_tab_identity_cols
        WHERE table_name = UPPER(%s)
        AND column_name = UPPER(%s)""",
        [table, pk_name],
    )
    row = cursor.fetchone()
    if row is None:
        return self._get_no_autofield_sequence_name(table)
    return row[0]
||||
|
||||
def bulk_insert_sql(self, fields, placeholder_rows):
    """
    Build the SELECT ... UNION ALL source feeding a multi-row INSERT,
    wrapping placeholders in type-specific conversions where needed.
    """
    field_placeholders = [
        BulkInsertMapper.types.get(
            getattr(field, "target_field", field).get_internal_type(), "%s"
        )
        for field in fields
        if field
    ]
    query = []
    for row in placeholder_rows:
        select = []
        for i, placeholder in enumerate(row):
            # A model without any fields has fields=[None].
            if fields[i]:
                placeholder = field_placeholders[i] % placeholder
            # Add columns aliases to the first select to avoid "ORA-00918:
            # column ambiguously defined" when two or more columns in the
            # first select have the same value.
            if not query:
                placeholder = "%s col_%s" % (placeholder, i)
            select.append(placeholder)
        suffix = self.connection.features.bare_select_suffix
        query.append(f"SELECT %s{suffix}" % ", ".join(select))
    # Bulk insert to tables with Oracle identity columns causes Oracle to
    # add sequence.nextval to it. Sequence.nextval cannot be used with the
    # UNION operator. To prevent incorrect SQL, move UNION to a subquery.
    return "SELECT * FROM (%s)" % " UNION ALL ".join(query)
||||
|
||||
def subtract_temporals(self, internal_type, lhs, rhs):
    """
    Return SQL computing lhs - rhs; DateField differences are converted
    into day intervals via NUMTODSINTERVAL.
    """
    if internal_type != "DateField":
        return super().subtract_temporals(internal_type, lhs, rhs)
    lhs_sql, lhs_params = lhs
    rhs_sql, rhs_params = rhs
    combined_sql = "NUMTODSINTERVAL(TO_NUMBER(%s - %s), 'DAY')" % (lhs_sql, rhs_sql)
    return combined_sql, (*lhs_params, *rhs_params)
||||
|
||||
def bulk_batch_size(self, fields, objs):
    """Oracle restricts the number of parameters in a query."""
    # A composite primary key binds one parameter per component field, so
    # expand it before counting.
    fields = list(
        chain.from_iterable(
            field.fields if isinstance(field, CompositePrimaryKey) else [field]
            for field in fields
        )
    )
    if fields:
        return self.connection.features.max_query_params // len(fields)
    return len(objs)
||||
|
||||
def conditional_expression_supported_in_where_clause(self, expression):
    """
    Oracle supports only EXISTS(...) or filters in the WHERE clause, others
    must be compared with True.
    """
    if isinstance(expression, (Exists, Lookup, WhereNode)):
        return True
    # Unwrap conditional ExpressionWrappers and test the inner expression.
    if isinstance(expression, ExpressionWrapper) and expression.conditional:
        return self.conditional_expression_supported_in_where_clause(
            expression.expression
        )
    if isinstance(expression, RawSQL) and expression.conditional:
        return True
    return False
||||
|
|
@ -0,0 +1,20 @@
|
|||
import warnings
|
||||
|
||||
from django.utils.deprecation import RemovedInDjango60Warning
|
||||
|
||||
# Prefer the modern python-oracledb driver; fall back to the deprecated
# cx_Oracle driver when it is not installed.
try:
    import oracledb

    # Checked elsewhere in the backend to branch on driver capabilities.
    is_oracledb = True
except ImportError as e:
    try:
        import cx_Oracle as oracledb  # NOQA

        warnings.warn(
            "cx_Oracle is deprecated. Use oracledb instead.",
            RemovedInDjango60Warning,
            stacklevel=2,
        )
        is_oracledb = False
    except ImportError:
        # Neither driver is available: re-raise the original oracledb
        # error, suppressing the nested cx_Oracle traceback.
        raise e from None
|
||||
|
|
@ -0,0 +1,252 @@
|
|||
import copy
|
||||
import datetime
|
||||
import re
|
||||
|
||||
from django.db import DatabaseError
|
||||
from django.db.backends.base.schema import (
|
||||
BaseDatabaseSchemaEditor,
|
||||
_related_non_m2m_objects,
|
||||
)
|
||||
from django.utils.duration import duration_iso_string
|
||||
|
||||
|
||||
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    SchemaEditor for Oracle: DDL statement templates plus workarounds for
    Oracle-specific restrictions (identity columns, unsupported type
    changes, collation changes on primary keys, leftover sequences).
    """

    # Oracle spells column alteration "MODIFY" rather than "ALTER COLUMN".
    sql_create_column = "ALTER TABLE %(table)s ADD %(column)s %(definition)s"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s"
    sql_alter_column_null = "MODIFY %(column)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s NOT NULL"
    sql_alter_column_default = "MODIFY %(column)s DEFAULT %(default)s"
    sql_alter_column_no_default = "MODIFY %(column)s DEFAULT NULL"
    sql_alter_column_no_default_null = sql_alter_column_no_default

    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
    sql_create_column_inline_fk = (
        "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s"
    )
    # CASCADE CONSTRAINTS also drops foreign keys referencing the table.
    sql_delete_table = "DROP TABLE %(table)s CASCADE CONSTRAINTS"
    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    def quote_value(self, value):
        """Return *value* rendered as an inline SQL literal for DDL."""
        if isinstance(value, (datetime.date, datetime.time, datetime.datetime)):
            return "'%s'" % value
        elif isinstance(value, datetime.timedelta):
            return "'%s'" % duration_iso_string(value)
        elif isinstance(value, str):
            # Escape embedded quotes by doubling them.
            return "'%s'" % value.replace("'", "''")
        elif isinstance(value, (bytes, bytearray, memoryview)):
            # Binary values are inlined as hex strings.
            return "'%s'" % value.hex()
        elif isinstance(value, bool):
            return "1" if value else "0"
        else:
            return str(value)

    def remove_field(self, model, field):
        """Drop a column; Oracle requires dropping its identity first."""
        # If the column is an identity column, drop the identity before
        # removing the field.
        if self._is_identity_column(model._meta.db_table, field.column):
            self._drop_identity(model._meta.db_table, field.column)
        super().remove_field(model, field)

    def delete_model(self, model):
        """Drop the model's table and its manually created sequence."""
        # Run superclass action
        super().delete_model(model)
        # Clean up manually created sequence.
        self.execute(
            """
            DECLARE
                i INTEGER;
            BEGIN
                SELECT COUNT(1) INTO i FROM USER_SEQUENCES
                    WHERE SEQUENCE_NAME = '%(sq_name)s';
                IF i = 1 THEN
                    EXECUTE IMMEDIATE 'DROP SEQUENCE "%(sq_name)s"';
                END IF;
            END;
        /"""
            % {
                "sq_name": self.connection.ops._get_no_autofield_sequence_name(
                    model._meta.db_table
                )
            }
        )

    def alter_field(self, model, old_field, new_field, strict=False):
        """
        Alter a field, retrying with Oracle-specific workarounds when the
        straightforward ALTER fails with a known ORA- error code.
        """
        try:
            super().alter_field(model, old_field, new_field, strict)
        except DatabaseError as e:
            description = str(e)
            # If we're changing type to an unsupported type we need a
            # SQLite-ish workaround
            if "ORA-22858" in description or "ORA-22859" in description:
                self._alter_field_type_workaround(model, old_field, new_field)
            # If an identity column is changing to a non-numeric type, drop the
            # identity first.
            elif "ORA-30675" in description:
                self._drop_identity(model._meta.db_table, old_field.column)
                self.alter_field(model, old_field, new_field, strict)
            # If a primary key column is changing to an identity column, drop
            # the primary key first.
            elif "ORA-30673" in description and old_field.primary_key:
                self._delete_primary_key(model, strict=True)
                self._alter_field_type_workaround(model, old_field, new_field)
            # If a collation is changing on a primary key, drop the primary key
            # first.
            elif "ORA-43923" in description and old_field.primary_key:
                self._delete_primary_key(model, strict=True)
                self.alter_field(model, old_field, new_field, strict)
                # Restore a primary key, if needed.
                if new_field.primary_key:
                    self.execute(self._create_primary_key_sql(model, new_field))
            else:
                raise

    def _alter_field_type_workaround(self, model, old_field, new_field):
        """
        Oracle refuses to change from some type to other type.
        What we need to do instead is:
        - Add a nullable version of the desired field with a temporary name. If
          the new column is an auto field, then the temporary column can't be
          nullable.
        - Update the table to transfer values from old to new
        - Drop old column
        - Rename the new column and possibly drop the nullable property
        """
        # Make a new field that's like the new one but with a temporary
        # column name.
        new_temp_field = copy.deepcopy(new_field)
        new_temp_field.null = new_field.get_internal_type() not in (
            "AutoField",
            "BigAutoField",
            "SmallAutoField",
        )
        new_temp_field.column = self._generate_temp_name(new_field.column)
        # Add it
        self.add_field(model, new_temp_field)
        # Explicit data type conversion
        # https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf
        # /Data-Type-Comparison-Rules.html#GUID-D0C5A47E-6F93-4C2D-9E49-4F2B86B359DD
        new_value = self.quote_name(old_field.column)
        old_type = old_field.db_type(self.connection)
        if re.match("^N?CLOB", old_type):
            # LOBs must go through TO_CHAR before any further conversion.
            new_value = "TO_CHAR(%s)" % new_value
            old_type = "VARCHAR2"
        if re.match("^N?VARCHAR2", old_type):
            new_internal_type = new_field.get_internal_type()
            if new_internal_type == "DateField":
                new_value = "TO_DATE(%s, 'YYYY-MM-DD')" % new_value
            elif new_internal_type == "DateTimeField":
                new_value = "TO_TIMESTAMP(%s, 'YYYY-MM-DD HH24:MI:SS.FF')" % new_value
            elif new_internal_type == "TimeField":
                # TimeField are stored as TIMESTAMP with a 1900-01-01 date part.
                new_value = "CONCAT('1900-01-01 ', %s)" % new_value
                new_value = "TO_TIMESTAMP(%s, 'YYYY-MM-DD HH24:MI:SS.FF')" % new_value
        # Transfer values across
        self.execute(
            "UPDATE %s set %s=%s"
            % (
                self.quote_name(model._meta.db_table),
                self.quote_name(new_temp_field.column),
                new_value,
            )
        )
        # Drop the old field
        self.remove_field(model, old_field)
        # Rename and possibly make the new field NOT NULL
        super().alter_field(model, new_temp_field, new_field)
        # Recreate foreign key (if necessary) because the old field is not
        # passed to the alter_field() and data types of new_temp_field and
        # new_field always match.
        new_type = new_field.db_type(self.connection)
        if (
            (old_field.primary_key and new_field.primary_key)
            or (old_field.unique and new_field.unique)
        ) and old_type != new_type:
            for _, rel in _related_non_m2m_objects(new_temp_field, new_field):
                if rel.field.db_constraint:
                    self.execute(
                        self._create_fk_sql(rel.related_model, rel.field, "_fk")
                    )

    def _alter_column_type_sql(
        self, model, old_field, new_field, new_type, old_collation, new_collation
    ):
        """Like the base implementation, but drop an identity when the
        field stops being an AutoField (Oracle can't MODIFY it away)."""
        auto_field_types = {"AutoField", "BigAutoField", "SmallAutoField"}
        # Drop the identity if migrating away from AutoField.
        if (
            old_field.get_internal_type() in auto_field_types
            and new_field.get_internal_type() not in auto_field_types
            and self._is_identity_column(model._meta.db_table, new_field.column)
        ):
            self._drop_identity(model._meta.db_table, new_field.column)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def normalize_name(self, name):
        """
        Get the properly shortened and uppercased identifier as returned by
        quote_name() but without the quotes.
        """
        nn = self.quote_name(name)
        if nn[0] == '"' and nn[-1] == '"':
            nn = nn[1:-1]
        return nn

    def _generate_temp_name(self, for_name):
        """Generate temporary names for workarounds that need temp columns."""
        # hex(hash(...))[1:] strips the leading "0" (or "-") of the literal.
        suffix = hex(hash(for_name)).upper()[1:]
        return self.normalize_name(for_name + "_" + suffix)

    def prepare_default(self, value):
        """Return *value* quoted for use as a column DEFAULT."""
        return self.quote_value(value)

    def _field_should_be_indexed(self, model, field):
        """Skip indexes on data types Oracle can't index (e.g. LOBs)."""
        create_index = super()._field_should_be_indexed(model, field)
        db_type = field.db_type(self.connection)
        if (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        ):
            return False
        return create_index

    def _is_identity_column(self, table_name, column_name):
        """Return whether table_name.column_name is an identity column,
        per the USER_TAB_COLS data dictionary view."""
        if not column_name:
            return False
        with self.connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    CASE WHEN identity_column = 'YES' THEN 1 ELSE 0 END
                FROM user_tab_cols
                WHERE table_name = %s AND
                      column_name = %s
                """,
                [self.normalize_name(table_name), self.normalize_name(column_name)],
            )
            row = cursor.fetchone()
            # No row means the column doesn't exist; treat as non-identity.
            return row[0] if row else False

    def _drop_identity(self, table_name, column_name):
        """Remove the identity property from a column (keeps the column)."""
        self.execute(
            "ALTER TABLE %(table)s MODIFY %(column)s DROP IDENTITY"
            % {
                "table": self.quote_name(table_name),
                "column": self.quote_name(column_name),
            }
        )

    def _get_default_collation(self, table_name):
        """Return the table's default collation from USER_TABLES."""
        with self.connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT default_collation FROM user_tables WHERE table_name = %s
                """,
                [self.normalize_name(table_name)],
            )
            return cursor.fetchone()[0]

    def _collate_sql(self, collation, old_collation=None, table_name=None):
        """Resolve a None collation back to the table default before
        delegating to the base implementation."""
        if collation is None and old_collation is not None:
            collation = self._get_default_collation(table_name)
        return super()._collate_sql(collation, old_collation, table_name)
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
import datetime
|
||||
import decimal
|
||||
|
||||
from .base import Database
|
||||
|
||||
|
||||
class InsertVar:
    """
    A late-binding cursor variable that can be passed to Cursor.execute
    as a parameter, in order to receive the id of the row created by an
    insert statement.
    """

    # All integer-backed Django field types bind as plain Python int;
    # the remaining types need driver-specific or decimal bindings.
    types = {
        **dict.fromkeys(
            (
                "AutoField",
                "BigAutoField",
                "SmallAutoField",
                "IntegerField",
                "BigIntegerField",
                "SmallIntegerField",
                "PositiveBigIntegerField",
                "PositiveSmallIntegerField",
                "PositiveIntegerField",
                "BooleanField",
            ),
            int,
        ),
        "FloatField": Database.DB_TYPE_BINARY_DOUBLE,
        "DateTimeField": Database.DB_TYPE_TIMESTAMP,
        "DateField": Database.Date,
        "DecimalField": decimal.Decimal,
    }

    def __init__(self, field):
        # For foreign keys, the bound type is that of the referenced column.
        target = getattr(field, "target_field", field)
        self.db_type = self.types.get(target.get_internal_type(), str)
        self.bound_param = None

    def bind_parameter(self, cursor):
        """Create and remember the driver-level output variable."""
        self.bound_param = cursor.cursor.var(self.db_type)
        return self.bound_param

    def get_value(self):
        """Return the value the database wrote into the bound variable."""
        return self.bound_param.getvalue()
|
||||
|
||||
|
||||
class Oracle_datetime(datetime.datetime):
    """
    A datetime object, with an additional class attribute
    to tell oracledb to save the microseconds too.
    """

    # Picked up by the driver so the value binds as TIMESTAMP (not DATE).
    input_size = Database.DB_TYPE_TIMESTAMP

    @classmethod
    def from_datetime(cls, dt):
        """Copy the date/time fields of *dt* (tzinfo and fold are dropped)."""
        fields = (
            dt.year,
            dt.month,
            dt.day,
            dt.hour,
            dt.minute,
            dt.second,
            dt.microsecond,
        )
        return Oracle_datetime(*fields)
|
||||
|
||||
|
||||
class BulkInsertMapper:
    """
    Map Django internal field types to the Oracle conversion expression
    wrapped around each placeholder during bulk INSERT statements.
    """

    BLOB = "TO_BLOB(%s)"
    DATE = "TO_DATE(%s)"
    INTERVAL = "CAST(%s as INTERVAL DAY(9) TO SECOND(6))"
    NCLOB = "TO_NCLOB(%s)"
    NUMBER = "TO_NUMBER(%s)"
    TIMESTAMP = "TO_TIMESTAMP(%s)"

    # Every numeric field type shares the NUMBER conversion; the rest are
    # listed individually.
    types = {
        **dict.fromkeys(
            (
                "AutoField",
                "BigAutoField",
                "BigIntegerField",
                "BooleanField",
                "DecimalField",
                "FloatField",
                "IntegerField",
                "PositiveBigIntegerField",
                "PositiveIntegerField",
                "PositiveSmallIntegerField",
                "SmallAutoField",
                "SmallIntegerField",
            ),
            NUMBER,
        ),
        "BinaryField": BLOB,
        "DateField": DATE,
        "DateTimeField": TIMESTAMP,
        "DurationField": INTERVAL,
        "TextField": NCLOB,
        "TimeField": TIMESTAMP,
    }
|
||||
|
||||
|
||||
def dsn(settings_dict):
    """
    Build the Oracle connect string from the settings dict.

    With a PORT, construct a full DSN via the driver's makedsn() (HOST
    defaults to localhost); without one, NAME is assumed to already be a
    complete DSN or TNS alias and is returned unchanged.
    """
    port = settings_dict["PORT"]
    if not port:
        return settings_dict["NAME"]
    host = settings_dict["HOST"].strip() or "localhost"
    return Database.makedsn(host, int(port), settings_dict["NAME"])
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
from django.core import checks
|
||||
from django.db.backends.base.validation import BaseDatabaseValidation
|
||||
|
||||
|
||||
class DatabaseValidation(BaseDatabaseValidation):
    def check_field_type(self, field, field_type):
        """Oracle doesn't support a database index on some data types."""
        # Only indexed fields of a limited data type warrant a warning.
        if not field.db_index:
            return []
        if field_type.lower() not in self.connection._limited_data_types:
            return []
        return [
            checks.Warning(
                "Oracle does not support a database index on %s columns."
                % field_type,
                hint=(
                    "An index won't be created. Silence this warning if "
                    "you don't care about it."
                ),
                obj=field,
                id="fields.W162",
            )
        ]
|
||||
Loading…
Add table
Add a link
Reference in a new issue