Повышение производительности PL/SQL с использованием конвейерных табличных функций

Использование конвейерных табличных функций PL/SQLКонвейерные функции сочетают элегантность и простоту PL/SQL с производительностью SQL. Разработка и поддержка сложных преобразований данных не требует значительных усилий в PL/SQL, но для достижения высокопроизводительной обработки данных приходится часто прибегать к решениям на базе SQL. Конвейерные функции заполняют пробел между двумя технологиями, но у них также имеются свои уникальные особенности, которые делают их превосходным инструментом оптимизации.


Оглавление статьи[Показать]


В этой статье будут приведены примеры типичных требований к обработке данных и их оптимизации с применением конвейерных функций. Мы рассмотрим следующие темы:

Основной синтаксис конвейерных табличных функций был рассмотрен в этой статье. Напомню, что конвейерные функции вызываются в секции FROM команд SQL и используются для получения данных, как если бы они были реляционными таблицами или другими источниками строк. В отличие от стандартных табличных функций, которые должны полностью обработать свои данные, прежде чем передавать коллекцию данных (возможно, достаточно большую) на сторону контекста вызова, конвейерные табличные функции поставляют свои результаты клиенту почти сразу же после их подготовки. Другими словами, конвейерные функции не материализуют весь результирующий набор, и эта оптимизация существенно сокращает затраты памяти PGA. Другая отличительная особенность конвейерных функций — возможность их вызова в контексте параллельного запроса. Я много раз использовал эти средства для повышения эффективности своих программ; далее я покажу, когда и как вы можете воспользоваться ими в своих программах.

 

Замена вставки на базе строк загрузкой на базе конвейерных функций

Для демонстрации возможностей конвейерных функций представим типичный сценарий, который нам нужно довести до современного уровня. На базе примера stockpivot я запрограммировал простую построчную загрузку с выборкой исходных данных stockpivot и преобразованием каждой записи в две строки для вставки. Программа содержится в пакете и выглядит так:


PROCEDURE load_stocks_legacy IS

   CURSOR c_source_data IS
      SELECT ticker, open_price, close_price, trade_date 
      FROM stocktable;

   r_source_data stockpivot_pkg.stocktable_rt; 
   r_target_data stockpivot_pkg.tickertable_rt;

BEGIN
   OPEN c_source_data;
   LOOP
      FETCH c_source_data INTO r_source_data;
      EXIT WHEN c_source_data%NOTFOUND;

      /* Цена на момент открытия... */
      r_target_data.ticker      := r_source_data.ticker;
      r_target_data.price_type  := 'O';
      r_target_data.price       := r_source_data.open_price;
      r_target_data.price_date  := r_source_data.trade_date;
      INSERT INTO tickertable VALUES r_target_data;

      /* Цена на момент закрытия... */
      r_target_data.price_type := 'C';
      r_target_data.price      := r_source_data.close_price;
      INSERT INTO tickertable VALUES r_target_data;

   END LOOP;
   CLOSE c_source_data;
END load_stocks_legacy;

Я регулярно встречаю подобный код, и еще со времен Oracle8i Database обычно использую BULK COLLECT и FORALL в качестве основного средства оптимизации (когда логика слишком сложна для решения на базе SQL). Альтернативное решение (описание которого я впервые увидел у Тома Кайта) основано на использовании вставки на базе наборов из конвейерной функции. Другими словами, конвейерная функция используется для всех преобразований данных и подготовительной логики, но загрузка данных в целевую таблицу реализуется отдельно через вставку из набора. Освоив этот мощный прием, я успешно использую его для оптимизации своих проектов.

 

Реализация конвейерной функции

Как было показано в этом блоге, создание конвейерной функции следует начать с анализа данных, которые она должна возвращать. Для этого необходимо создать объектный тип, определяющий одну строку возвращаемых данных конвейерной функции:


CREATE TYPE stockpivot_ot AS OBJECT 
( ticker	VARCHAR2(10)
, price_type VARCHAR2D)
, price	NUMBER
, price_date DATE 
);

Также необходимо создать коллекцию таких объектов, определяющих возвращаемый тип функции:


CREATE TYPE stockpivot_ntt AS TABLE OF stockpivot_ot;

Преобразовать унаследованный код в конвейерную функцию несложно. Работа начинается с определения спецификации функции в заголовке. Также необходимо включить процедуру загрузки, которая будет описана позднее:


CREATE PACKAGE stockpivot_pkg AS

   TYPE stocktable_rct IS REF CURSOR
      RETURN stocktable%ROWTYPE;

   <...>

   FUNCTION pipe_stocks(
            p_source_data IN stockpivot_pkg.stocktable_rct 
            ) RETURN stockpivot_ntt PIPELINED;

   PROCEDURE load_stocks;

END stockpivot_pkg;

Моя конвейерная функция получает сильную ссылку REF CURSOR (впрочем, в данной ситуации также можно использовать слабую ссылку). Параметр-курсор не является обязательным — с таким же успехом его можно было объявить в самой функции (как было сделано в старой процедуре). Впрочем, параметр-курсор все равно понадобится в следующих итерациях конвейерной функции, поэтому я добавил его.

Реализация функции выглядит так:


1   FUNCTION pipe_stocks(
2            p_source_data IN stockpivot_pkg.stocktable_rct
3            ) RETURN stockpivot_ntt PIPELINED IS
4
5      r_target_data stockpivot_ot := stockpivot_ot(NULL, NULL, NULL, NULL);
6      r_source_data stockpivot_pkg.stocktable_rt;
7
8   BEGIN
9      LOOP
10        FETCH p_source_data INTO r_source_data;
11        EXIT WHEN p_source_data%NOTFOUND;
12
13        /* Первая строка... */
14        r_target_data.ticker := r_source_data.ticker;
15        r_target_data.price_type := 'O';
16        r_target_data.price := r_source_data.open_price;
17        r_target_data.price_date := r_source_data.trade_date;
18        PIPE ROW (r_target_data);
19
20        /* Вторая строка... */
21        r_target_data.price_type := 'C';
22        r_target_data.price := r_source_data.close_price;
23        PIPE ROW (r_target_data);
24
25     END LOOP;
26     CLOSE p_source_data;
27     RETURN;
28  END pipe_stocks;

Кроме общего синтаксиса конвейерных функций, основная часть кода конвейерной функции легко узнается по старой версии. Основные различия перечислены в следующей таблице.

Основные различия

 

Загрузка данных из конвейерной функции

Как видите, при небольшом количестве изменений в исходном коде я создал конвейерную функцию, которая подготавливает и передает все данные для загрузки в tickertable. Чтобы завершить преобразование старого кода, остается написать дополнительную процедуру для вставки данных в таблицу:


   PROCEDURE load_stocks IS
   BEGIN

      INSERT INTO tickertable (ticker, price_type, price, price_date) 
      SELECT ticker, price_type, price, price_date 
      FROM TABLE(
              stockpivot_pkg.pipe_stocks(
                 CURSOR(SELECT * FROM stocktable)));

   END load_stocks;

На этом преобразование построчного кода в решение с конвейерной функцией можно считать законченным. Как оно работает в сравнении с оригиналом? В процессе тестирования я создал stocktable как внешнюю таблицу в виде файла из 500 000 записей. Старая версия с построчным преобразованием завершалась за 57 секунд (со вставкой 1 миллиона записей в tickertable), а вставка с конвейерной функцией выполнялась всего за 16 секунд.

