Chapter 5 Loading the Warehouse Section 5.2.2 Oracle Change Data Capture CREATE USER oltp IDENTIFIED BY oltp DEFAULT TABLESPACE users TEMPORARY TABLESPACE temp QUOTA UNLIMITED ON users; GRANT connect, resource TO oltp ; CREATE USER oltpsubscr IDENTIFIED BY oltpsubscr DEFAULT TABLESPACE users TEMPORARY TABLESPACE temp QUOTA UNLIMITED ON users; GRANT connect, resource TO oltpsubscr ; CREATE USER oltppub IDENTIFIED BY oltppub QUOTA UNLIMITED ON SYSTEM QUOTA UNLIMITED ON SYSAUX; GRANT CREATE SESSION TO oltppub; GRANT CREATE TABLE TO oltppub; GRANT CREATE TABLESPACE TO oltppub; GRANT UNLIMITED TABLESPACE TO oltppub; GRANT SELECT_CATALOG_ROLE TO oltppub; GRANT EXECUTE_CATALOG_ROLE TO oltppub; GRANT CREATE SEQUENCE TO oltppub; GRANT CONNECT, RESOURCE, DBA TO oltppub; CREATE TABLE oltp.orders (order_id varchar2(8) NOT NULL, product_id varchar2(8) NOT NULL, customer_id varchar2(10) NOT NULL, purchase_date date NOT NULL, purchase_time number(4,0) NOT NULL, purchase_price number(6,2) NOT NULL, shipping_charge number(5,2) NOT NULL, today_special_offer varchar2(1) NOT NULL, sales_person_id varchar2(20) NOT NULL, payment_method varchar2(10) NOT NULL ) TABLESPACE users ; ----------------------------------------------------- Synchronous CDC BEGIN DBMS_CDC_PUBLISH.CREATE_CHANGE_SET (change_set_name =>'EASYDW_SCS', description => 'Synchronous Change set for EasyDW', change_source_name => 'SYNC_SOURCE' ); DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE ( owner => 'oltppub', change_table_name => 'ORDERS_SYNCH_CT', change_set_name => 'EASYDW_SCS', source_schema => 'oltp', source_table => 'ORDERS', column_type_list => 'order_id varchar2(8),product_id varchar2(8),' ||'customer_id varchar2(10), purchase_date date,' ||'purchase_time number(4,0),purchase_price number(6,2),' ||'shipping_charge number(5,2), ' ||'today_special_offer varchar2(1),' ||'sales_person varchar2(20), ' ||'payment_method varchar2(10)', capture_values => 'both', rs_id => 'y', row_id => 'n', user_id => 'n', timestamp => 'n', object_id => 'n', source_colmap => 'y', target_colmap => 'y', options_string => 'TABLESPACE USERS' ); END; / GRANT SELECT ON ORDERS_SYNCH_CT TO OLTPSUBSCR ; SQL> SELECT CHANGE_TABLE_NAME FROM CHANGE_TABLES; ----------------------------------------------------- Asynchronous CDC ALTER DATABASE FORCE LOGGING; ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; ALTER TABLE oltp.orders ADD SUPPLEMENTAL LOG GROUP log_group_orders (order_id,product_id,customer_id, purchase_date,purchase_time,purchase_price, shipping_charge,today_special_offer) ALWAYS; EXECUTE DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE (GRANTEE=>'oltppub'); EXECUTE DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION (TABLE_NAME=>'oltp.orders'); BEGIN DBMS_CDC_PUBLISH.CREATE_CHANGE_SET ( change_set_name => 'EASYDW_ACS', description => 'Asynchronous Change set for purchases info', change_source_name => 'HOTLOG_SOURCE', stop_on_ddl => 'y'); END; / BEGIN DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE ( owner => 'oltppub', change_table_name => 'ORDERS_ASYNCH_CT', change_set_name => 'EASYDW_ACS', source_schema => 'OLTP', source_table => 'ORDERS', column_type_list => 'order_id varchar2(8),product_id varchar2(8),' ||'customer_id varchar2(10), purchase_date date,' ||'purchase_time number(4,0),purchase_price number(6,2),' ||'shipping_charge number(5,2),today_special_offer varchar2(1),' ||'sales_person varchar2(20), payment_method varchar2(10)', capture_values => 'both', rs_id => 'y', row_id => 'n', user_id => 'n', timestamp => 'n', object_id => 'n', source_colmap => 'n', target_colmap => 'y', options_string => 'TABLESPACE USERS'); END; / GRANT SELECT ON ORDERS_ASYNCH_CT TO OLTPSUBSCR ; BEGIN DBMS_CDC_PUBLISH.ALTER_CHANGE_SET ( change_set_name => 'EASYDW_ACS', enable_capture => 'y' ); END; / ----------------------------------------------------- Subscribing to Change Data Accessing Asynchronous CDC: BEGIN DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION (SUBSCRIPTION_NAME => 'ORDERS_SUB', CHANGE_SET_NAME => 'EASYDW_ACS', DESCRIPTION => 'Changes to orders table'); END; / Creating the subscriber view: BEGIN DBMS_CDC_SUBSCRIBE.SUBSCRIBE (SUBSCRIPTION_NAME => 'ORDERS_SUB', SOURCE_SCHEMA => 'oltp', SOURCE_TABLE => 'orders', COLUMN_LIST => 'order_id,product_id,' ||'customer_id, purchase_date,' ||'purchase_time,purchase_price,' ||'shipping_charge, today_special_offer,' ||'sales_person, payment_method', SUBSCRIBER_VIEW => 'ORDERS_VIEW' ); END; / Activating the subscription: EXECUTE DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION (SUBSCRIPTION_NAME => 'ORDERS_SUB'); Processing the Change Data: INSERT INTO oltp.orders (order_id,product_id,customer_id, purchase_date, purchase_time, purchase_price,shipping_charge, today_special_offer, sales_person_id,payment_method) VALUES ('123','SP1031', 'AB123495', to_date('01-JAN-2004', 'dd-mon-yyyy'), 1031,156.45,6.95,'N','SMITH','VISA'); INSERT INTO oltp.orders (order_id,product_id,customer_id, purchase_date, purchase_time,purchase_price,shipping_charge, today_special_offer, sales_person_id,payment_method) VALUES ('123','SP1031','AB123495', to_date('01-FEB-2004', 'dd-mon-yyyy'), 1031,156.45,6.95,'N','SMITH','VISA'); commit; Step 1 Extend the Window BEGIN DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_NAME => 'ORDERS_SUB'); END; / Step 2 Select Data from the Subscriber View SELECT OPERATION$, CSCN$, COMMIT_TIMESTAMP$, RSID$, CUSTOMER_ID, ORDER_ID, PAYMENT_METHOD, PRODUCT_ID, PURCHASE_DATE, PURCHASE_PRICE, PURCHASE_TIME, SALES_PERSON, SHIPPING_CHARGE, TODAY_SPECIAL_OFFER FROM ORDERS_VIEW; Step 3 Purge the Window EXECUTE DBMS_CDC_SUBSCRIBE.PURGE_WINDOW (SUBSCRIPTION_NAME => 'ORDERS_SUB'); End the Subscription EXECUTE DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIPTION (SUBSCRIPTION_NAME => 'ORDERS_SUB'); ----------------------------------------------------- Section 5.4.1 The Control File - Load product dimension LOAD DATA INFILE 'product.dat' append INTO TABLE product FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY "'" 5.4 Loading the Warehouse 185 (product_id, product_name, category, cost_price, sell_price, weight, shipping_charge, manufacturer, supplier) ----------------------------------------------------- Section 5.4.2 SQL*Loader Direct Path Load of a Single Partition Create the tablespace: CREATE TABLESPACE purchases_jan2005 DATAFILE 'C:\ORACLE\PRODUCT\10.1.0\ORADATA\EASYDW\PURCHASESJAN2005.f' SIZE 5M REUSE AUTOEXTEND ON DEFAULT STORAGE (INITIAL 64K NEXT 64K PCTINCREASE 0 MAXEXTENTS UNLIMITED); Add a partition: ALTER TABLE easydw.purchases ADD PARTITION purchases_jan2005 VALUES LESS THAN (TO_DATE('01-02-2005', 'DD-MM-YYYY')) PCTFREE 0 PCTUSED 99 STORAGE (INITIAL 64K NEXT 64K PCTINCREASE 0) TABLESPACE purchases_jan2005; Disable RI Constraints and Triggers: SQL> ALTER TABLE purchases DISABLE CONSTRAINT fk_time; SQL> ALTER TABLE purchases DISABLE CONSTRAINT fk_product_id; SQL> ALTER TABLE purchases DISABLE CONSTRAINT fk_customer_id; SELECT TABLE_NAME, CONSTRAINT_NAME, STATUS FROM USER_CONSTRAINTS WHERE TABLE_NAME = 'PURCHASES'; SELECT TRIGGER_NAME, STATUS FROM ALL_TRIGGERS WHERE TABLE_NAME = 'PURCHASES'; Load the Data: OPTIONS (DIRECT=TRUE) UNRECOVERABLE LOAD DATA INFILE 'purchases.dat' BADFILE 'purchases.bad' APPEND INTO TABLE purchases PARTITION (purchases_jan2005) (product_id position (1-6) char, time_key position (7-17) date "DD-MON-YYYY", customer_id position (18-25) char, ship_date position (26-36) date "DD-MON-YYYY", purchase_price position (37-43) decimal external, shipping_charge position (44-49) integer external, today_special_offer position (50) char) sqlldr USERID=easydw/easydw CONTROL=purchases.ctl LOG=purchases.log DIRECT=TRUE SKIP_INDEX_MAINTENANCE=TRUE Transformations Using SQL*Loader: LOAD DATA INFILE 'product.dat' append INTO TABLE product WHEN product_id != BLANKS FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY "'" (product_id "upper(:product_id)", product_name, category, cost_price, sell_price, weight, shipping_charge, manufacturer, supplier) ----------------------------------------------------- Section 5.4.3 Loading the Warehouse using Data Pump CREATE OR REPLACE DIRECTORY data_file_dir AS 'C:\datafiles\'; CREATE OR REPLACE DIRECTORY log_file_dir AS 'C:\logfiles\'; GRANT READ, WRITE ON DIRECTORY data_file_dir TO easydw; GRANT READ, WRITE ON DIRECTORY log_file_dir TO easydw; The expdp_par.txt file: SCHEMAS=(OLTP) INCLUDE=TABLE:"IN ('ORDERS')" QUERY=OLTP.ORDERS:"WHERE purchase_date BETWEEN to_date('01-dec-2004','dd-mon-yyyy') AND to_date('31-dec-2004','dd-mon-yyyy')" DIRECTORY=data_file_dir DUMPFILE=exp1.dmp LOGFILE=log_file_dir:exporders2004.log Exporting the data: expdp oltp/oltp@easydw parfile=expdp_par.txt Importing the data: impdp easydw/easydw@easydw DIRECTORY=data_file_dir DUMPFILE=exp1.dmp LOGFILE=log_file_dir:imporders2004.log REMAP_TABLESPACE=users:stage REMAP_SCHEMA=oltp:easydw ----------------------------------------------------- Section 5.4.4 External Tables Creating the external table CREATE TABLE new_products (product_id VARCHAR2(8), product_name VARCHAR2(30), category VARCHAR2(4), cost_price NUMBER (6,2), sell_price NUMBER (6,2), weight NUMBER (4,2), shipping_charge NUMBER (5,2), manufacturer VARCHAR2(20), supplier VARCHAR2(10)) ORGANIZATION EXTERNAL (TYPE ORACLE_LOADER DEFAULT DIRECTORY data_file_dir ACCESS PARAMETERS (RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII BADFILE log_file_dir:'product.bad' LOGFILE log_file_dir:'product.log' FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY "'") LOCATION ('product.dat') ) REJECT LIMIT UNLIMITED PARALLEL; Loading data from an external table: INSERT INTO easydw.product (product_id, product_name, category, cost_price, sell_price, weight, shipping_charge, manufacturer, supplier) SELECT product_id, product_name, category, cost_price, sell_price, weight, (shipping_charge * 1.10), manufacturer, supplier FROM new_products; EXPLAIN PLAN FOR INSERT INTO easydw.product (product_id, product_name, category, cost_price, sell_price, weight, shipping_charge, manufacturer, supplier) SELECT product_id, product_name, category, cost_price, sell_price, weight, (shipping_charge * 1.10), manufacturer, supplier FROM new_products; Using Data Pump external Tables to Move Data Create the external table: CREATE TABLE purchases_xt ORGANIZATION EXTERNAL ( TYPE ORACLE_DATAPUMP DEFAULT DIRECTORY xt_dir LOCATION ('purch_xt.dmp') ) AS SELECT * FROM purchases; Input from the external table file created above: CREATE TABLE purchases_xt2 (product_id VARCHAR2(8) , time_key DATE , customer_id VARCHAR2(10) , ship_date DATE, purchase_price NUMBER(6,2), shipping_charge NUMBER(5,2), today_special_offer VARCHAR2(1) ) ORGANIZATION EXTERNAL ( TYPE ORACLE_DATAPUMP DEFAULT DIRECTORY xt_dir LOCATION ('purch_xt.dmp') ); ----------------------------------------------------- Section 5.4.5 Transportable Tablespaces Step 1: CREATE TABLESPACE orders DATAFILE 'C:\ORACLE\PRODUCT\10.1.0\ORADATA\EASYDW\orders.f' SIZE 5M REUSE AUTOEXTEND ON DEFAULT STORAGE (INITIAL 64K PCTINCREASE 0 MAXEXTENTS UNLIMITED); Step 2: CREATE TABLE oltp.apr_orders TABLESPACE orders AS SELECT * FROM purchases WHERE ship_date BETWEEN to_date('01-APR-2004', 'dd-on-yyyy') AND to_date('30-APR-2004', 'dd-mon-yyyy'); Step 3: ALTER TABLESPACE orders READ ONLY; Step 4: expdp system/manager TRANSPORT_TABLESPACE=orders DIRECTORY=data_file_dir DUMPFILE=expdat.dmp Step 5: SELECT d.PLATFORM_NAME, tp.ENDIAN_FORMAT FROM V$TRANSPORTABLE_PLATFORM tp, V$DATABASE d WHERE tp.PLATFORM_NAME = d.PLATFORM_NAME; Step 7: impdp easydw/easydw@easydw TRANSPORT_DATAFILES=c:\oracle\product\10.1.0\oradata\easydw\ORDERS_5 DIRECTORY=tt2 DUMPFILE=expdat.dmp logfile=log_file_dir:imporders2004.log REMAP_SCHEMA=(oltp:easydw) Step 8: ALTER TABLESPACE orders READ WRITE; ----------------------------------------------------- Section 5.4.6 SQL Merge CREATE TABLE easydw.customer_changes (customer_id VARCHAR2(10), gender VARCHAR2(1), tax_rate NUMBER, city VARCHAR2(15), state VARCHAR2(10), region VARCHAR2(15), postal_code VARCHAR2(10), country VARCHAR2(20), occupation VARCHAR2(15)) ORGANIZATION EXTERNAL (TYPE ORACLE_LOADER DEFAULT DIRECTORY data_file_dir ACCESS PARAMETERS ( records delimited by newline characterset us7ascii badfile log_file_dir:'cus_changes.bad' logfile log_file_dir:'cust_changes.log' fields terminated by ',' optionally enclosed by "'") location ('customer_changes.dat') ) REJECT LIMIT UNLIMITED NOPARALLEL; MERGE INTO easydw.customer c USING easydw.customer_changes cc ON (c.customer_id = cc.customer_id) WHEN MATCHED THEN UPDATE SET c.city=cc.city, c.state=cc.state, c.postal_code=cc.postal_code, c.gender=cc.gender, c.country=cc.country, c.region=cc.region, c.tax_rate=cc.tax_rate, c.occupation=cc.occupation WHEN NOT MATCHED THEN INSERT (customer_id, city, state, postal_code, gender, region, country, tax_rate, occupation) VALUES (cc.customer_id, cc.city, cc.state, cc.postal_code, cc.gender, cc.region, cc.country, cc.tax_rate, cc.occupation) ; Insert only MERGE: MERGE INTO easydw.customer c USING easydw.customer_changes cc ON (c.customer_id = cc.customer_id) WHEN NOT MATCHED THEN INSERT (customer_id, city, state, postal_code, gender, region, country, tax_rate, occupation) VALUES (cc.customer_id, cc.city, cc.state, cc.postal_code, cc.gender, cc.region, cc.country, cc.tax_rate, cc.occupation) ; MERGE with conditional insert and update: MERGE INTO easydw.customer c USING easydw.customer_changes cc ON (c.customer_id = cc.customer_id) WHEN MATCHED THEN UPDATE SET c.city=cc.city, c.state=cc.state, c.postal_code=cc.postal_code, c.gender=cc.gender, c.country=cc.country, c.region=cc.region, c.tax_rate=cc.tax_rate, c.occupation=cc.occupation WHERE cc.postal_code not like '%-%' WHEN NOT MATCHED THEN INSERT (customer_id, city, state, postal_code, gender, region, country, tax_rate, occupation) VALUES (cc.customer_id, cc.city, cc.state, cc.postal_code, cc.gender, cc.region, cc.country, cc.tax_rate, cc.occupation) WHERE cc.postal_code not like '%-%' ; MERGE with delete: MERGE INTO easydw.customer c USING easydw.customer_changes cc ON (c.customer_id = cc.customer_id) WHEN MATCHED THEN UPDATE SET c.city=cc.city, c.state=cc.state, c.postal_code=cc.postal_code, c.gender=cc.gender, c.country=cc.country, c.region=cc.region, c.tax_rate=cc.tax_rate, c.occupation=cc.occupation DELETE WHERE (cc.tax_rate=-1) WHEN NOT MATCHED THEN INSERT (customer_id, city, state, postal_code, gender, region, country, tax_rate, occupation) VALUES (cc.customer_id, cc.city, cc.state, cc.postal_code, cc.gender, cc.region, cc.country, cc.tax_rate, cc.occupation) ; ----------------------------------------------------- Section 5.5.1 Transformations The REGEXP example: CREATE TABLE regexp (chardata varchar2(50)); INSERT INTO regexp VALUES ('A theory concerning'); INSERT INTO regexp VALUES ('the origin of the universe, is the Big '); INSERT INTO regexp VALUES ('Bang.'); COMMIT; SELECT * FROM regexp WHERE regexp_like (chardata, '*the*'); SELECT * FROM regexp WHERE regexp_like(chardata, 'B.{1}g'); SELECT regexp_instr(chardata, 'B.{1}g') FROM regexp WHERE regexp_like(chardata, 'B.{1}g') ; INSERT INTO regexp VALUES ('123-456-7890'); INSERT INTO regexp VALUES ('(111)-222-3333'); INSERT INTO regexp VALUES ('333-444-5555'); SELECT regexp_replace(chardata, '(\()(.*)(\))-(.*)-(.*)', '\2-\4-\5') AS transformed_string FROM regexp; ----------------------------------------------------- Section 5.5.3 Looking up the Warehouse Key UPDATE APR_ORDERS A SET A.PRODUCT_ID = (SELECT P.PRODUCT_ID FROM PRODUCT P WHERE A.PRODUCT_ID = P.PRODUCT_CODE); ----------------------------------------------------- Section 5.5.4 Table Functions CREATE TYPE purchases_record as OBJECT (product_id VARCHAR2(8), time_key DATE, customer_id VARCHAR2(10), ship_date DATE, purchase_price NUMBER(6,2), shipping_charge NUMBER(5,2), today_special_offer VARCHAR2(1)); CREATE TYPE purchases_table AS TABLE of purchases_record; CREATE PACKAGE cur_pack AS TYPE ref_cur_type IS REF CURSOR; END cur_pack; CREATE OR REPLACE FUNCTION transform(inputrecs IN cur_pack.ref_cur_type) RETURN purchases_table PIPELINED IS product_id VARCHAR2(8); time_key DATE; customer_id VARCHAR2(10); ship_date DATE; purchase_price NUMBER(6,2); shipping_charge NUMBER(5,2); today_special_offer VARCHAR2(1); BEGIN LOOP FETCH inputrecs INTO product_id, time_key,customer_id, ship_date,purchase_price,shipping_charge, today_special_offer; EXIT WHEN INPUTRECS%NOTFOUND; product_id := REPLACE(product_id, '-',''); shipping_charge :=(shipping_charge+shipping_charge*.10); PIPE ROW(purchases_record( product_id, time_key, customer_id, ship_date, purchase_price, shipping_charge, today_special_offer)); END LOOP; CLOSE inputrecs; RETURN; END; SELECT * FROM TABLE(transform(CURSOR(SELECT * FROM apr_orders))); CREATE TABLE TEST AS SELECT * FROM TABLE(transform(CURSOR(SELECT * FROM apr_orders)));