fded6ce46cb167faaae559ff93b050c2b7d18ff1
max
  Mon Jun 26 08:59:00 2023 -0700
Porting hgGeneGraph to python3. refs #31563

diff --git src/hg/pyLib/pymysql/cursors.py src/hg/pyLib/pymysql/cursors.py
new file mode 100644
index 0000000..666970b
--- /dev/null
+++ src/hg/pyLib/pymysql/cursors.py
@@ -0,0 +1,496 @@
+import re
+from . import err
+
+
+#: Regular expression for :meth:`Cursor.executemany`.
+#: executemany only supports simple bulk insert.
+#: You can use it to load large dataset.
+RE_INSERT_VALUES = re.compile(
+    r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)"
+    + r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
+    + r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
+    re.IGNORECASE | re.DOTALL,
+)
+
+
+class Cursor:
+    """
+    This is the object you use to interact with the database.
+
+    Do not create an instance of a Cursor yourself. Call
+    connections.Connection.cursor().
+
+    See `Cursor <https://www.python.org/dev/peps/pep-0249/#cursor-objects>`_ in
+    the specification.
+    """
+
+    #: Max statement size which :meth:`executemany` generates.
+    #:
+    #: Max size of allowed statement is max_allowed_packet - packet_header_size.
+    #: Default value of max_allowed_packet is 1048576.
+    max_stmt_length = 1024000
+
+    def __init__(self, connection):
+        self.connection = connection
+        self.description = None
+        self.rownumber = 0
+        self.rowcount = -1
+        self.arraysize = 1
+        self._executed = None
+        self._result = None
+        self._rows = None
+
+    def close(self):
+        """
+        Closing a cursor just exhausts all remaining data.
+        """
+        conn = self.connection
+        if conn is None:
+            return
+        try:
+            while self.nextset():
+                pass
+        finally:
+            self.connection = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        del exc_info
+        self.close()
+
+    def _get_db(self):
+        if not self.connection:
+            raise err.ProgrammingError("Cursor closed")
+        return self.connection
+
+    def _check_executed(self):
+        if not self._executed:
+            raise err.ProgrammingError("execute() first")
+
+    def _conv_row(self, row):
+        return row
+
+    def setinputsizes(self, *args):
+        """Does nothing, required by DB API."""
+
+    def setoutputsizes(self, *args):
+        """Does nothing, required by DB API."""
+
+    def _nextset(self, unbuffered=False):
+        """Get the next query set"""
+        conn = self._get_db()
+        current_result = self._result
+        if current_result is None or current_result is not conn._result:
+            return None
+        if not current_result.has_next:
+            return None
+        self._result = None
+        self._clear_result()
+        conn.next_result(unbuffered=unbuffered)
+        self._do_get_result()
+        return True
+
+    def nextset(self):
+        return self._nextset(False)
+
+    def _ensure_bytes(self, x, encoding=None):
+        if isinstance(x, str):
+            x = x.encode(encoding)
+        elif isinstance(x, (tuple, list)):
+            x = type(x)(self._ensure_bytes(v, encoding=encoding) for v in x)
+        return x
+
+    def _escape_args(self, args, conn):
+        if isinstance(args, (tuple, list)):
+            return tuple(conn.literal(arg) for arg in args)
+        elif isinstance(args, dict):
+            return {key: conn.literal(val) for (key, val) in args.items()}
+        else:
+            # If it's not a dictionary let's try escaping it anyways.
+            # Worst case it will throw a Value error
+            return conn.escape(args)
+
+    def mogrify(self, query, args=None):
+        """
+        Returns the exact string that is sent to the database by calling the
+        execute() method.
+
+        This method follows the extension to the DB API 2.0 followed by Psycopg.
+        """
+        conn = self._get_db()
+
+        if args is not None:
+            query = query % self._escape_args(args, conn)
+
+        return query
+
+    def execute(self, query, args=None):
+        """Execute a query
+
+        :param str query: Query to execute.
+
+        :param args: parameters used with query. (optional)
+        :type args: tuple, list or dict
+
+        :return: Number of affected rows
+        :rtype: int
+
+        If args is a list or tuple, %s can be used as a placeholder in the query.
+        If args is a dict, %(name)s can be used as a placeholder in the query.
+        """
+        while self.nextset():
+            pass
+
+        query = self.mogrify(query, args)
+
+        result = self._query(query)
+        self._executed = query
+        return result
+
+    def executemany(self, query, args):
+        # type: (str, list) -> int
+        """Run several data against one query
+
+        :param query: query to execute on server
+        :param args:  Sequence of sequences or mappings.  It is used as parameter.
+        :return: Number of rows affected, if any.
+
+        This method improves performance on multiple-row INSERT and
+        REPLACE. Otherwise it is equivalent to looping over args with
+        execute().
+        """
+        if not args:
+            return
+
+        m = RE_INSERT_VALUES.match(query)
+        if m:
+            q_prefix = m.group(1) % ()
+            q_values = m.group(2).rstrip()
+            q_postfix = m.group(3) or ""
+            assert q_values[0] == "(" and q_values[-1] == ")"
+            return self._do_execute_many(
+                q_prefix,
+                q_values,
+                q_postfix,
+                args,
+                self.max_stmt_length,
+                self._get_db().encoding,
+            )
+
+        self.rowcount = sum(self.execute(query, arg) for arg in args)
+        return self.rowcount
+
+    def _do_execute_many(
+        self, prefix, values, postfix, args, max_stmt_length, encoding
+    ):
+        conn = self._get_db()
+        escape = self._escape_args
+        if isinstance(prefix, str):
+            prefix = prefix.encode(encoding)
+        if isinstance(postfix, str):
+            postfix = postfix.encode(encoding)
+        sql = bytearray(prefix)
+        args = iter(args)
+        v = values % escape(next(args), conn)
+        if isinstance(v, str):
+            v = v.encode(encoding, "surrogateescape")
+        sql += v
+        rows = 0
+        for arg in args:
+            v = values % escape(arg, conn)
+            if isinstance(v, str):
+                v = v.encode(encoding, "surrogateescape")
+            if len(sql) + len(v) + len(postfix) + 1 > max_stmt_length:
+                rows += self.execute(sql + postfix)
+                sql = bytearray(prefix)
+            else:
+                sql += b","
+            sql += v
+        rows += self.execute(sql + postfix)
+        self.rowcount = rows
+        return rows
+
+    def callproc(self, procname, args=()):
+        """Execute stored procedure procname with args
+
+        procname -- string, name of procedure to execute on server
+
+        args -- Sequence of parameters to use with procedure
+
+        Returns the original args.
+
+        Compatibility warning: PEP-249 specifies that any modified
+        parameters must be returned. This is currently impossible
+        as they are only available by storing them in a server
+        variable and then retrieved by a query. Since stored
+        procedures return zero or more result sets, there is no
+        reliable way to get at OUT or INOUT parameters via callproc.
+        The server variables are named @_procname_n, where procname
+        is the parameter above and n is the position of the parameter
+        (from zero). Once all result sets generated by the procedure
+        have been fetched, you can issue a SELECT @_procname_0, ...
+        query using .execute() to get any OUT or INOUT values.
+
+        Compatibility warning: The act of calling a stored procedure
+        itself creates an empty result set. This appears after any
+        result sets generated by the procedure. This is non-standard
+        behavior with respect to the DB-API. Be sure to use nextset()
+        to advance through all result sets; otherwise you may get
+        disconnected.
+        """
+        conn = self._get_db()
+        if args:
+            fmt = f"@_{procname}_%d=%s"
+            self._query(
+                "SET %s"
+                % ",".join(
+                    fmt % (index, conn.escape(arg)) for index, arg in enumerate(args)
+                )
+            )
+            self.nextset()
+
+        q = "CALL %s(%s)" % (
+            procname,
+            ",".join(["@_%s_%d" % (procname, i) for i in range(len(args))]),
+        )
+        self._query(q)
+        self._executed = q
+        return args
+
+    def fetchone(self):
+        """Fetch the next row"""
+        self._check_executed()
+        if self._rows is None or self.rownumber >= len(self._rows):
+            return None
+        result = self._rows[self.rownumber]
+        self.rownumber += 1
+        return result
+
+    def fetchmany(self, size=None):
+        """Fetch several rows"""
+        self._check_executed()
+        if self._rows is None:
+            return ()
+        end = self.rownumber + (size or self.arraysize)
+        result = self._rows[self.rownumber : end]
+        self.rownumber = min(end, len(self._rows))
+        return result
+
+    def fetchall(self):
+        """Fetch all the rows"""
+        self._check_executed()
+        if self._rows is None:
+            return ()
+        if self.rownumber:
+            result = self._rows[self.rownumber :]
+        else:
+            result = self._rows
+        self.rownumber = len(self._rows)
+        return result
+
+    def scroll(self, value, mode="relative"):
+        self._check_executed()
+        if mode == "relative":
+            r = self.rownumber + value
+        elif mode == "absolute":
+            r = value
+        else:
+            raise err.ProgrammingError("unknown scroll mode %s" % mode)
+
+        if not (0 <= r < len(self._rows)):
+            raise IndexError("out of range")
+        self.rownumber = r
+
+    def _query(self, q):
+        conn = self._get_db()
+        self._last_executed = q
+        self._clear_result()
+        conn.query(q)
+        self._do_get_result()
+        return self.rowcount
+
+    def _clear_result(self):
+        self.rownumber = 0
+        self._result = None
+
+        self.rowcount = 0
+        self.description = None
+        self.lastrowid = None
+        self._rows = None
+
+    def _do_get_result(self):
+        conn = self._get_db()
+
+        self._result = result = conn._result
+
+        self.rowcount = result.affected_rows
+        self.description = result.description
+        self.lastrowid = result.insert_id
+        self._rows = result.rows
+
+    def __iter__(self):
+        return iter(self.fetchone, None)
+
+    Warning = err.Warning
+    Error = err.Error
+    InterfaceError = err.InterfaceError
+    DatabaseError = err.DatabaseError
+    DataError = err.DataError
+    OperationalError = err.OperationalError
+    IntegrityError = err.IntegrityError
+    InternalError = err.InternalError
+    ProgrammingError = err.ProgrammingError
+    NotSupportedError = err.NotSupportedError
+
+
+class DictCursorMixin:
+    # You can override this to use OrderedDict or other dict-like types.
+    dict_type = dict
+
+    def _do_get_result(self):
+        super(DictCursorMixin, self)._do_get_result()
+        fields = []
+        if self.description:
+            for f in self._result.fields:
+                name = f.name
+                if name in fields:
+                    name = f.table_name + "." + name
+                fields.append(name)
+            self._fields = fields
+
+        if fields and self._rows:
+            self._rows = [self._conv_row(r) for r in self._rows]
+
+    def _conv_row(self, row):
+        if row is None:
+            return None
+        return self.dict_type(zip(self._fields, row))
+
+
+class DictCursor(DictCursorMixin, Cursor):
+    """A cursor which returns results as a dictionary"""
+
+
+class SSCursor(Cursor):
+    """
+    Unbuffered Cursor, mainly useful for queries that return a lot of data,
+    or for connections to remote servers over a slow network.
+
+    Instead of copying every row of data into a buffer, this will fetch
+    rows as needed. The upside of this is the client uses much less memory,
+    and rows are returned much faster when traveling over a slow network
+    or if the result set is very big.
+
+    There are limitations, though. The MySQL protocol doesn't support
+    returning the total number of rows, so the only way to tell how many rows
+    there are is to iterate over every row returned. Also, it currently isn't
+    possible to scroll backwards, as only the current row is held in memory.
+    """
+
+    def _conv_row(self, row):
+        return row
+
+    def close(self):
+        conn = self.connection
+        if conn is None:
+            return
+
+        if self._result is not None and self._result is conn._result:
+            self._result._finish_unbuffered_query()
+
+        try:
+            while self.nextset():
+                pass
+        finally:
+            self.connection = None
+
+    __del__ = close
+
+    def _query(self, q):
+        conn = self._get_db()
+        self._last_executed = q
+        self._clear_result()
+        conn.query(q, unbuffered=True)
+        self._do_get_result()
+        return self.rowcount
+
+    def nextset(self):
+        return self._nextset(unbuffered=True)
+
+    def read_next(self):
+        """Read next row"""
+        return self._conv_row(self._result._read_rowdata_packet_unbuffered())
+
+    def fetchone(self):
+        """Fetch next row"""
+        self._check_executed()
+        row = self.read_next()
+        if row is None:
+            return None
+        self.rownumber += 1
+        return row
+
+    def fetchall(self):
+        """
+        Fetch all, as per MySQLdb. Pretty useless for large queries, as
+        it is buffered. See fetchall_unbuffered(), if you want an unbuffered
+        generator version of this method.
+        """
+        return list(self.fetchall_unbuffered())
+
+    def fetchall_unbuffered(self):
+        """
+        Fetch all, implemented as a generator, which isn't to standard,
+        however, it doesn't make sense to return everything in a list, as that
+        would use ridiculous memory for large result sets.
+        """
+        return iter(self.fetchone, None)
+
+    def __iter__(self):
+        return self.fetchall_unbuffered()
+
+    def fetchmany(self, size=None):
+        """Fetch many"""
+        self._check_executed()
+        if size is None:
+            size = self.arraysize
+
+        rows = []
+        for i in range(size):
+            row = self.read_next()
+            if row is None:
+                break
+            rows.append(row)
+            self.rownumber += 1
+        return rows
+
+    def scroll(self, value, mode="relative"):
+        self._check_executed()
+
+        if mode == "relative":
+            if value < 0:
+                raise err.NotSupportedError(
+                    "Backwards scrolling not supported by this cursor"
+                )
+
+            for _ in range(value):
+                self.read_next()
+            self.rownumber += value
+        elif mode == "absolute":
+            if value < self.rownumber:
+                raise err.NotSupportedError(
+                    "Backwards scrolling not supported by this cursor"
+                )
+
+            end = value - self.rownumber
+            for _ in range(end):
+                self.read_next()
+            self.rownumber = value
+        else:
+            raise err.ProgrammingError("unknown scroll mode %s" % mode)
+
+
+class SSDictCursor(DictCursorMixin, SSCursor):
+    """An unbuffered cursor, which returns results as a dictionary"""