Если учесть, что это была самая первая и простая реализация конвейерной функции, улучшение производительности выглядит вполне достойно. Тем не менее оно все же уступает производительности простого решения с BULK COLLECT и FORALL (которое в моих тестах отрабатывало всего за 5 секунд), поэтому в конвейерную функцию загрузки данных необходимо внести кое-какие изменения.

Но перед этим обратите внимание на то, что я сохранил построчную выборку из основного курсора и ничего не сделал для сокращения «дорогостоящего» переключения контекста (для чего потребуется выборка BULK COLLECT). Почему же новая версия работает быстрее старой?

Прежде всего из-за того, что операции DML с наборами (такие, как INSERT...SELECT в моей реализации) почти всегда выполняются намного быстрее, чем построчные процедурные решения. В этом конкретном случае выигрыш был обусловлен внутренней оптимизацией вставок с наборами. А именно при вставке INSERT...SELECT база банных сохраняет намного меньше данных отмены, чем для обычной вставки (INSERT.VALUES). Другими словами, при вставке 100 строк в одной команде будет сгенерировано меньше информации отмены, чем при вставке 100 строк по одной строке.

Для моей исходной таблицы с 1 миллионом строк объем данных отмены составил свыше 270 Мбайт. При конвейерной загрузке он сократился до 37 Мбайт — согласитесь, весьма заметная экономия!

В своем примере я для наглядности отказался от сложных преобразований данных. Всегда следует предполагать, что правила обработки данных достаточно сложны, чтобы для их реализации потребовалось решение с конвейерной функцией PL/ SQL. В противном случае для преобразования данных большого объема было бы проще воспользоваться решением на базе SQL с аналитическими функциями, подзапросами и выражениями CASE.

 

Оптимизация конвейерных функций посредством выборки из массива

Старый код был усовершенствован реализацией на базе конвейерной функции, но работа еще не закончена. Есть и другие возможности оптимизации, и обработка должна по крайней мере не уступать по скорости решению с BULK COLLECT и FORALL. Обратите внимание: в моем решении используется построчная выборка из основного курсора. Следовательно, первая возможность для оптимизации — использование массовой выборки BULK COLLECT.

Для начала я добавлю в свою спецификацию пакета размер массива по умолчанию. Оптимальный размер массива зависит от конкретных требований к обработки данных, но я всегда начинаю тестирование со 100, а дальше действую по ситуации. Также в спецификацию пакета будет добавлен тип ассоциативного массива (впрочем, его с таким же успехом можно было объявить в теле) для массовой выборки из курсора. Наконец, я добавлю в сигнатуру конвейерной функции второй параметр для управления размером выборки (конечно, это не обязательно — просто полезная привычка). Теперь спецификация выглядит так:


CREATE PACKAGE stockpivot_pkg AS
   <...>
   c_default_limit CONSTANT PLS_INTEGER := 100;

   TYPE stocktable_aat IS TABLE OF stocktable%ROWTYPE
      INDEX BY PLS_INTEGER;

   FUNCTION pipe_stocks_array(
            p_source_data IN stockpivot_pkg.stocktable_rct,
            p_limit_size IN PLS_INTEGER DEFAULT stockpivot_pkg.c_default_limit
            ) RETURN stockpivot_ntt PIPELINED;
   <...>
END stockpivot_pkg;

Сама функция очень похожа на исходную версию:


   FUNCTION pipe_stocks_array(
            p_source_data IN stockpivot_pkg.stocktable_rct,
            p_limit_size IN PLS_INTEGER DEFAULT stockpivot_pkg.c_default_limit
            ) RETURN stockpivot_ntt PIPELINED IS   

      r_target_data stockpivot_ot := stockpivot_ot(NULL, NULL, NULL, NULL);
      aa_source_data stockpivot_pkg.stocktable_aat;

   BEGIN
      LOOP
         FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT p_limit_size;
         EXIT WHEN aa_source_data.COUNT = 0;

         /* Обработка пакета из (p_limit_size) записей... */
         FOR i IN 1 .. aa_source_data.COUNT LOOP
            /* Первая строка ... */
            r_target_data.ticker := aa_source_data(i).ticker;
            r_target_data.price_type := 'O';
            r_target_data.price := aa_source_data(i).open_price;
            r_target_data.price_date := aa_source_data(i).trade_date;
            PIPE ROW (r_target_data);

            /* Вторая строка... */
            r_target_data.price_type := 'C';
            r_target_data.price := aa_source_data(i).close_price;
            PIPE ROW (r_target_data);
         END LOOP;
      END LOOP;
      CLOSE p_source_data;
      RETURN;
   END pipe_stocks_array;

Единственное отличие от исходной версии — использование BULK COLLECT...LIMIT. Процедура загрузки осталась прежней, только теперь она ссылается на версию конвейерной функции с массивом. Время загрузки сокращается до 6 секунд исключительно за счет сокращения переключения контекста из PL/SQL. Теперь решение с конвейерной функцией по производительности сравнимо с решением на базе BULK COLLECT и FORALL.

 

Использование параллельных конвейерных функций для достижения оптимальной производительности

Переход от вставки к конвейерной функции уже обеспечил неплохой выигрыш по быстродействию. Тем не менее остается еще одна возможность, которая повысит производительность по сравнению с любым другим решением: использование параллельной поддержки конвейерных функций. В следующей итерации я включаю параллельную поддержку для функции stockpivot, для чего в сигнатуру функции добавляется еще одна секция:


CREATE PACKAGE stockpivot_pkg AS 
   <...>
   FUNCTION pipe_stocks_parallel(
            p_source_data IN stockpivot_pkg.stocktable_rct
            p_limit_size IN PLS_INTEGER DEFAULT stockpivot_pkg.c_default_limit 
            ) RETURN stockpivot_ntt 
              PIPELINED
              PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
   <...>
END stockpivot_pkg;

Используя схему группировки ANY, я приказываю базе данных Oracle случайным образом распределять исходные данные между параллельными процессами. Это связано с тем, что порядок получения и обработки исходных данных функцией не влияет на результат (то есть межстрочные зависимости отсутствуют). Конечно, так бывает не всегда.

 

включение параллельного выполнения конвейерных функций

Помимо синтаксиса включения параллельного выполнения в спецификации и теле реализация функции не отличается от предыдущего примера (полный код пакета содержится в файле stockpivot_setup.sql на сайте). Однако я должен принять меры к тому, чтобы загрузка из tickertable выполнялась параллельно. Сначала нужно включить параллельные операции DML на уровне сеанса. Когда это будет сделано, параллельный запрос вызывается одним из следующих способов:

Параллельные запросы/DML относятся к функциональности Oracle Database Enterprise Edition. Если вы используете издание Standard Edition или Standard Edition One, вы не сможете использовать параллельный режим конвейерных функций.

В своей процедуре я включил параллельный режим DML на уровне сеанса и использовал рекомендации для задания степени параллелизма (DOP) 4:


PROCEDURE load_stocks_parallel IS 
BEGIN
   EXECUTE IMMEDIATE 'ALTER SESSION ENABLE PARALLEL DML';
   INSERT /*+ PARALLEL(t, 4) */ INTO tickertable t 
          (ticker, price_type, price, price_date)
   SELECT ticker, price_type, price, price_date 
   FROM TABLE(
           stockpivot_pkg.pipe_stocks_parallel(
              CURSOR(SELECT /*+ PARALLEL(s, 4) */ * FROM stocktable s)));
END load_stocks_parallel;

Время загрузки сокращается до 3 секунд — существенное улучшение по сравнению с исходной версией, а также всеми остальными версиями конвейерной функции. Конечно, при относительно быстрых операциях затраты на запуск параллельных процессов отразятся на общем времени, но несмотря на это мне удалось добиться почти 50% ускорения. Тот факт, что параллельные вставки используют метод прямой вставки (вместо традиционной) также означает, что генерируемые данные отмены снова уменьшаются — теперь они занимают всего 25 Кбайт!

В коммерческих системах приходится оптимизировать процессы, которые выполняются по часу и более, поэтому выигрыш от параллельных конвейерных операций будет значительным как в относительном, так и в абсолютном выражении.

При использовании параллельных конвейерных функций курсор-источник должен передаваться в параметре REF CURSOR. В последовательных конвейерных функциях курсор-источник может быть встроен в саму функцию (хотя я в своих примерах этого не сделал). Кроме того, для функций с группировкой по типу ANY ссылка REF CURSOR может быть как слабо, так и сильно типизованной, но с группировкой HASH или RANGE допускается только сильная типизация.

 

Конвейерные функции при оптимизации операций слияния

Как видите, последовательные или параллельные конвейерные функции могут серьезно рассматриваться как механизм оптимизации крупномасштабной загрузки данных. Тем не менее загрузка не всегда подразумевает вставку, как в примере stockpivot. Многие загрузки данных выполняются в пошаговом режиме и требуют периодического слияния новых и измененных данных. К счастью, принцип объединения преобразований PL/ SQL с наборами SQL также применим и к операциям слияния (и обновления).

 

Традиционное слияние в коде PL/SQL

Следующая процедура взята из примера employee_pkg. Требуется включить в базу данных большое количество записей employee, но в унаследованном коде используется старый метод PL/SQL — программа сначала пытается выполнить обновление и выполняет вставку только в том случае, если операция обновления не обнаружила ни одной подходящей записи в целевой таблице:


PROCEDURE upsert_employees IS 
   n PLS_INTEGER := 0;
BEGIN
   FOR r_emp IN (SELECT * FROM employees_staging) LOOP 
      UPDATE employees 
      SET <...>
      WHERE employee_id = r_emp.employee_id;
      IF SQL%ROWCOUNT = 0 THEN
         INSERT INTO employees (<...>)
         VALUES (<...>);
      END IF;
   END LOOP;
END upsert_employees;

Часть кода была удалена для краткости, но методика «обновления со вставкой» видна достаточно четко. Обратите внимание на использование неявного цикла FOR со счетчиком, который выиграет от оптимизации выборки из массива, введенной в PL/SQL в Oracle Database 10g.

Для тестирования этой процедуры я создал громадную таблицу из 500 000 записей, после чего вставил 250 000 из них в таблицу работников, чтобы обеспечить равномерное разбиение между обновлением и вставкой. Это «примитивное слияние» на базе PL/ SQL завершилось за 46 секунд.

 

Использование конвейерных функций для слияния с наборами

Преобразование этого примера в операцию SQL MERGE с конвейерной функцией также не вызывает особых проблем. Сначала я создаю типы вспомогательных объектов и вложенных таблиц (за подробностями обращайтесь к файлу employees_merge_setup. sql), затем объявляю функцию в заголовке пакета:


CREATE PACKAGE employee_pkg AS

   c_default_limit CONSTANT PLS_INTEGER := 100;

   TYPE employee_rct IS REF CURSOR RETURN employees_staging%ROWTYPE;
   TYPE employee_aat IS TABLE OF employees_staging%ROWTYPE 
      INDEX BY PLS_INTEGER;
   <...>

   FUNCTION pipe_employees(
            p_source_data IN employee_pkg.employee_rct
            p_limit_size IN PLS_INTEGER DEFAULT employee_pkg.c_default_limit 
            ) RETURN employee_ntt 
              PIPELINED
              PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
END employee_pkg;

Я включаю для конвейерной функции поддержку параллельного режима и использую схему группировки ANY, как и прежде. Реализация функции выглядит так:


   FUNCTION pipe_employees(
            p_source_data IN employee_pkg.employee_rct,
            p_limit_size IN PLS_INTEGER DEFAULT employee_pkg.c_default_limit 
            ) RETURN employee_ntt 
              PIPELINED
              PARALLEL_ENABLE (PARTITION p_source_data BY ANY) IS 
      aa_source_data employee_pkg.employee_aat;
   BEGIN
      LOOP
         FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT p_limit_size;
         EXIT WHEN aa_source_data.COUNT = 0;
         FOR i IN 1 .. aa_source_data.COUNT LOOP 
            PIPE ROW (
               employee_ot( aa_source_data(i).employee_id,
                            <snip>
                            SYSDATE ));
         END LOOP;
      END LOOP;
      CLOSE p_source_data;
      RETURN;
END pipe_employees;

Функция просто извлекает исходные данные в массив и передает их в правильном формате. Теперь ее можно использовать в команде MERGE, упакованной в процедуре пакета employee_pkg:


PROCEDURE merge_employees IS
BEGIN
   EXECUTE IMMEDIATE 'ALTER SESSION ENABLE PARALLEL DML';
   MERGE /*+ PARALLEL(e, 4) */
      INTO employees e 
      USING TABLE(
               employee_pkg.pipe_employees(
                  CURSOR(SELECT /*+ PARALLEL(es, 4) */ *
                         FROM employees_staging es))) s 
      ON (e.employee_id = s.employee_id)
   WHEN MATCHED THEN 
      UPDATE
      SET <snip>
   WHEN NOT MATCHED THEN 
      INSERT ( <snip> )
      VALUES ( <snip> );
END merge_employees;

Команда SQL MERGE с параллельной конвейерной функцией сокращает время загрузки более чем на 50% до 21 секунды. Безусловно, использование параллельных конвейерных функций как источника строк для операций SQL может стать полезным инструментом оптимизации крупномасштабной загрузки данных.

 

Параллельные конвейерные функции при асинхронной выгрузке данных

До настоящего момента я продемонстрировал два типа загрузки данных, для которых переход на параллельную конвейерную функцию принес очевидную пользу. Параллельным режимом конвейерных функций также можно воспользоваться для выгрузки данных (я еще не видел ни одной крупной корпоративной системы хранения данных, которая бы не извлекала данные для передачи в другие системы).

 

Типичная программа извлечения данных

Представьте следующую ситуацию: я ежедневно извлекаю все данные своей биржевой торговли (хранимые в таблице tickertable) для передачи в контрольное подразделение, которое рассчитывает получить неструктурированный файл с разделителями. Для этого я пишу простую программу выгрузки данных из курсора:


PROCEDURE legacy_unload(
          p_source IN SYS_REFCURSOR, 
          p_filename IN VARCHAR2, 
          p_directory IN VARCHAR2,
          p_limit_size IN PLS_INTEGER DEFAULT unload_pkg.c_default_limit
          ) IS
   TYPE row_aat IS TABLE OF VARCHAR2(32767)
      INDEX BY PLS_INTEGER; 
   aa_rows row_aat;
   v_name VARCHAR2(128) := p_filename || '.txt'; 
   v_file UTL_FILE.FILE_TYPE;
BEGIN
   v_file := UTL_FILE.FOPEN( p_directory, v_name, 'w', c_maxline );
   LOOP
      FETCH p_source BULK COLLECT INTO aa_rows LIMIT p_limit_size;
      EXIT WHEN aa_rows.COUNT = 0;
      FOR i IN 1 .. aa_rows.COUNT LOOP
         UTL_FILE.PUT_LINE(v_file, aa_rows(i));
      END LOOP;
   END LOOP;
   CLOSE p_source;
   UTL_FILE.FCLOSE(v_file);
END legacy_unload;

Я просто перебираю курсор, переданный в параметре, используя выборку в массив с размером 100, и записываю каждую группу записей в приемный файл с использованием UTL_FILE. Курсор-источник имеет всего один столбец — при подготовке курсора исходные столбцы объединяются конкатенацией с разделителями.

В ходе тестирования выгрузка 1 миллиона строк tickertable в неструктурированный файл заняла всего 24 секунды (я заранее несколько раз просканировал tickertable, чтобы снизить влияние физического ввода/вывода). Однако средняя длина строки tickertable составляла всего 25 байт, поэтому выгрузка происходила очень быстро. Коммерческие системы записывают существенно больший объем данных (как по длине строки, так и по числу строк), и выгрузка может занять десятки минут.

 

Выгрузка данных с использованием параллельной конвейерной функции

Если этот сценарий вам знаком по вашим системам, рассмотрите возможность оптимизации за счет использования параллельных конвейерных функций. Если проанализировать приведенный пример старого кода, все манипуляции с данными можно разместить в конвейерной функции (так как операции DML отсутствуют). Так почему бы не взять логику выборки из курсора и операции с UTL_FILE и не поместить их в параллельную конвейерную функцию? Это позволит существенно ускорить выгрузку данных в несколько файлов за счет применения параллельных запросов Oracle.

Конечно, конвейерные функции обычно возвращают данные в специфическом порядке, но в данном случае строки записываются в файл, и возвращать их клиенту не нужно. Вместо этого я возвращаю по одной строке на параллельный процесс с простейшими метаданными для описания сеансовой информации и количества извлеченных строк. При этом используются следующие вспомогательные типы:


CREATE TYPE unload_ot AS OBJECT ( 
file_name VARCHAR2(128)
, no_records NUMBER 
, session_id NUMBER );
CREATE TYPE unload_ntt AS TABLE OF unload_ot;

Моя реализация базируется на унаследованной обработке с добавлением некоторой дополнительной настройки, необходимой для возвращения метаданных:


1   FUNCTION parallel_unload(
2            p_source     IN   SYS_REFCURSOR,
3            p_filename   IN   VARCHAR2,
4            p_directory  IN   VARCHAR2,
5            p_limit_size IN   PLS_INTEGER   DEFAULT unload_pkg.c_default_limit
6            )
7      RETURN unload_ntt
8      PIPELINED PARALLEL_ENABLE (PARTITION p_source BY ANY) AS
9      aa_rows	row_aat;
10     v_sid	NUMBER := SYS_CONTEXT('USERENV','SID');
11     v_name	VARCHAR2(128) := p_filename   ||  '_'  ||  v_sid || '.txt';
12     v_file	UTL_FILE.FILE_TYPE;
13     v_lines	PLS_INTEGER;
14   BEGIN
15     v_file := UTL_FILE.FOPEN(p_directory, v_name, 'w', c_maxline);
16     LOOP
17        FETCH p_source BULK COLLECT INTO aa_rows LIMIT p_limit_size;
18        EXIT WHEN aa_rows.COUNT = 0;
19        FOR i IN 1 .. aa rows.COUNT LOOP
20           UTL_FILE.PUT_LINE(v_file, aa_rows(i));
21        END LOOP;
22     END LOOP;
23     v_lines := p_source%ROWCOUNT;
24     CLOSE p_source;
25     UTL_FILE.FCLOSE(v_file);
26     PIPE ROW (unload_ot(v_name, v_lines, v_sid));
27     RETURN;
28   END parallel_unload;

 

Ключевые аспекты кода этой функции представлены в следующей таблице.

 ANY

 

Я с минимальными усилиями включаю параллельный режим выгрузки данных, используя конвейерную функцию как асинхронный механизм разбиения. Теперь посмотрим, как вызвать новую версию:


SELECT *
FROM TABLE(
        unload_pkg.parallel_unload(
           p_source => CURSOR(SELECT /*+ PARALLEL(t, 4) */
                                     ticker     || ',' ||
                                     price_type || ',' ||
                                     price      || ',' ||
                                     TO_CHAR(price_date,'YYYYMMDDHH24MISS')
                              FROM tickertable t),
           p_filename => 'tickertable',
           p_directory => 'DIR' ));
Тестовый вывод из SQL*Plus:

FILE_NAME                      NO_RECORDS SESSION_ID
------------------------------ ---------- ----------
tickertable_144.txt                260788        144
tickertable_142.txt                252342        142
tickertable_127.txt                233765        127
tickertable_112.txt                253105        112

4 rows selected.

Elapsed: 00:00:12.21

В моей тестовой системе с четырьмя параллельными процессами время обработки сократилось примерно вдвое. Помните, что для небольших интервалов, как в этом примере, затраты на инициализацию параллельного запуска могут повлиять на время обработки. Если извлечение данных занимает несколько минут и более, потенциальная экономия может быть существенно выше.

Этот способ оптимизации можно и дальше улучшить, «оптимизируя» вызовы UTL_FILE через механизм буферизации. Пример реализации приведен в функции PARALLEL_UNLOAD_BUFFERED в файле parallel_unload_setup.sql. Вместо того чтобы выводить каждую строку в файл немедленно, я присоединяю строки к большому буферу VARCHAR2 (также можно было использовать коллекцию) и периодически сбрасывать его содержимое в файл. Сокращение количества вызовов UTL_FILE в моей системе уменьшило время выгрузки почти наполовину — всего до 7 секунд.

  

Влияние группировки и режима потоковой передачи в параллельных конвейерных функциях

Все примеры параллельных конвейерных функций, приводившиеся до настоящего момента, использовали схему группировки ANY, потому что между строками исходных данных нет зависимостей. Некоторые параметры группировки и потоковой передачи управляют способом распределения входных данных и их упорядочения в параллельных процессах. Напомню:

PARTITION p_cursor BY ANY
PARTITION p_cursor BY RANGE(столбцы_курсора)
PARTITION p_cursor BY HASH(столбцы_курсора)
CLUSTER p_cursor BY (столбцы_курсора)
ORDER p_cursor BY (столбцы_курсора)

Выбор режима зависит от требований к обработке данных. Например, если вы должны позаботиться о том, чтобы все заказы конкретного клиента обрабатывались вместе, но в хронологическом порядке, используйте группировку HASH с режимом ORDER. Чтобы все данные обрабатывались в порядке наступления событий, используйте комбинацию RANGE/ORDER.

 

Относительная производительность разных комбинаций

Характеристики производительности этих режимов зависят от предполагаемого порядка сортировки. В следующей таблице приведены данные о времени, необходимом для передачи 1 миллиона строк tickertable через параллельную конвейерную функцию (с DOP 4) с использованием разных параметров группировки и потоковой передачи.

Режим группировки Режим потоковой передачи Затраченное время
ANY - 5.37
ANY ORDER 8.06
ANY CLUSTER 9.58
HASH - 7.48
HASH ORDER 7.84
HASH CLUSTER 8.10
RANGE - 9.84
RANGE ORDER 10.59.
RANGE CLUSTER 10.90


Как и следовало ожидать, группировки ANY и HASH показывают сравнимые результаты (хотя неупорядоченный режим ANY явно оказывается самым быстрым), но механизм RANGE значительно медленнее. Это вполне логично, потому что исходные данные должны быть упорядочены перед тем, как база данных сможет распределить их. В самих параллельных процессах упорядочение выполняется быстрее, чем группировка для всех режимов (возможно, кого-то этот результат удивит, потому что группировке не нужно упорядочивать весь набор данных), Впрочем, в вашем случае результаты могут быть иными.

 

Группировка с асимметричными данными

Еще один фактор, который следует принять во внимание при группировке — распределение нагрузки между параллельными процессами. Режимы ANY и HASH приводят к относительно равномерному распределению данных между параллельными процессами независимо от количества строк в источнике. Тем не менее в зависимости от характеристик данных режим RANGE может привести к неравномерному распределению, особенно если значения столбцов разбиения асимметричны. Если один параллельный процесс получает слишком большую часть данных, это может свести к нулю все преимущества параллельных конвейерных функций.

Во всех моих вызовах конвейерных функций передается параметр REF CURSOR, получаемый при помощи функции CURSOR(SELECT...). Альтернативное решение — подготовка переменной REF CURSOR с использованием конструкции OPEN ref cursor FOR... и передача этой переменной вместо вызова CURSOR(SELECT...). Если вы выберете этот вариант, остерегайтесь ошибки 5349930! При использовании параллельных конвейерных функций эта ошибка может привести к неожиданному аварийному завершению параллельного процесса с исключением ORA-01008.

 

Конвейерные функции и затратный оптимизатор

Примеры этого блога демонстрируют применение конвейерных функций как простых источников, генерирующих данные для сценариев загрузки и выгрузки. Тем не менее в какой-то момент вам может потребоваться объединить конвейерную функцию с другим источником строк (например, таблица, представление или промежуточный вывод других объединений в плане выполнения SQL). Статистика источника строк (мощность, распределение данных, NULL и т. д.) играет важную роль для построения эффективного плана выполнения, но в случае конвейерных функций (а на самом деле любых табличных функций) у затратного оптимизатора (CBO, Cost-Based Optimizer) не слишком много информации для работы.

 

Эвристика мощности для конвейерных табличных функций

В Oracle Database 11g Release 1 и более ранних версиях затратный оптимизатор применяет эвристику мощности (cardinality) к конвейерным и табличным функциям в командах SQL, что иногда приводит к неэффективным планам выполнения. Мощность по умолчанию зависит от значения параметра инициализации DB_BL0CK_SIZE, но для базы данных со стандартным размером блока 8 Кбайт Oracle использует эвристику 8168 строк. Это легко продемонстрировать при помощи конвейерной функции, которая передает подмножество столбцов из таблицы employees. Используя Autotrace в SQL* Plus для генерирования плана выполнения, я получаю следующий результат:


SQL> SELECT *
   2 FROM TABLE(pipe_employees) e;
----------------------------------------------------------
Plan hash value: 1802204150

--------------------------------------------------------------------
| Id | Operation                       | Name               | Rows |
--------------------------------------------------------------------
| 0 | SELECT STATEMENT                 |                    | 8168 |
| 1 | COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES     |      |
--------------------------------------------------------------------

Конвейерная функция возвращает 50 000 строк, поэтому при объединении функции с таблицей departments возникает риск получить неоптимальный план:


SQL>   SELECT	*
  2    FROM	departments             d
  3    ,	TABLE(pipe_employees)   e
  4    WHERE	d.department_id	= e.department_id;

Execution Plan
----------------------------------------------------------
Plan hash value: 4098497386

----------------------------------------------------------------------
| Id | Operation                          | Name               | Rows |
----------------------------------------------------------------------
| 0 | SELECT STATEMENT                    |                    | 8168 |
| 1 |  MERGE JOIN                         |                    | 8168 |
| 2 |   TABLE ACCESS BY INDEX ROWID       | DEPARTMENTS        |   27 |
| 3 |    INDEX FULL SCAN                  | DEPT_ID_PK         |   27 |
|*4 |   SORT JOIN                         |                    | 8168 |
| 5 |    COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES     |      |
----------------------------------------------------------------------

Как прогнозировалось выше, такой план выполнения наверняка будет неоптимальным; вряд ли объединение сортировкой со слиянием будет более эффективным, чем хеш- объединение в этом сценарии. Итак, как же повлиять на работу оптимизатора? В данном примере я могу использовать простые рекомендации (такие, как LEADING и USE_HASH для переопределения решения, принятого оптимизатором на основании затрат) и обеспечить хеш-объединение между таблицей и конвейерной функцией. Однако для более сложных команд SQL трудно предоставить все рекомендации, необходимые для «фиксации» плана выполнения. Часто бывает лучше предоставить CBO более полную статистику для принятия решений. Это можно сделать двумя способами:

Оба метода будут продемонстрированы для функции pipe_employees.

 

Использование динамической выборки конвейерных функций

Динамическая выборка — чрезвычайно полезная функция, которая позволяет оптимизатору сформировать небольшую статическую выборку из одного или нескольких объектов в фазе разбора. Можете использовать динамическую выборку, если вы не собрали статистику по всем таблицам запроса или же при использовании временных объектов (например, глобальных временных таблиц). Начиная с версии 11.1.0.7 база данных Oracle может использовать динамическую выборку для таблиц или конвейерных функций.

Чтобы увидеть, на что влияет эта возможность, я повторю предыдущий запрос с включением рекомендации DYNAMIC_SAMPLING для функции pipe_employees:


SQL> SELECT /*+ DYNAMIC_SAMPLING(e 5) */
  2         *
  3 FROM    departments           d
  4 ,       TABLE(pipe_employees) e
  5 WHERE   d.department_id = e.department_id;

Execution Plan
----------------------------------------------------------
Plan hash value: 815920909

---------------------------------------------------------------------
| Id | Operation                       | Name               | Rows  |
---------------------------------------------------------------------
| 0 | SELECT STATEMENT                 |                    | 50000 |
|*1 | HASH JOIN                        |                    | 50000 |
| 2 | TABLE ACCESS FULL                | DEPARTMENTS        |    27 |
| 3 | COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES     |       |
---------------------------------------------------------------------

На этот раз CBO правильно вычисляет 50 000 строк, которые возвращает моя функция, и генерирует более подходящий план. Обратите внимание: я использую термин «вычисляет», а не «оценивает». Дело в том, что в версии 11.1.0.7 и выше оптимизатор осуществляет стопроцентную выборку данных таблицы или конвейерной функции независимо от используемого уровня динамической выборки. Я использовал уровень 5, но с таким же успехом мог использовать любой уровень от 2 до 10 — результат остался бы прежним. Конечно, это означает, что динамическая выборка может быть сопряжена с высокими затратами вычислительных ресурсов или времени, если она применяется в запросах с большими объемами данных или долго выполняемыми конвейерными функциями.

 

Передача статистики мощности оптимизатору

Единственная информация, которую можно явно передать оптимизатору для конвейерной функции, — это мощность. Как это часто бывает в Oracle, это можно сделать несколькими способами:

 

Extensible Optimizer и мощность конвейерной функции

Расширение оптимизатора является частью Oracle-реализации Data Cartridge —набора интерфейсов, позволяющих расширять встроенную функциональность базы данных пользовательским кодом и алгоритмами (обычно хранящимися в объектных типах). Для конвейерных и табличных функций база данных предоставляет интерфейс, предназначенный специально для оценок мощности. В следующем простом примере для функции pipe_employees я связываю конвейерную функцию с особым объектным типом, который передает CBO информацию о мощности функции. Спецификация функции pipe_employees выглядит так:


FUNCTION pipe_employees(
         p_cardinality IN INTEGER DEFAULT 1 
         ) RETURN employee_ntt PIPELINED

Обратите внимание на параметр p_cardinality. В теле pipe_employees этот параметр не используется; в нем я буду сообщать CBO количество строк, которое, как ожидается, будет возвращать моя функция. А поскольку для Extensible Optimizer это должно делаться через интерфейсный тип, сначала я создам спецификацию типа объектного типа для интерфейса:


1  CREATE TYPE pipelined_stats_ot AS OBJECT (
2
3     dummy INTEGER,
4
5     STATIC FUNCTION ODCIGetInterfaces (
6                     p_interfaces OUT SYS.ODCIObjectList
7                     ) RETURN NUMBER,
8
9     STATIC FUNCTION ODCIStatsTableFunction (
10                    p_function IN SYS.ODCIFuncInfo,
11                    p_stats OUT SYS.ODCITabFuncStats,
12                    p_args IN SYS.ODCIArgDescList,
13                    p_cardinality IN INTEGER
14                    ) RETURN NUMBER
15  );

Важные моменты этой спецификации отмечены в следующей таблице.

Строки Описание
3 Все объектные типы должны иметь хотя бы один атрибут, поэтому я включил фиктивный атрибут с именем dummy, хотя он и не нужен в этом примере
5 и 9 Методы являются частью интерфейса Extensible Optimizer. Также доступно несколько других методов, но эти два необходимы для реализации интерфейса мощности для моей конвейерной функции
10-12 Эти параметры ODCIStatsTableFunction являются обязательными. Позиции и типы данных параметров жестко фиксированы (в отличие от их имен)
13 Все параметры конвейерной или табличной функции должны быть повторены в ассоциированном статистическом типе. В моем примере pipe_employees имеет единственный параметр p_cardinality, который также должен быть включен в сигнатуру ODCIStatsTableFunction


Мой алгоритм мощности реализуется в теле типа следующим образом:


1  CREATE TYPE BODY pipelined_stats_ot AS
2
3     STATIC FUNCTION ODCIGetInterfaces (
4                     p_interfaces OUT SYS.ODCIObjectList
5                     ) RETURN NUMBER IS
6     BEGIN
7        p_interfaces := SYS.ODCIObjectList(
8                           SYS.ODCIObject ('SYS', 'ODCISTATS2')
9                           );
10       RETURN ODCIConst.success;
11    END ODCIGetInterfaces;
12
13    STATIC FUNCTION ODCIStatsTableFunction (
14                    p_function IN SYS.ODCIFuncInfo,
15                    p_stats OUT SYS.ODCITabFuncStats,
16                    p_args IN SYS.ODCIArgDescList,
17                    p_cardinality IN INTEGER
18                    ) RETURN NUMBER IS
19    BEGIN
20       p_stats := SYS.ODCITabFuncStats(NULL);
21       p_stats.num_rows := p_cardinality;
22       RETURN ODCIConst.success;
23    END ODCIStatsTableFunction;
24
25   END;

Эта реализация интерфейса очень проста; ключевые ее аспекты перечислены в следующей таблице.

Строки Описание
3-11 Обязательное присваивание, необходимое для баз данных Oracle. Никакая пользовательская логика здесь не нужна
20-21 Мой алгоритм мощности. Для передачи CBO информации о мощности функции используется OUT-параметр p_stats. Статистический тип будет содержать ссылку на значение, переданное в параметре p_cardinality моей функции pipe_employees. Во время оптимизации
запроса CBO вызовет метод ODCIStatsTableFunction для получения значения параметра
p_stats и использует его в своих вычислениях


Итак, у меня имеется конвейерная функция и статистический тип. Остается лишь связать два объекта командой SQL ASSOCIATE STATISTICS. Именно эта связь делает возможным только что описанное «волшебство»:


ASSOCIATE STATISTICS WITH FUNCTIONS pipe_employees USING pipelined_stats_ot;

Теперь все готово к тестированию. Я повторю предыдущий запрос со включением количества строк, которое, как ожидается, вернет моя конвейерная функция (50 000):


SQL> SELECT *
  2 FROM   departments d
  3 ,      TABLE(pipe_employees(50000)) e
  4 WHERE  d.department_id = e.department_id;

Execution Plan
----------------------------------------------------------
Plan hash value: 815920909

---------------------------------------------------------------------
| Id | Operation                       | Name               | Rows  |
---------------------------------------------------------------------
| 0 | SELECT STATEMENT                 |                    | 50000 |
|*1 | HASH JOIN                        |                    | 50000 |
| 2 | TABLE ACCESS FULL                | DEPARTMENTS        |    27 |
| 3 | COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES     |       |
---------------------------------------------------------------------

На этот раз CBO получает и использует предполагаемую мощность, и план выполнения выглядит именно так, как я ожидал. Мне даже не пришлось использовать рекомендации!

В большинстве случаев при точных входных данных CBO принимает хорошие решения, как доказывает этот пример. Конечно, мы также видим в действии «волшебство» Extensible Optimizer. Я передаю предполагаемую мощность в параметре функции pipe_employees, а в фазе оптимизации база данных обращается к этому параметру через сопутствующий статистический тип и использует его для правильного назначения мощности источника записей (с использованием моего алгоритма). На мой взгляд, это довольно впечатляющая возможность.

И последнее: вам определенно стоит выбрать свой способ систематической передачи информации о мощности конвейерных функций. Я продемонстрировал лишь одну из возможностей — собственно, мне следовало бы добавить параметр p_cardinality во все конвейерные функции и связать их с интерфейсным типом pipelined_statistics_ot. Алгоритмы, используемые в интерфейсных типах, могут быть настолько сложными, насколько потребуется. Они могут зависеть от других параметров функций (например, вы можете возвращать разные мощности в зависимости от значений параметров). А может, вы предпочтете хранить предполагаемые значения мощности в таблице, к которой интерфейсный тип будет обращаться с запросом. Помните, что эту возможность можно использовать многими разными способами.

 

Конвейерные функции при загрузке сложных данных

В моем примере stockpivot каждая входная строка преобразовывалась в две выходные строки с такой же структурой записи. Остальные примеры передавали одну выходную строку с неизменной структурой записи. Но преобразования и загрузка не всегда проходят так просто. Очень часто из одной промежуточной таблицы данные загружаются сразу в несколько таблиц — могут ли конвейерные функции пригодиться и в такой ситуации?

Да, могут; многотабличная загрузка также может оптимизироваться при помощи конвейерных функций. Сама функция может передавать столько разных типов записей, сколько потребуется, и условные или безусловные многотабличные вставки могут заполнять соответствующие таблицы нужными атрибутами.

 

Один источник, два приемника

Рассмотрим пример загрузки данных клиентов и адресов из одного файла. Предположим, одна запись клиента может содержать до трех адресов. Это означает, что для каждого клиента генерируется до четырех записей, например:

CUSTOMER_ID LAST_NAME  ADDRESS_ID STREET_ADDRESS                 PRIMARY
----------- ---------- ---------- ------------------------------ -------
       1060 Kelley          60455 7310 Breathing Street          Y
       1060 Kelley         119885 7310 Breathing Street          N
   103317 Anderson          65045 57 Aguadilla Drive             Y
   103317 Anderson          65518 117 North Union Avenue         N
   103317 Anderson          61112 27 South Las Vegas Boulevard   N

 

Лишние подробности удалены, но из этого примера видно, что у Келли в системе зарегистрированы два адреса, а у Андерсона — три. В моем сценарии загрузки данных для каждого клиента в таблицу клиентов должна добавляться одна запись, а в таблицу адресов — все записи адресов, которые связаны с клиентом.

 

Передача нескольких типов записей из конвейерных функций

Как конвейерная функция может одновременно генерировать записи клиентов и адресов? Как ни странно, у этой задачи есть два относительно простых решения:

 

Объектно-реляционные возможности

Начнем с первого метода, поскольку он обеспечивает самое элегантное решение. Сначала необходимо создать четыре типа для описания данных:

Для демонстрационных целей я ограничился небольшим количеством атрибутов. В нашем примере будут использоваться следующие типы:


-- Супертип...
CREATE TYPE customer_ot AS OBJECT 
( customer_id NUMBER 
) NOT FINAL;

-- Коллекция объектов супертипа...
CREATE TYPE customer_ntt AS TABLE OF customer_ot;

-- Подтип с информацией о клиенте...
CREATE TYPE customer_detail_ot UNDER customer_ot 
( first_name VARCHAR2(20)
, last_name VARCHAR2(60)
, birth_date DATE 
) FINAL;

-- Подтип с информацией об адресе...
CREATE TYPE address_detail_ot UNDER customer_ot 
( address_id     NUMBER
, primary        VARCHAR2(1)
, street_address VARCHAR2(40)
, postal_code    VARCHAR2(10)
) FINAL;

В двух словах об объектных типах: поддержка заменяемости типов в Oracle означает, что я могу создавать строки как типа customer_detail_ot, так и address_detail_ot, и использовать их везде, где ожидается супертип customer_ot supertype. Таким образом, если я создаю конвейерную функцию для передачи коллекции супертипа, это означает, что я также могу передавать строки любого из подтипов. Это всего лишь один пример того, какие простые и элегантные решения можно создавать с использованием объектноориентированных иерархий типов.

 

Универсальная конвейерная функция

Взгляните, как выглядит тело конвейерной функции, а затем я объясню основные концепции:


1   FUNCTION customer_transform_multi(
2            p_source IN customer_staging_rct,
3            p_limit_size IN PLS_INTEGER DEFAULT customer_pkg.c_default_limit
4            )
5      RETURN customer_ntt
6      PIPELINED
7      PARALLEL_ENABLE (PARTITION p_source BY HASH(customer_id))
8      ORDER p_source BY (customer_id, address_id) IS
9
10      aa_source customer_staging_aat;
11      v_customer_id customer_staging.customer_id%TYPE := ?1;
12      /* Значение по умолчанию должно быть отлично от null */
13  BEGIN
14     LOOP
15        FETCH p_source BULK COLLECT INTO aa_source LIMIT p_limit_size;
16        EXIT WHEN aa_source.COUNT = 0;
17
18        FOR i IN 1 .. aa_source.COUNT LOOP
19
20          /* Передается только первый экземпляр описания клиента... */
21          IF aa_source(i).customer_id != v_customer_id THEN
22             PIPE ROW ( customer_detail_ot( aa_source(i).customer_id,
23                                            aa_source(i).first_name,
24                                            aa_source(i).last_name,
25                                            aa_source(i).birth_date ));
26          END IF;
27
28          PIPE ROW( address_detail_ot( aa_source(i).customer_id,
29                                       aa_source(i).address_id,
30                                       aa_source(i).primary,
31                                       aa_source(i).street_address,
32                                       aa_source(i).postal_code ));
33
34          /* Сохранение идентификатора для логики "прерывания"... */
35          v_customer_id := aa_source(i).customer_id;
36
37       END LOOP;
38    END LOOP;
39    CLOSE p_source;
40    RETURN;
41 END customer_transform_multi;

У этой функции включен параллельный режим, и она обрабатывает исходные данные в массивах для достижения максимальной производительности. Основные концепции, относящиеся к универсальности типов, описаны в следующей таблице.

Строки Описание
5 Функция возвращает коллекцию с элементами супертипа, которые могут использоваться для передачи подтипов
7-8 В данных присутствуют зависимости, поэтому используется хеш группировка с упорядоченной потоковой передачей. Записи каждого клиента должны обрабатываться вместе, потому что атрибуты клиента берутся только из первой записи
21-26 Если это первая запись конкретного клиента, передается строка типа CUSTOMER_DETAIL_OT.
Для каждого клиента передается только одна запись с информацией о клиенте
28-32 Для каждой записи извлекается адресная информация и передается строка типа ADDRESS_DETAIL_OT

 

Универсальная конвейерная функция в запросах

Итак, у меня имеется одна функция, способная генерировать строки двух разных типов и структур. Попробуем запросить несколько строк из этой функции в SQL*Plus:


SQL> SELECT *
  2  FROM TABLE(
  3          customer_pkg.customer_transform_multi(
  4             CURSOR( SELECT * FROM customer_staging ) ) ) nt
  5  WHERE ROWNUM <= 5;

CUSTOMER ID
-----------
          1
          1
          1
          1
          2

Сюрприз — где данные? Хотя я использовал SELECT *, в результатах присутствует только столбец CUST0MER_ID. Причина проста: согласно определению, моя функция возвращает коллекцию с элементами супертипа customer_ot, который имеет всего один атрибут. Без явного программирования диапазона подтипов, возвращаемых функцией, база данных не покажет дополнительные атрибуты. Более того, при попытке обратиться к любому из атрибутов подтипов в приведенном формате запроса база данных инициирует исключение ORA-00904 (недействительный идентификатор).

К счастью, Oracle предоставляет два взаимозаменяемых способа обращения к экземплярам объектных типов: функцию VALUE и псевдостолбец OBJECT_VALUE. Посмотрим, как они работают:


SQL> SELECT VALUE(nt) AS object_instance -- также можно использовать nt.OBJECT_VALUE
  2  FROM TABLE(
  3          customer_pkg.customer_transform_multi(
  4             CURSOR( SELECT * FROM customer_staging ) ) ) nt
  5  WHERE ROWNUM <= 5;

0BJECT_INSTANCE(CUST0MER_ID)
-----------------------------------------------------------------------------------------
CUSTOMER_DETAIL_OT(1, 'Abigail', 'Kessel', '31/03/1949')
ADDRESS_DETAIL_OT(1, 12135, 'N', '37 North Coshocton Street', '78247') ADDRESS_DETAIL_OT(1, 12136, 'N', '47 East Sagadahoc Road', '90285') 
ADDRESS_DETAIL_OT(1, 12156, 'Y', '7 South 3rd Circle', '30828')
CUSTOMER_DETAIL_OT(2, 'Anne', 'KOCH', '23/09/1949')

Уже лучше. Теперь данные выглядят так, как их возвращает конвейерная функция, и я выполню с ними две операции. Сначала я определю тип каждой записи конструкцией IS OF, а затем использую функцию TREAT для понижения каждой записи к ее фактическому подтипу (до этого база данных считает, что мои данные относятся к супертипу, и не позволит обратиться к их атрибутам).

Запрос выглядит примерно так:


SQL> SELECT CASE
  2            WHEN VALUE(nt) IS OF TYPE (customer_detail_ot)
  3            THEN 'C'
  4            ELSE 'A'
  5         END                                    AS record_type
  6  ,      TREAT(VALUE(nt) AS customer_detail_ot) AS cust_rec
  7  ,      TREAT(VALUE(nt) AS address_detail_ot)  AS addr_rec
  8  FROM   TABLE(
  9            customer_pkg.customer_transform_multi(
 10               CURSOR( SELECT * FROM customer_staging ) ) ) nt
 11   WHERE  ROWNUM <= 5;

RECORD_TYPE CUST_REC                       ADDR_REC
----------- ------------------------------ ------------------------------
C           CUSTOMER_DETAIL_OT(1, 'Abigail
            ', 'Kessel', '31/03/1949')
A                                          ADDRESS_DETAIL_OT(1, 12135, 'N
                                           ', '37 North Coshocton Street'
                                           , '78247')
A                                          ADDRESS_DETAIL_OT(1, 12136, 'N
                                           ', '47 East Sagadahoc Road', '
                                           90285')
A                                          ADDRESS_DETAIL_OT(1, 12156, 'Y
                                           ', '7 South 3rd Circle', '3082
                                           8')
C           CUSTOMER_DETAIL_OT(2,'Anne',
            'KOCH', '23/09/1949')

Теперь данные имеют формат правильного подтипа, что позволяет мне обратиться к их атрибутам. Для этого предыдущий запрос упаковывается во встроенное представление, а в обращениях к атрибутам используется «точечная» запись:


SELECT ilv.record_type
,      NVL(ilv.cust_rec.customer_id,
           ilv.addr_rec.customer_id) AS customer_id
,      ilv.cust_rec.first_name       AS first_name
,      ilv.cust_rec.last_name        AS last_name
       <snip>
,      ilv.addr_rec.postal_code      AS postal_code
FROM  (
       SELECT CASE...
              <...>
       FROM   TABLE(
                 customer_pkg.customer_transform_multi(
                     CURSOR( SELECT * FROM customer_staging ) ) ) nt
       ) ilv;

 

Загрузка нескольких таблиц из универсальной конвейерной функции

Я удалил несколько строк из предыдущего примера, но закономерность уже понятна. Теперь у меня имеются все элементы, необходимые для многотабличной вставки в таблицы клиентов и адресов. Код загрузки данных выглядит так:


      INSERT FIRST
         WHEN record_type = 'C'
         THEN
            INTO customers
            VALUES (customer_id, first_name, last_name, birth_date)
         WHEN record_type = 'A'
         THEN
            INTO addresses
            VALUES (address_id, customer_id, primary, street_address,
                    postal_code)
      SELECT ilv.record_type
,            NVL(ilv.cust_rec.customer_id,
                 ilv.addr_rec.customer_id) AS customer_id
,            ilv.cust_rec.first_name AS first_name
,            ilv.cust_rec.last_name AS last_name
,            ilv.cust_rec.birth_date AS birth_date
,            ilv.addr_rec.address_id AS address_id
,            ilv.addr_rec.primary AS primary
,            ilv.addr_rec.street_address AS street_address
,            ilv.addr_rec.postal_code AS postal_code
FROM (
      SELECT CASE
                WHEN VALUE(nt) IS OF TYPE (customer_detail_ot)
                THEN 'C'
                ELSE 'A'
             END                                     AS record_type
      ,      TREAT(VALUE(nt) AS customer_detail_ot) AS cust_rec
      ,      TREAT(VALUE(nt) AS address_detail_ot) AS addr_rec
      FROM   TABLE(
                customer_pkg.customer_transform_multi(
                   CURSOR( SELECT * FROM customer_staging ))) nt
     ) ilv;

 

Команда INSERT FIRST позволяет выполнить сложную загрузку данных, использующую объектно-ориентированные возможности. Скорее всего, этот способ подойдет и вам.

 

Альтернативное решение

Также возможен другой путь: создание одной «широкой» объектной записи с передачей одной строки для каждого набора адресов. Следующее определение типа поможет понять, о чем я говорю, но полный код приведен в файлах multitype_setup.sql:


CREATE TYPE customer_address_ot AS OBJECT 
( customer_id	       NUMBER
, first_name	       VARCHAR2(20)
, last_name            VARCHAR2(60)
, birth_date	       DATE
, addr1_address_id     NUMBER 
, addr1_primary	       VARCHAR2(1)
, addr1_street_address VARCHAR2(40)
, addr1_postal_code    VARCHAR2(10)
, addr2_address_id     NUMBER 
, addr2_primary	       VARCHAR2(1)
, addr2_street_address VARCHAR2(40)
, addr2_postal_code    VARCHAR2(10)
, addr3_address_id     NUMBER 
, addr3_primary        VARCHAR2(1)
, addr3_street_address VARCHAR2(40)
, addr3_postal_code    VARCHAR2(10)
, CONSTRUCTOR FUNCTION customer_address_ot 
     RETURN SELF AS RESULT
);

Как видите, каждый из трех экземпляров адреса «денормализуется» на соответствующие атрибуты. Каждая строка, переданная из функции, трансформируется в четыре строки условной командой INSERT ALL. Синтаксис INSERT проще (а в этом конкретном примере быстрее) метода с заменяемыми типами. Выбор метода зависит от конкретных обстоятельств; однако следует учесть, что при возрастании количества атрибутов производительность денормализованного метода может ухудшиться. Впрочем, я успешно использовал этот метод для оптимизации загрузки данных, вставляющей до 9 записей в 4 таблицы для каждой финансовой операции в торговом приложении.

При использовании строк со многими столбцами (по аналогии с приведенным денормализованным примером) следует ожидать ухудшения производительности реализации с конвейерной функцией. Например, я проводил тестирование последовательной массовой конвейерной загрузки 50 000 строк по сравнению с построчной вставкой, использующей большое количество столбцов по 10 байт. В Oracle9i второе решение стало превосходить конвейерное уже на 50 столбцах. К счастью, во всех основных версиях Oracle Database 10g и Oracle Database 11g этот порог находится где-то в диапазоне от 100 до 150 столбцов.

 

В завершение о конвейерных функциях

При обсуждении конвейерных функций я привел несколько сценариев, в которых такие функции (последовательные или параллельные) могут повысить эффективность загрузки и извлечения данных. Некоторые из описанных методов пригодятся как средство оптимизации. Однако я не рекомендую переводить всю кодовую базу на конвейерные функции! Это специфический инструмент, который находит применение лишь в ограниченном подмножестве задач обработки данных. Но если вам понадобится реализовать сложные преобразования, слишком громоздкие для представления в SQL (обычно это аналитические функции, выражения CASE, подзапросы и даже пресловутая секция MODEL), инкапсуляция их в конвейерных функциях способна обеспечить существенный прирост эффективности.

 

Вас заинтересует / Intresting for you:

Встроенные методы коллекций PL...
Встроенные методы коллекций PL... 14863 просмотров sepia Tue, 29 Oct 2019, 09:54:01
Управление приложениями PL/SQL...
Управление приложениями PL/SQL... 4656 просмотров Stas Belkov Thu, 16 Jul 2020, 06:20:48
Работа с коллекциями PL/SQL на...
Работа с коллекциями PL/SQL на... 42725 просмотров sepia Tue, 29 Oct 2019, 10:38:40
Перегрузка подпрограмм: програ...
Перегрузка подпрограмм: програ... 6614 просмотров Денис Sun, 23 Sep 2018, 12:59:00
Печать
Войдите чтобы комментировать