We Do Streams, by Greg<br /><br /><h2>Streams Implementer's Guide (2010-02-18)</h2>A book about Oracle is like a UFO: improbable and exceptional. I wish I knew the 80 volumes of the Oracle Database documentation better. However, a few times, a few books have given me such a different way of looking at and considering things that they have transformed my way of thinking. A book can turn something you couldn't get into something obvious. And I'm sure you know what and who I'm talking about: let's call it talent.<br /><br />That's why, despite the many disappointments, I keep buying books about Oracle and keep hoping, just as you can discover new brilliant friends, that the next book I buy will stay near me for a while.<br /><br />So when the publisher of the <a href="http://www.packtpub.com/oracle-11g-streams-implementers-guide/book">Streams Implementer's Guide</a> sent me a free copy of this book, a UFO among UFOs, I started to hope and wish again. And as if on purpose, my copy arrived right as I was leaving for the holidays. I don't know yet whether I'll spend a few minutes or a few days reading it, but it's probably worth mentioning. And, by the way, "How do you get to the point where you would start a book about Oracle Streams?"<br /><br />I'll share my enthusiasm when I'm back. 
Nice subject, and quite a challenge at a time when Oracle replication technology is surely living through one of its greatest paradoxes since 9i Release 2...<br /><br />To be continued!<br /><br /><h2>Oracle 11gR2, GoldenGate and Koala Jump Start (2009-12-18)</h2>It was some time ago now, but the <a href="http://www.oracle.com/technology/products/goldengate/htdocs/statement-of-direction-gg.pdf">Oracle GoldenGate statement of direction</a> made me sad. Sometimes, I still wish the person who wrote it were somehow as far away as possible from the database server development team. But I know that's not the case. The sentence that really kills me is:<br /><blockquote>"Given the strategic nature of Oracle GoldenGate, Oracle Streams will continue to be supported, but will not be actively enhanced. Rather, the best elements of Oracle Streams will be evaluated for inclusion with Oracle GoldenGate."</blockquote><h2>Some Background</h2><span class="fullpost">Even today, a few weeks after reading the GoldenGate documentation, testing the product and listing its benefits for myself (openness, simplicity, completeness, ...), I still have trouble swallowing the fact that I will miss some of the unique features of Streams.<br /><br />Not that I'm intolerant or stuck in the "good old times". I mean, I know there are good and bad things about Streams. Over the past years, I've spent quite some energy<sup>(*)</sup> on leveraging what is so unique in it, and I believe most of its advantages come from its "built-inside" architecture. To the point where it could be very difficult for Oracle to bring Streams' strengths to GoldenGate, because their architectures are so different.<br /><br />You may wonder what I am talking about. 
Well, I'm talking about:<br /><ul><li>Streams' in-memory end-to-end propagation, which provides an extremely scalable framework without the need to log changes in a separate file (trail)<br /></li><li>The reliability of the staging (the database) and propagation (buffered queues).<br /></li><li>Streams' lightweight footprint, which relies (most of the time) on internal object ids, can easily be offloaded to a separate server without any database copy, and doesn't access the original data at all<br /></li><li>The ability Streams has to process sets of changes without "deferring" the constraints<br /></li><li>The flexibility of a framework that runs inside the database, with hooks at every stage that you can completely customize and extend to your needs<br /></li><li>The openness for developers, who can use the APIs directly to provide advanced features to their applications by leveraging their Oracle knowledge<br /></li><li>The omnipotence and ubiquity of a solution that is already installed with Oracle 11g, works with RAC without any change, is extended by the Change Data Capture feature, and is easy to share with the outside world via AQ or XStream.<br /></li><li>The maturity, ease and simplicity it has gained over years of hard work by the development team, which you can leverage with a simple SQL script<br /></li><li>The cost model: it comes for free with Oracle Database Enterprise Edition, and with a limited set of features with 11g Standard Edition<br /></li></ul>At some point, I was hoping XStream would be the missing link between Streams and GoldenGate, so that GoldenGate could leverage the power of Oracle Database 11g. That would for sure have been my preferred scenario, and the fact that XStream is licensed with GoldenGate made me hope for a while. But reading the statement of direction suggests a very different ending.<br /><br />Now, Oracle GoldenGate is very likely to, if not succeed, at least do a better job than Streams at bringing real-time data integration to the masses ;-). 
Nevertheless, I have no clue how Oracle could bring Streams' strengths to it in a timely manner. An interesting challenge for the product managers and the development team... Time will tell!<h2>Koala jump start</h2>As you can guess, the time has come for all of us to learn more about Oracle GoldenGate. Since my laptop runs Karmic Koala, I've given it a try on Ubuntu 9.10. It's not supported, but I haven't faced any issue so far. I've downloaded and installed GoldenGate from <a href="http://edelivery.oracle.com/">Oracle E-Delivery</a>, where I've chosen the following distribution:<br /><ul><li>Select a product Pack: "Oracle Fusion Middleware"</li><li>Platform: "Linux x86"</li><li>Description: "Oracle GoldenGate on Oracle Media Pack for Linux x86"</li><li>Name: "Oracle GoldenGate V10.4.0.x for Oracle 11g on RedHat 5.0"</li></ul>To perform the installation, I've just unzipped/untarred the file into a directory and set the environment variables so that I can access my Oracle database:<pre>. oraenv<br /><em>ORACLE_SID = [WHITE] ?</em> BLACK<br /><em>The Oracle base for ORACLE_HOME=/u01/app/oracle/product/11.2.0/db_1 is /u01/app/oracle</em></pre>That done, you should be ready to use Oracle GoldenGate; run the command line interface as below:<pre>./ggsci <br /><br /><em>Oracle GoldenGate Command Interpreter for Oracle<br />Version 10.4.0.19 Build 002<br />Linux, x86, 32bit (optimized), Oracle 11 on Sep 29 2009 08:50:50<br /><br />Copyright (C) 1995, 2009, Oracle and/or its affiliates. All rights reserved.</em><br /><br />exit</pre><h2>Prepare the database</h2>There are a few settings to check on the database. 
It must be in archivelog mode, and GoldenGate must be able to access the archived logs and redo logs:<pre>sqlplus / as sysdba<br /><br />archive log list<br /><em>Database log mode Archive Mode<br />Automatic archival Enabled<br />Archive destination /u01/app/oracle/oradata/BLACK/archivelogs<br />Oldest online log sequence 1<br />Next log sequence to archive 3<br />Current log sequence 3</em></pre>I've set <code>NLS_LANG</code> so that it matches my database's settings:<pre>sqlplus / as sysdba<br /><br />select parameter, value<br /> from nls_database_parameters<br /> where parameter in ('NLS_LANGUAGE',<br /> 'NLS_TERRITORY',<br /> 'NLS_CHARACTERSET',<br /> 'NLS_LENGTH_SEMANTICS');<br /><br /><em>parameter VALUE<br />-------------------- -------------<br />NLS_LANGUAGE AMERICAN<br />NLS_TERRITORY AMERICA<br />NLS_CHARACTERSET WE8MSWIN1252<br />NLS_LENGTH_SEMANTICS BYTE</em><br /><br />exit<br /><br />export NLS_LANG=AMERICAN_AMERICA.WE8MSWIN1252</pre>The configuration has to rely on a user with a high level of privileges to perform several operations, like extracting the content of a UDT or a LOB with flashback queries. That user should also be able to add supplemental log groups to the tables that are part of the replication:<pre>sqlplus / as sysdba<br /><br />create user gg<br /> identified by gg<br /> default tablespace users<br /> temporary tablespace temp;<br /><br />grant create session, resource, dba to gg;</pre>The database must also have minimal supplemental logging enabled:<pre>alter database add supplemental log data;<br /><br />alter system switch logfile;</pre>In my case, that was it. Obviously, based on what you want to do, your database may require more changes; for example, some tables may need to be created to audit the DDL or to store checkpoints. 
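As a small aside, the NLS_LANG value exported above is simply the three NLS database parameter values from the query glued together; here is a minimal sketch of that assembly (the function name is mine, not an Oracle API):

```python
# Assemble an NLS_LANG string from the values returned by the
# nls_database_parameters query above.
# NLS_LANG format: <language>_<territory>.<charset>
def nls_lang(language: str, territory: str, charset: str) -> str:
    return f"{language}_{territory}.{charset}"

print(nls_lang("AMERICAN", "AMERICA", "WE8MSWIN1252"))
# AMERICAN_AMERICA.WE8MSWIN1252
```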
To get the complete list of what needs to be done, check <a href="http://www.oracle.com/technology/documentation/goldengate.html">GoldenGate's documentation</a>, and more specifically the "Oracle Installation and Setup Guide".<h2>A Demo Schema</h2>I did not try to set up anything advanced; I've just replicated <code>scott.dept</code> into a <code>demo</code> schema. To make the example even simpler, I assumed nobody was accessing the table and paid no attention to pending transactions or out-of-sync instantiation. I've just created a table named <code>demo.dept</code> and loaded it with <code>scott.dept</code>'s data:<pre>create user demo identified by demo<br /> default tablespace users<br /> temporary tablespace temp<br /> quota unlimited on users;<br /><br />grant connect, resource to demo;<br /><br />create table demo.dept<br /> ( deptno number(2,0),<br /> dname varchar2(14),<br /> loc varchar2(13),<br /> constraint pk_dept primary key(deptno))<br /> tablespace users;<br /><br />insert into demo.dept<br /> select * from scott.dept;<br /><br />commit;</pre><h2>Configure GoldenGate Process Manager</h2>Once the database is configured, refer to the "Administration Guide" to continue and get more details about the GoldenGate setup. First, GoldenGate requires some directories to store its configuration, logs and trail files. 
You can choose an alternate location for them, but that wasn't really my concern here:<pre>./ggsci <br /><br />create subdirs<br /><br /><em>Creating subdirectories under current directory /gg<br /><br />Parameter files /gg/dirprm: created<br />Report files /gg/dirrpt: created<br />Checkpoint files /gg/dirchk: created<br />Process status files /gg/dirpcs: created<br />SQL script files /gg/dirsql: created<br />Database definitions files /gg/dirdef: created<br />Extract data files /gg/dirdat: created<br />Temporary files /gg/dirtmp: created<br />Veridata files /gg/dirver: created<br />Veridata Lock files /gg/dirver/lock: created<br />Veridata Out-Of-Sync files /gg/dirver/oos: created<br />Veridata Out-Of-Sync XML files /gg/dirver/oosxml: created<br />Veridata Parameter files /gg/dirver/params: created<br />Veridata Report files /gg/dirver/report: created<br />Veridata Status files /gg/dirver/status: created<br />Veridata Trace files /gg/dirver/trace: created<br />Stdout files /gg/dirout: created</em></pre>Once that was done, I edited the Manager configuration file named <code>MGR</code> to set the <code>port</code> parameter, and started it:<pre>edit params mgr<br /><br />view params mgr<br /><em>port 7809</em><br /><br />start manager<br /><br />status manager<br /><em>Manager is running (IP port arkzoyd.7809).</em></pre><h2>Source Table Supplemental Log Group</h2>Like Streams (there is no secret!), GoldenGate needs to be able to identify rows in order to apply captured changes. It provides some generic tools to enable and check additional logging on the tables from <code>ggsci</code>:<pre>dblogin userid gg, password gg<br /><em>Successfully logged into database.</em><br /><br />add trandata scott.dept<br /><em>Logging of supplemental redo data enabled for table SCOTT.DEPT.</em><br /><br />info trandata scott.dept<br /><em>Logging of supplemental redo log data is enabled for table SCOTT.DEPT</em></pre><h2>Parameter Files</h2>I've named the extract <code>scott</code>. 
It captures changes made to the <code>SCOTT.DEPT</code> table and sends them to the remote trail file which, in my case, is managed by the same manager. I've named the replicat <code>demo</code>. The parameter files for <code>scott</code> and <code>demo</code> look like the ones below:<pre>edit params scott<br /><br />view params scott<br /><br /><em>extract scott<br />userid gg, password gg<br />rmthost localhost mgrport 7809<br />rmttrail SC<br />table SCOTT.DEPT;</em><br /><br />edit params demo<br /><br />view params demo<br /><br /><em>replicat demo<br />assumetargetdefs<br />userid gg, password gg<br />map SCOTT.DEPT, target DEMO.DEPT;</em></pre><blockquote>Note:<br />With Oracle, you have to use double quotes to manage case-sensitive table names. However, that's not the case with all database engines. As a result, depending on the parameter, GoldenGate may or may not differentiate strings with different cases. To avoid any issue, I use uppercase for parameter values, unless I specifically want a different case.</blockquote><h2>Extract and Replicat</h2>Once the parameter files were defined, I added the extract, the replicat and the trail files from <code>ggsci</code>:<pre>add extract scott, tranlog, begin now<br /><em>EXTRACT added.</em><br /><br />add rmttrail SC, extract scott<br /><em>RMTTRAIL added.</em><br /><br />add replicat demo, exttrail SC, nodbcheckpoint, begin now<br /><em>REPLICAT added.</em></pre>And I started them both:<pre>start er *<br /><em>Sending START request to MANAGER ...<br />EXTRACT SCOTT starting<br /><br />Sending START request to MANAGER ...<br />REPLICAT DEMO starting</em><br /><br />info all<br /><br /><em>Program Status Group Lag Time Since Chkpt<br />MANAGER RUNNING <br />EXTRACT RUNNING SCOTT 00:00:00 00:00:02 <br />REPLICAT RUNNING DEMO 00:00:00 00:00:08</em><br /><br />exit</pre><h2>Are Changes Replicated?</h2>Once the setup was complete, I tested the replication with the script below:<pre>sqlplus / as sysdba<br /><br />update scott.dept <br /> set dname='OPERATIONS2' <br /> where deptno=40;<br /><br />commit;<br /><br />select dname from demo.dept<br /> where deptno=40;<br /><br /><em>DNAME<br />--------------<br />OPERATIONS2</em><br /><br />update scott.dept <br /> set dname='OPERATIONS' <br /> where deptno=40;<br /><br />commit;<br /><br />select dname from demo.dept<br /> where deptno=40;<br /><br /><em>DNAME<br />--------------<br />OPERATIONS</em><br /><br />exit</pre><h2>Configuration Cleanup</h2>Obviously, that's just a start. To avoid any issue with my next tests, I cleaned up my configuration once I was happy with it:<pre>./ggsci<br /><br />stop er *<br /><em>Sending STOP request to EXTRACT SCOTT ...<br />Request processed.<br /><br />Sending STOP request to REPLICAT DEMO ...<br />Request processed.</em><br /><br />delete er *<br /><em>Are you sure you want to delete all groups? y<br />Deleted EXTRACT SCOTT.<br />Deleted REPLICAT DEMO.</em><br /><br />stop manager<br /><em>Manager process is required by other GGS processes.<br />Are you sure you want to stop it (y/n)? y<br /><br />Sending STOP request to MANAGER ...<br />Request processed.<br />Manager stopped.</em><br /><br />exit</pre>And I dropped the <code>demo</code> and <code>gg</code> users:<pre>sqlplus / as sysdba<br /><br />drop user gg cascade;<br />drop user demo cascade;<br /><br />exit</pre><font size="1">(*) I know at least one person who considers me a useless clown and thinks the time I spend on Streams should be judged less mercifully. I admit it easily. I'm aware I don't have even one percent of his knowledge about what remains an awesome technology. 
Anyway, good or bad, I've spent some energy digging into Streams internals, and I've always tried to listen to both positive and negative feedback.</font></span><br /><br /><h2>Hidden and Undocumented "Cardinality Feedback" (2009-12-08)</h2>I know this blog is supposed to be about Oracle Streams and AQ only... but I could not resist sharing one of my recent findings about an undocumented and automatic plan re-evaluation behavior I've run into with 11.2.0.1 on Linux 32-bit. Hopefully you'll forgive me for this post...<br /><br />If you've read the Oracle Database 11.1 documentation, you've come across a feature called "Intelligent or Adaptive Cursor Sharing". From the documentation, the feature seems related to bind peeking, and you'll find an excellent article about it on the <a href="http://optimizermagic.blogspot.com/2009/04/update-on-adaptive-cursor-sharing.html">Optimizer Magic's "Update on Adaptive Cursor Sharing" blog.</a><span class="fullpost"> This story seems both very close to it and very different.<br /><br />However, as often with Oracle, it's more subtle than it appears; I've found a case, which you should be able to reproduce, where the query is reparsed even though there is no bind variable at all.<h2>My demo schema</h2>The behavior may be related to some of my laptop's characteristics. My very simple database runs 11.2.0.1 on Linux x86 32-bit with the following parameters:<blockquote>sga_target=260M<br />pga_aggregate_target=180M<br />filesystemio_options=setall</blockquote>I create a DEMO user and a set of tables, data and statistics. 
You can <a target="_blank" href="http://arkzoyd.free.fr/blog/demo_schema.sql">download the script from my website</a>.<h2>The query</h2>Execute the query below; the first time, you should see that it gets a first plan, with a hash value of 1851413986:<pre>-- To change the behavior, set this parameter to none:<br />-- alter session set "_optimizer_extended_cursor_sharing_rel"=none;<br />-- Otherwise, leave it at simple, the default value for 11.2.0.1:<br />-- alter session set "_optimizer_extended_cursor_sharing_rel"=simple;<br /><br />set timing on<br />select /* GG */ t.year_id, sum(f.metric1)<br />from fact f, time t, dim2 d2, dim3 d3, dim4 d4<br />where f.time_id=t.time_id<br />and f.dim2_id=d2.dim2_id<br />and f.dim3_id1=d3.dim3_id1<br />and f.dim3_id2=d3.dim3_id2<br />and f.dim4_id=d4.dim4_id<br /> and d2.dim2_lib='Value 5'<br /> and d3.dim3_lib='Value (2,2)'<br /> and d4.dim4_l2='L2.1'<br /> and attr2='ZZ4'<br /> and t.time_id=trunc(t.time_id,'W')<br />group by t.year_id<br />order by t.year_id;<br /><br />YEAR_ID SUM(F.METRIC1)<br />--------- --------------<br />01-JAN-09 38490<br /><br />Elapsed: 00:00:06.10<br /><br />select *<br /> from table(dbms_xplan.display_cursor(format=>'basic note'));<br /><br />PLAN_TABLE_OUTPUT<br />-----------------<br />EXPLAINED SQL STATEMENT:<br />------------------------<br />select /* GG_2 */ t.year_id, sum(f.metric1) from fact f, time t, dim2<br />d2, dim3 d3, dim4 d4 where f.time_id=t.time_id and<br />f.dim2_id=d2.dim2_id and f.dim3_id1=d3.dim3_id1 and<br />f.dim3_id2=d3.dim3_id2 and f.dim4_id=d4.dim4_id and<br />d2.dim2_lib='Value 5' and d3.dim3_lib='Value (2,2)' and<br />d4.dim4_l2='L2.1' and attr2='ZZ4' and<br />t.time_id=trunc(t.time_id,'W') group by t.year_id order by t.year_id<br /><br />Plan hash value: 1851413986<br /><br />-----------------------------------------------------------------<br />| Id | Operation | Name |<br />-----------------------------------------------------------------<br
/>| 0 | SELECT STATEMENT | |<br />| 1 | SORT GROUP BY | |<br />| 2 | NESTED LOOPS | |<br />| 3 | NESTED LOOPS | |<br />| 4 | HASH JOIN | |<br />| 5 | PART JOIN FILTER CREATE | :BF0000 |<br />| 6 | NESTED LOOPS | |<br />| 7 | NESTED LOOPS | |<br />| 8 | MERGE JOIN CARTESIAN | |<br />| 9 | PARTITION RANGE ALL | |<br />| 10 | TABLE ACCESS FULL | TIME |<br />| 11 | BUFFER SORT | |<br />| 12 | TABLE ACCESS FULL | DIM3 |<br />| 13 | PARTITION RANGE ITERATOR | |<br />| 14 | PARTITION HASH ALL | |<br />| 15 | BITMAP CONVERSION TO ROWIDS | |<br />| 16 | BITMAP AND | |<br />| 17 | BITMAP INDEX SINGLE VALUE | FACT_TIME_IDX |<br />| 18 | BITMAP INDEX SINGLE VALUE | FACT_DIM3_IDX |<br />| 19 | TABLE ACCESS BY LOCAL INDEX ROWID| FACT |<br />| 20 | PARTITION HASH JOIN-FILTER | |<br />| 21 | TABLE ACCESS FULL | DIM2 |<br />| 22 | INDEX UNIQUE SCAN | DIM4_PK |<br />| 23 | TABLE ACCESS BY INDEX ROWID | DIM4 |<br />-----------------------------------------------------------------</pre>When I executed the exact same query again, I found a second plan, with a hash value of 1094455219. 
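The way I picture what happens, as a toy model only (this is my interpretation, not Oracle's actual algorithm; the cardinalities and the threshold are made-up numbers): the first execution monitors actual row counts, and a large gap between estimated and actual cardinality marks the cursor for a re-parse with corrected estimates, which can yield a different plan.

```python
# Toy model of a cardinality-feedback-style re-optimization (my interpretation,
# not Oracle's actual code): a big gap between the estimated and the actual
# row count triggers a second parse using the corrected cardinality.
def choose_plan(cardinality: int) -> str:
    # Pretend a low cardinality favors nested loops, a high one a hash join.
    return "NESTED LOOPS" if cardinality < 1000 else "HASH JOIN"

def run(estimated: int, actual: int, threshold: float = 8.0):
    plans = [choose_plan(estimated)]        # 1st parse: stats-based estimate
    misestimate = max(estimated, actual) / max(1, min(estimated, actual))
    if misestimate >= threshold:            # large gap -> re-optimize
        plans.append(choose_plan(actual))   # 2nd parse: corrected cardinality
    return plans

print(run(estimated=50, actual=40000))
# ['NESTED LOOPS', 'HASH JOIN']
```

With a small misestimate, the second parse never happens and the original plan is kept, which matches the usual cursor-reuse behavior.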
Though the plan is better than the 1st one in my case, I would have expected the cursor to be reused:<pre>set timing on<br />select /* GG */ t.year_id, sum(f.metric1)<br />from fact f, time t, dim2 d2, dim3 d3, dim4 d4<br />where f.time_id=t.time_id<br />and f.dim2_id=d2.dim2_id<br />and f.dim3_id1=d3.dim3_id1<br />and f.dim3_id2=d3.dim3_id2<br />and f.dim4_id=d4.dim4_id<br /> and d2.dim2_lib='Value 5'<br /> and d3.dim3_lib='Value (2,2)'<br /> and d4.dim4_l2='L2.1'<br /> and attr2='ZZ4'<br /> and t.time_id=trunc(t.time_id,'W')<br />group by t.year_id<br />order by t.year_id;<br /><br />YEAR_ID SUM(F.METRIC1)<br />--------- --------------<br />01-JAN-09 38490<br /><br />Elapsed: 00:00:00.18<br /><br />select *<br /> from table(dbms_xplan.display_cursor(format=>'basic note'));<br /><br />PLAN_TABLE_OUTPUT<br />-----------------<br />EXPLAINED SQL STATEMENT:<br />------------------------<br />select /* GG_2 */ t.year_id, sum(f.metric1) from fact f, time t, dim2<br />d2, dim3 d3, dim4 d4 where f.time_id=t.time_id and<br />f.dim2_id=d2.dim2_id and f.dim3_id1=d3.dim3_id1 and<br />f.dim3_id2=d3.dim3_id2 and f.dim4_id=d4.dim4_id and<br />d2.dim2_lib='Value 5' and d3.dim3_lib='Value (2,2)' and<br />d4.dim4_l2='L2.1' and attr2='ZZ4' and<br />t.time_id=trunc(t.time_id,'W') group by t.year_id order by t.year_id<br /><br />Plan hash value: 1094455219<br /><br />--------------------------------------------------------------<br />| Id | Operation | Name |<br />--------------------------------------------------------------<br />| 0 | SELECT STATEMENT | |<br />| 1 | SORT GROUP BY | |<br />| 2 | HASH JOIN | |<br />| 3 | NESTED LOOPS | |<br />| 4 | NESTED LOOPS | |<br />| 5 | MERGE JOIN CARTESIAN | |<br />| 6 | MERGE JOIN CARTESIAN | |<br />| 7 | PARTITION HASH ALL | |<br />| 8 | TABLE ACCESS FULL | DIM2 |<br />| 9 | BUFFER SORT | |<br />| 10 | TABLE ACCESS FULL | DIM3 |<br />| 11 | BUFFER SORT | |<br />| 12 | PARTITION RANGE ALL | |<br />| 13 | TABLE ACCESS FULL | TIME |<br />| 14 | 
PARTITION RANGE ITERATOR | |<br />| 15 | PARTITION HASH ITERATOR | |<br />| 16 | BITMAP CONVERSION TO ROWIDS | |<br />| 17 | BITMAP AND | |<br />| 18 | BITMAP INDEX SINGLE VALUE | FACT_TIME_IDX |<br />| 19 | BITMAP INDEX SINGLE VALUE | FACT_DIM3_IDX |<br />| 20 | BITMAP INDEX SINGLE VALUE | FACT_DIM2_IDX |<br />| 21 | TABLE ACCESS BY LOCAL INDEX ROWID| FACT |<br />| 22 | TABLE ACCESS FULL | DIM4 |<br />--------------------------------------------------------------<br /><br />Note<br />-----<br /> - cardinality feedback used for this statement</pre>As you can see from the plan note, something has changed in the calculation; if you collect a 10053 trace, you'll see that not only is the cursor re-parsed at the second execution, but some opt_estimate hints are also used to correct the cardinality estimates. Interesting?<br /><h2>Conclusion</h2>From my tests, the value of _optimizer_extended_cursor_sharing_rel impacts this behavior. However, the data the CBO relies on to decide that a plan change is the smarter move doesn't look obvious to me. In addition, the first time the query is parsed, the IS_SHAREABLE column of the cursor is set to 'Y', and if you query V$SQL_SHARED_CURSOR while the two cursors are in the shared pool, you'll find no property to explain the plan change. I'm very curious about the how and the why... Any idea?</span><br /><br /><h2>Turning a Physical Standby and its Broker into a Logical Standby (2009-10-17)</h2>Turning a physical standby into a logical standby is well documented. Check <a href="http://download.oracle.com/docs/cd/E11882_01/server.112/e10700/create_ls.htm#SBYDB00300">Oracle® Data Guard Concepts and Administration - 11g Release 2 (11.2) - Creating a Logical Standby Database</a> for all the details. 
Nevertheless, the documentation assumes you set up the log transport service manually, and doesn't explain how to make the change with the broker in place. You'll find below what needs to be done in that case; it is, by far, easier than setting the <code>log_archive_dest_n</code> parameters.<span class="fullpost"><br /><blockquote>Notes:<br /><ul><li>It's a pity you cannot (yet!) convert a physical standby into a logical standby with a single <code>dgmgrl</code> command.</li><li>This post has been tested with Oracle Database 11g Release 2 (11.2.0.1) on Linux x86. </li><li>Before you proceed, make sure the temporary files are specified correctly.</li></ul></blockquote><h2>A Physical Standby</h2>To begin, let's consider a Data Guard configuration made of 2 databases: <code>BLACK</code> is the primary and <code>WHITE</code> the physical standby; we'll transform the latter into a logical standby database. The first step consists of stopping the redo apply process and checking that the database is mounted:<br /><pre>edit configuration<br />set PROTECTION MODE AS MaxPerformance;<br /><br /><i>Succeeded.<br /></i><br />edit database white<br />set state=APPLY-OFF;<br /><br /><i>Succeeded.<br /></i><br />show database white;<br /><i><br />Database - white<br /><br />Role: PHYSICAL STANDBY<br />Intended State: </i><strong><i>APPLY-OFF</i></strong><i><br />Transport Lag: 0 seconds<br />Apply Lag: 0 seconds<br />Real Time Query: </i><strong><i>OFF</i></strong><i><br />Instance(s):<br />WHITE<br /><br />Database Status:<br />SUCCESS<br /></i><br />exit</pre><h2>Checking for datatypes and tables without keys</h2>Before you transform the standby database, you can check that the database doesn't contain tables without unique keys, or tables that cannot be maintained by the logical standby. 
Run the following queries on <code>BLACK</code>: <pre>select owner, table_name<br /> from dba_logstdby_not_unique<br /> where (owner, table_name) not in<br /> (select distinct owner, table_name<br /> from dba_logstdby_unsupported)<br /> and bad_column='Y';<br /><br /><i>no rows selected </i><br />select distinct owner, table_name<br /> from dba_logstdby_unsupported;<br /><i>[...]</i></pre><h2>Capturing the dictionary on the primary</h2>To proceed with the logical standby setup, you have to capture the dictionary on the source database (<code>BLACK</code>) and make sure there are no pending transactions. For that purpose, you can use the <code>dbms_logstdby</code> package as below:<br /><pre>. oraenv<br />BLACK<br /><br />sqlplus / as sysdba<br /><br />exec dbms_logstdby.build;<br /><br />exit;</pre><h2>Applying the logs on the standby</h2>The standby database can now be synchronized up to the point where the physical standby should be turned into a logical standby; proceed as below:<br /><pre>. oraenv<br />WHITE<br /><br />sqlplus / as sysdba<br /><br />alter database recover<br />to logical standby WHITE;<br /><br /><i>Database altered.</i></pre><h2>Opening the Physical Standby with ResetLogs</h2>The next step consists of opening the standby database with resetlogs:<br /><pre>shutdown<br /><i>ORA-01507: database not mounted<br />ORACLE instance shut down.</i><br /><br />startup mount<br /><i>ORACLE instance started.<br /><br />Total System Global Area 263639040 bytes<br />Fixed Size 1335892 bytes<br />Variable Size 104861100 bytes<br />Database Buffers 150994944 bytes<br />Redo Buffers 6447104 bytes<br />Database mounted.</i><br /><br />alter database open resetlogs;<br /><br /><i>Database altered.<br /></i><br />exit;</pre><h2>Rebuilding the broker configuration</h2>You can now change the Data Guard broker configuration to remove the physical standby and add the logical standby instead:<br /><pre>. 
oraenv<br />BLACK<br /><br />dgmgrl /<br /><br />remove database white<br /> preserve destinations;<br /><br /><i>Removed database "white" from the configuration</i> <br /><br />show configuration<br /><br /><i>Configuration - worldwide<br /><br />Protection Mode: MaxPerformance<br />Databases:<br />black - Primary database<br /><br />Fast-Start Failover: DISABLED<br /><br />Configuration Status:<br />SUCCESS</i><br /><br />add database white<br /> as connect identifier is white<br /> maintained as logical;<br /><br /><i>Database "white" added</i><br /><br />enable configuration;<br /><br /><i>Enabled.</i><br /><br />show configuration;<br /><br /><i>Configuration - worldwide<br /><br />Protection Mode: MaxPerformance<br />Databases:<br />black - Primary database<br />white - Logical standby database<br /><br />Fast-Start Failover: DISABLED<br /><br />Configuration Status:<br />DISABLED</i><br /><br />enable configuration;<br /><br /><i>Enabled.</i><br /><br />show configuration<br /><br /><i>Configuration - worldwide<br /><br />Protection Mode: MaxPerformance<br />Databases:<br />black - Primary database<br />white - Logical standby database<br /><br />Fast-Start Failover: DISABLED<br /><br />Configuration Status:<br />SUCCESS</i></pre><blockquote>Note:<br />It can take a few minutes for the standby and primary to get to the right state. 
However, if it still fails after a few minutes, check the alert.log for any unexpected errors.</blockquote><h2>Changing Properties</h2>You can change some properties to ease the switchover, and change the protection mode, as below:<br /><pre>edit database white<br /> set property StaticConnectIdentifier=white;<br /><br /><i> Property "staticconnectidentifier" updated</i><br /><br />edit database black<br /> set property StaticConnectIdentifier=black;<br /><br /><i> Property "staticconnectidentifier" updated</i><br /><br />edit configuration<br /> set protection mode as MaxAvailability;<br /><br /><i> Succeeded.</i><br /><br />show configuration;<br /><br /><i>Configuration - worldwide<br /><br />Protection Mode: MaxAvailability<br />Databases:<br />black - Primary database<br />white - Logical standby database<br /><br />Fast-Start Failover: DISABLED<br /><br />Configuration Status:<br />SUCCESS<br /></i></pre><h2>Testing the Logical Standby</h2>Once the configuration is done, you can test it; make sure your changes to the primary database are replicated to the logical standby database:<pre>. oraenv<br />BLACK<br /><br />sqlplus / as sysdba<br /><br />update scott.emp<br /> set sal=10000<br /> where ename='KING';<br /><br /><i>1 row updated.</i><br /><br />commit;<br /><br /><i>Commit complete.</i><br /><br />exit;<br /><br />. oraenv<br />WHITE<br /><br />sqlplus / as sysdba<br /><br />select sal<br /> from scott.emp<br /> where ename='KING';<br /><br />SAL<br />-----<br />10000</pre>You can also test the switchover:<br /><pre>. oraenv<br />BLACK<br /><br />dgmgrl /<br />switchover to white;<br /><br />show configuration;</pre><h2>Conclusion</h2>You may wonder what the link is between Streams and the logical standby. Obviously they both rely on the same technology but, in fact, that's not the main reason why I'm investigating logical standby. 
No; instead, it's because with 11.2 you can now configure a Streams capture from a logical standby, as described in <a href="http://download.oracle.com/docs/cd/E11882_01/server.112/e10700/manage_ls.htm#CHDEJJHH">Oracle® Data Guard Concepts and Administration - 11g Release 2 (11.2) - 10.6.6 Running an Oracle Streams Capture Process on a Logical Standby Database</a>.</span><br /><br /><h2>LOB Chunks (2009-10-11)</h2>To continue with the previous test case, I'll insert a LOB large enough that the change record is split into several chunks. To perform that operation:<br /><ul><li>Set up the example as described in <a href="http://wedostreams.blogspot.com/2009/10/streams-lcrs-and-lobs.html">Streams LCRs and LOBs</a> </li><li>Insert a LOB that is large enough to guarantee it will be split into several chunks<br /></li></ul>What is interesting is less the output itself than the number of chunks and the size of each chunk.<span class="fullpost"><br /><h2>Insert a large CLOB</h2>The script below creates a LOB as a temporary resource and, once built, inserts it into SOURCE.T9:<br /><pre>connect source/source<br /><br />declare<br /> z clob;<br />begin<br /> DBMS_LOB.CREATETEMPORARY(z,true,DBMS_LOB.SESSION);<br /> for i in 1..20 loop<br /> dbms_lob.append(z,rpad('Z',1024,'Z'));<br /> end loop;<br /> insert into source.T9 values (2,z);<br />end;<br />/<br />commit;<br /><br />connect strmadmin/strmadmin<br />set lines 1000<br />set serveroutput on<br />exec print_xml_fromq('MYQUEUE');<br /></pre><h2>The result</h2>You'll find below the output matching the set of LCRs generated by that single insert:<br /><pre>---------------------------------------------------------<br /><ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"<br /> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> 
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>INSERT</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>5.5.913</transaction_id><br /> <scn>1448259</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>2</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2 nil="true"><br /> </data><br /> <lob_information>EMPTY LOB</lob_information><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"<br /> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB WRITE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>5.5.913</transaction_id><br /> <scn>1448259</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>2</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2>ZZZ[...]ZZZ</varchar2><br /> </data><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_offset>1</lob_offset><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"<br /> 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB WRITE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>5.5.913</transaction_id><br /> <scn>1448259</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>2</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2>ZZZ[...]ZZZ</varchar2><br /> </data><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_offset>8061</lob_offset><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"<br /> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB WRITE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>5.5.913</transaction_id><br /> <scn>1448259</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>2</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2>ZZZ[...]ZZZ</varchar2><br /> </data><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_offset>16121</lob_offset><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br 
/>---------------------------------------------------------<br /><ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"<br /> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB TRIM</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>5.5.913</transaction_id><br /> <scn>1448259</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>2</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2 nil="true"><br /> </data><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_operation_size>20480</lob_operation_size><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"<br /> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>UPDATE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>5.5.913</transaction_id><br /> <scn>1448259</scn><br /> <old_values><br /> <old_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>2</number><br /> </data><br /> </old_value><br /> </old_values><br /></row_lcr><br />---------------------------------------------------------<br />No more 
messages</pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-40917592577591702272009-10-09T12:03:00.000-07:002009-10-11T11:39:11.064-07:00Streams LCRs and LOBsLOBs (BLOB, CLOB, NCLOB) have been designed to provide a rich and efficient set of APIs. With SecureFiles, the LOB storage model is highly efficient and optimized for compression. LOBs don't work like anything else within an Oracle database. You can check my previous post entitled <a href="http://wedostreams.blogspot.com/2009/09/triggers-and-lobs-synchronous-cdc-and.html">Triggers and LOBs</a>, if you are not convinced yet.<br /><br />As a result, Streams doesn't handle LOBs like anything else either, and using LOBs with Streams can quickly turn into a headache:<br /><ul><li>Many tools, including the ones provided by the documentation, don't provide a clean display of LCRs with LOBs,<br /></li><li>Some packages require specific LOB settings, like <code>DBMS_APPLY_ADM.SET_DML_HANDLER</code> and its <code>assemble_lobs</code> parameter,</li><li>The Streams feature set associated with LOBs is quite different from the one associated with other types: no conflict detection, no support for synchronous capture mechanisms, and several specific bugs (search for "Streams" and "LOB" in the Metalink Bug Search Engine). </li></ul>For all these reasons, it's very interesting to be able to figure out how LOBs and Streams work together. 
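Before going further, it can help to know exactly which LOB columns live in the schema you replicate. This is just a convenience query against the standard DBA_LOBS dictionary view; the SOURCE schema is the one used throughout this post:

```sql
-- List the LOB columns, and whether they use SecureFiles storage,
-- for the replicated schema; DBA_LOBS is a standard dictionary view.
select owner,
       table_name,
       column_name,
       securefile
  from dba_lobs
 where owner = 'SOURCE'
 order by table_name, column_name;
```

Any table showing up here deserves extra attention before you add it to a Streams configuration.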
To help, I've built a very simple model that allows you to see, if not the exact content of the LCRs containing LOBs, at least an XML representation of them.<span class="fullpost"><br /><h2>The test case<br /></h2>You'll find below a description of the test case I've built:<br /><br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgOgQpqywKZRb25bm0n6ZInlTWlB4tQPJXtpu7xZ4Rxx-Bcl-LGaDZo3lmAgEnYln3hdJuhzKzr22UaOVU2QR2zv0kEUKHelCsGxCx84Cu4QfnUPHQm_JwwHtUraMxmrx7bMBYHXg57C_U/s1600-h/sampleapp.png"><img style="margin: 0px auto 10px; display: block; text-align: center; cursor: pointer; width: 400px; height: 240px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgOgQpqywKZRb25bm0n6ZInlTWlB4tQPJXtpu7xZ4Rxx-Bcl-LGaDZo3lmAgEnYln3hdJuhzKzr22UaOVU2QR2zv0kEUKHelCsGxCx84Cu4QfnUPHQm_JwwHtUraMxmrx7bMBYHXg57C_U/s400/sampleapp.png" alt="" id="BLOGGER_PHOTO_ID_5391383251973720242" border="0" /></a>The test case is made of several parts:<br /><ol><li>The source table is named SOURCE.T9. You can easily change its structure (from CLOB to BLOB, for example). All the DML commands you'll execute and commit on that table will be captured.<br /></li><li>Because I use an asynchronous capture process, I need to make sure the change vectors are stored by LGWR in the redologs and archivelogs.<br /></li><li>I've created a strmadmin schema to store all the objects I use for the setup (queue, procedure, rules...); I've named the capture process streams_capture; it captures the changes from the redologs and enqueues them in the streams_queue queue.</li><li>The apply process dequeues the LCRs from streams_queue<br /></li><li>It relies on a DML handler to manage the LCRs. 
That handler converts the LCRs into XML type objects with the DBMS_STREAMS.CONVERT_LCR_TO_XML procedure and enqueue them in the myqueue queue</li><li>The print_xml_fromq procedure is used to display the content of the XML type objects that match LCRs.<br /></li></ol><h2>The script</h2>The script below performs the whole setup of the test case described above:<br /><pre>connect / as sysdba<br /><br />col global_name format a30 new_value dbname<br />select global_name from global_name;<br /><br />create user source<br /> identified by source<br /> default tablespace users<br /> temporary tablespace temp;<br /><br />grant connect,resource to source;<br /><br />create table source.t9(<br /> id number,<br /> text clob,<br /> constraint t9_pk primary key (id))<br /> lob(text) store as securefile;<br /><br />var first_scn number;<br /><br />set serveroutput on<br />declare<br /> scn number;<br />begin<br /> dbms_capture_adm.build(<br /> first_scn => scn);<br /> dbms_output.put_line('First SCN Value = ' || scn);<br /> :first_scn := scn;<br />end;<br />/<br /><br />col first_scn format 999999999999 -<br />new_value first_scn<br />select :first_scn first_scn from dual;<br /><br />create tablespace streams_tbs<br /> datafile '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf'<br /> size 25M autoextend on maxsize 256M;<br /><br />create user strmadmin identified by strmadmin<br /> default tablespace streams_tbs<br /> quota unlimited on streams_tbs;<br /><br />grant dba to strmadmin;<br />grant execute on dbms_aq to strmadmin;<br /><br />begin<br /> dbms_streams_adm.set_up_queue(<br /> queue_table => 'strmadmin.streams_queue_table',<br /> queue_name => 'strmadmin.streams_queue');<br />end;<br />/<br /><br />connect strmadmin/strmadmin<br /><br />var first_scn number;<br />exec :first_scn:=&&first_scn<br /><br />begin<br /> dbms_streams_adm.add_table_rules(<br /> table_name => 'source.t9',<br /> streams_type => 'capture',<br /> streams_name => 'streams_capture',<br /> queue_name => 
'strmadmin.streams_queue',<br /> include_dml => true,<br /> include_ddl => false,<br /> include_tagged_lcr => false,<br /> source_database => '&&dbname',<br /> inclusion_rule => true,<br /> and_condition => null);<br />end;<br />/<br /><br />col capture_name format a15<br />col queue_name format a13<br />col first_scn format 999999999999<br />col start_scn format 999999999999<br />col rule_set_name format a11<br /><br />select capture_name,<br /> queue_name,<br /> first_scn,<br /> start_scn,<br /> rule_set_name<br /> from dba_capture;<br /><br />set lines 120<br />col streams_name format a16<br />col streams_type format a9<br />col rule_owner format a10<br />col table_name format a15<br />col rule_type format a8<br />col rule_name format a15<br /><br />select rule_owner,<br /> streams_name,<br /> streams_type,<br /> table_owner||'.'||table_name table_name,<br /> rule_type,<br /> rule_name<br /> from dba_streams_table_rules;<br /><br />select table_owner||'.'||table_name table_name,<br /> scn,<br /> timestamp,<br /> supplemental_log_data_pk<br /> from dba_capture_prepared_tables;<br /><br />connect strmadmin/strmadmin<br /><br />begin<br /> dbms_streams_adm.add_table_rules(<br /> table_name => 'source.t9',<br /> streams_type => 'apply',<br /> streams_name => 'streams_apply',<br /> queue_name => 'strmadmin.streams_queue',<br /> include_dml => true,<br /> include_ddl => false,<br /> include_tagged_lcr => false,<br /> source_database => '&&dbname',<br /> inclusion_rule => true);<br />end;<br />/<br /><br />col apply_name format a13<br />col queue_name format a13<br />col rule_set_name format a11<br /><br />select apply_name,<br /> queue_name,<br /> rule_set_name,<br /> status,<br /> message_delivery_mode<br /> from dba_apply;<br /><br />col instantiation_scn format 999999999999 -<br />new_value instantiation_scn<br /><br />select dbms_flashback.get_system_change_number instantiation_scn<br /> from dual;<br /><br />begin<br /> 
dbms_apply_adm.set_table_instantiation_scn(<br /> source_object_name => 'source.t9',<br /> source_database_name => 'BLACK',<br /> instantiation_scn => &&instantiation_scn);<br />end;<br />/<br /><br />col source_database format a6<br />col object format a10<br />col instantiation_scn format 999999999999<br /><br />select source_database,<br /> source_object_owner||'.'||source_object_name object,<br /> instantiation_scn<br /> from dba_apply_instantiated_objects;<br /><br />connect strmadmin/strmadmin<br /><br />begin<br /> dbms_aqadm.create_queue_table(<br /> queue_table => 'myqueue_table',<br /> queue_payload_type => 'sys.xmltype',<br /> multiple_consumers => false);<br /> dbms_aqadm.create_queue(<br /> queue_name => 'myqueue',<br /> queue_table => 'myqueue_table');<br /> dbms_aqadm.start_queue('myqueue');<br />end;<br />/<br /><br />create or replace procedure myhandler(in_any in anydata)<br />is<br /> enqueue_options DBMS_AQ.enqueue_options_t;<br /> message_properties DBMS_AQ.message_properties_t;<br /> recipients DBMS_AQ.aq$_recipient_list_t;<br /> message_handle RAW(16);<br />begin<br /> enqueue_options.visibility := dbms_aq.immediate;<br /> enqueue_options.delivery_mode := dbms_aq.persistent;<br /> dbms_aq.enqueue(<br /> queue_name => 'STRMADMIN.MYQUEUE',<br /> enqueue_options => enqueue_options,<br /> message_properties => message_properties,<br /> payload => dbms_streams.convert_lcr_to_xml(in_any),<br /> msgid => message_handle);<br />end;<br />/<br /><br />begin<br /> dbms_apply_adm.set_dml_handler(<br /> object_name => 'SOURCE.T9',<br /> object_type => 'TABLE',<br /> operation_name => 'DEFAULT',<br /> error_handler => false,<br /> user_procedure => 'strmadmin.myhandler',<br /> apply_name => 'STREAMS_APPLY',<br /> assemble_lobs => false);<br />end;<br />/<br /><br />exec dbms_capture_adm.start_capture('streams_capture');<br />exec dbms_apply_adm.start_apply('streams_apply');<br /><br />create or replace procedure print_xml_fromq(queue_name varchar2)<br 
/>is<br /> dequeue_options dbms_aq.dequeue_options_t;<br /> message_properties dbms_aq.message_properties_t;<br /> message_handle raw(16);<br /> message sys.xmltype;<br /> v_out pls_integer;<br /> no_messages exception;<br /> pragma exception_init(no_messages, -25228);<br />begin<br /> dequeue_options.wait := dbms_aq.no_wait;<br /> dequeue_options.navigation := dbms_aq.first_message;<br /> dequeue_options.visibility := dbms_aq.immediate;<br /> dequeue_options.delivery_mode := dbms_aq.persistent;<br /> loop<br /> begin<br /> dbms_aq.dequeue(<br /> queue_name => queue_name,<br /> dequeue_options => dequeue_options,<br /> message_properties => message_properties,<br /> payload => message,<br /> msgid => message_handle);<br /> dbms_output.put_line('---------------------------------------------------------');<br /> dbms_output.put_line(message.extract('/*').getstringval());<br /> dbms_output.put_line('---------------------------------------------------------');<br /> dbms_output.put_line('');<br /> dequeue_options.navigation := dbms_aq.next_message;<br /> end;<br /> end loop;<br /> exception<br /> when no_messages then<br /> dbms_output.put_line('No more messages');<br /> commit;<br />end;<br />/<br /></pre><blockquote>If you are used to Streams, there is not much in that script that you don't know already. The table contains a LOB but can easily be changed to match your needs, even after you've started the capture. 
The very powerful feature I use here, and that comes with Streams, is the ability to transform an LCR into an XML document with the <a href="http://download.oracle.com/docs/cd/E11882_01/appdev.112/e10577/d_streams.htm#CHDGBJJA">DBMS_STREAMS.CONVERT_LCR_TO_XML</a> procedure.</blockquote><h2>Play with LOBs</h2>Once the configuration is done, you can use implicit LOB conversion or the LOB built-ins to create and modify LOB values from SQL and PL/SQL like below:<br /><pre>connect source/source<br /><br />insert into source.T9 values (1,empty_clob());<br />commit;<br /><br />declare<br /> my_x clob;<br />begin<br /> select text into my_x<br /> from source.T9 where id=1<br /> for update;<br /> dbms_lob.append(my_x,'Z');<br /> commit;<br />end;<br />/<br /><br />declare<br /> my_x clob;<br /> amount number;<br />begin<br /> select text into my_x<br /> from source.T9 where id=1<br /> for update;<br /> amount:=1;<br /> dbms_lob.erase(my_x,amount,1);<br /> commit;<br />end;<br />/<br /></pre><h2>Display XMLized LCRs<br /></h2>You can now display the content of the LCRs that have been captured with the script below:<br /><pre>connect strmadmin/strmadmin<br />set lines 1000<br />set serveroutput on<br />exec print_xml_fromq('MYQUEUE');</pre>The result looks like below. 
Pay attention to the various command type "LOB WRITE", "LOB TRIM" and "LOB ERASE":<br /><pre>---------------------------------------------------------<br /><row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr" <br /> xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> schemalocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>INSERT</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>6.31.810</transaction_id><br /> <scn>1351988</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>1</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2 nil="true"></varchar2><br /> <lob_information>EMPTY LOB</lob_information><br /> </data><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr" <br /> xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> schemalocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>UPDATE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>6.31.810</transaction_id><br /> <scn>1351988</scn><br /> <old_values><br /> <old_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>1</number><br /> </data><br /> </old_value><br /> </old_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><row_lcr 
xmlns="http://xmlns.oracle.com/streams/schemas/lcr" <br /> xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> schemalocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB WRITE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>3.33.828</transaction_id><br /> <scn>1352031</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>1</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2>Z</varchar2><br /> </data><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_offset>1</lob_offset><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />---------------------------------------------------------<br /><row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr" <br /> xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> schemalocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB TRIM</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>3.33.828</transaction_id><br /> <scn>1352031</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>1</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2 nil="true"></varchar2><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_operation_size>1</lob_operation_size><br /> </data><br /> </new_value><br /> </new_values><br /></row_lcr><br 
/>---------------------------------------------------------<br />---------------------------------------------------------<br /><row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr" <br /> xsi="http://www.w3.org/2001/XMLSchema-instance"<br /> schemalocation="http://xmlns.oracle.com/streams/schemas/lcr<br /> http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"><br /> <source_database_name>BLACK</source_database_name><br /> <command_type>LOB ERASE</command_type><br /> <object_owner>SOURCE</object_owner><br /> <object_name>T9</object_name><br /> <transaction_id>7.19.625</transaction_id><br /> <scn>1352034</scn><br /> <new_values><br /> <new_value><br /> <column_name>ID</column_name><br /> <data><br /> <number>1</number><br /> </data><br /> </new_value><br /> <new_value><br /> <column_name>TEXT</column_name><br /> <data><br /> <varchar2 nil="true"></varchar2><br /> <lob_information>LAST LOB CHUNK</lob_information><br /> <lob_offset>1</lob_offset><br /> <lob_operation_size>1</lob_operation_size><br /> </data><br /> </new_value><br /> </new_values><br /></row_lcr><br />---------------------------------------------------------<br />No more messages<br /></pre><h2>Drop the test case</h2>Once your tests are done, you can drop the whole test case by running the script below:<br /><pre>connect / as sysdba<br /><br />col global_name format a30 new_value dbname<br />select global_name from global_name;<br /><br />exec dbms_apply_adm.stop_apply('STREAMS_APPLY')<br />exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE')<br /><br />exec dbms_apply_adm.delete_all_errors('STREAMS_APPLY');<br />exec dbms_apply_adm.drop_apply('STREAMS_APPLY')<br />exec dbms_capture_adm.drop_capture('STREAMS_CAPTURE')<br /><br />drop procedure strmadmin.myhandler;<br /><br />begin<br />dbms_aqadm.stop_queue('strmadmin.myqueue');<br />dbms_aqadm.drop_queue('strmadmin.myqueue');<br />dbms_aqadm.drop_queue_table('strmadmin.myqueue_table', true);<br />end;<br />/<br /><br />begin<br />for i in 
(select source_object_owner||'.'||<br /> source_object_name name<br /> from dba_apply_instantiated_objects<br /> where source_object_owner in ('SOURCE'))<br />loop<br /> dbms_apply_adm.set_table_instantiation_scn(<br /> source_object_name => i.name,<br /> source_database_name => '&&dbname',<br /> instantiation_scn => null);<br />end loop;<br />end;<br />/<br /><br />begin<br />for i in (select object_owner||'.'||<br /> object_name name,<br /> operation_name,<br /> apply_name<br /> from dba_apply_dml_handlers<br /> where object_owner in ('SOURCE'))<br />loop<br /> dbms_apply_adm.set_dml_handler(<br /> object_name => i.name,<br /> object_type => 'TABLE',<br /> operation_name=> i.operation_name,<br /> user_procedure=> null,<br /> apply_name => i.apply_name);<br /> end loop;<br />end;<br />/<br /><br />exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);<br /><br />drop user strmadmin cascade;<br /><br />drop tablespace streams_tbs<br /> including contents and datafiles;<br /><br />drop user source cascade;</pre>In a next blog post, I'll dig further into how LOBs work within LCRs. 
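Before re-running the setup, it is worth checking that the cleanup left nothing behind; a quick sketch against the standard Streams dictionary views — each query should return no rows once the drop script has completed:

```sql
-- Leftover capture or apply processes
select capture_name from dba_capture;
select apply_name from dba_apply;

-- Leftover DML handlers and instantiation SCNs for the test schema
select object_owner, object_name, operation_name
  from dba_apply_dml_handlers
 where object_owner = 'SOURCE';

select source_object_owner, source_object_name
  from dba_apply_instantiated_objects
 where source_object_owner = 'SOURCE';
```
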
If you want to write your own LCRs that contain LOBs, you can check the documentation; there is a short section called "<a href="http://download.oracle.com/docs/cd/E11882_01/server.112/e12862/loblcrdemo.htm">Oracle® Streams Extended Examples - Logical Change Records With LOBs Example</a>".</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-22532991342935068902009-10-05T09:08:00.002-07:002009-10-05T14:23:29.012-07:00My XStream FAQI wrote 2 programs a few months back to use and experiment with Oracle XStream; when I discovered Oracle had launched XStream right after closing the deal with GoldenGate (a few hours back from now...), I thought it might be clever to publish those Java programs to help people set up an <a href="http://wedostreams.blogspot.com/2009/10/xstream-inbound-sample-java-program.html">XStream Inbound configuration</a> and an <a href="http://wedostreams.blogspot.com/2009/10/xstream-outbound-sample-java-program.html">XStream Outbound configuration</a>. I really hope those 2 programs can help people.<br /><br />However, after a few conversations on IM, Twitter (<a href="http://twitter.com/arkzoyd">@arkzoyd</a>), and the blog, I'm now convinced I did it wrong. I should have started at the beginning instead of pushing those programs with no explanation. I should have talked about the reasons why it's important to track message positions. How subtle XStream actually is. 
What benefits you can get from XStream compared to what used to exist before.<br /><br />I'll try to correct the current situation and provide simple answers to the questions below:<br /><ul><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q1">What is Oracle XStream?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q2">Why could you not get to the same result before XStream?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q3">What problem does it solve?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q4">Why is it cool?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q5">How does XStream relate to GoldenGate?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q6">What does Oracle want to do with XStream?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q7">How does XStream work?</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/10/my-xstream-faq.html#q8">How can I know more about XStream?</a><br /></li></ul>If you have some high-level questions about XStream, feel free to comment on this post; if you have technical issues, I suggest you post directly on the <a href="http://forums.oracle.com/forums/forum.jspa?forumID=70">Streams Forum</a>...<span class="fullpost"><br /><a name="q1"></a><h2>What is Oracle XStream?</h2>XStream is a new way for Oracle Streams and the outside world to interact. It uses a publish/subscribe model that is different from the regular AQ queuing model. The purpose is to provide to the outside world the same kind of features Oracle uses to propagate messages between databases.<br /><br />As a result, with XStream, you can now propagate messages between a database and a program written in OCI or Java/OCI. 
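On the database side, an XStream server is just another Streams component and shows up in the data dictionary. As a sketch — assuming an 11.2 database where outbound servers have been created — you can check their plumbing with a query like this (DBA_XSTREAM_OUTBOUND is new in 11.2):

```sql
-- Each XStream outbound server, the capture process feeding it,
-- and the queue they share.
select server_name,
       connect_user,
       capture_name,
       queue_owner,
       queue_name
  from dba_xstream_outbound;
```
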
In addition, you can guarantee that:<ul><li>no messages are written to disk: everything stays in memory during the propagation to and from the program; and not only can you guarantee that behavior between the database and your program, but also across the whole chain of programs from the source to the destination of your messages<br /></li><li>If something crashes, whether it's a database instance or your program, you'll be able to recover all your messages, in the right order and without losing a single one</li></ul>Obviously, if you face some bottleneck/saturation, you may decide, like Streams does internally, that it's more efficient for your messages to spill out of memory. In that case, you can turn them into persistent messages or you can slow down the capture or propagation processes.<br /><a name="q2"></a><h2>Why could you not get to the same result before XStream?</h2>There are 2 cases XStream addresses: (1) Inbound messages and (2) Outbound messages.<br /><ul style="font-weight: bold; font-style: italic;"><li>Inbound messages</li></ul>Before XStream, the simplest way to send a message to a database was to make it persistent; you could, for example, "insert the message into a persistent queue" or "insert it into a table". In both cases, the Oracle database guarantees you will not lose your message. However, if the database you put your messages in isn't your message destination database, you have to pay the price of turning messages persistent in that staging area.<br /><br />Obviously you could also decide to enqueue your messages into a non-persistent queue on the staging database and use a propagation job to send the message to its destination database. 
The problem with that method is that there is no guarantee that, if an instance crashes, the message will ever reach its destination; otherwise, non-persistent queues would be called persistent queues!<br /><br />Until Oracle Database 11g Release 2, there was no Oracle database built-in you could rely on to both guarantee the message would get to its destination and avoid the price of turning it persistent on the staging databases... But now, you have XStream!<br /><ul style="font-weight: bold; font-style: italic;"><li>Outbound messages</li></ul> The principle is basically the same for outbound messages. A Streams captured message could be turned into a persistent message, which guarantees you will not lose your message but requires you to pay the price of turning captured messages persistent on the staging database.<br /><br />You could also turn your messages into non-persistent messages; yes, it's possible if, say, you create an apply handler that enqueues the message in a buffered queue. In that case, there is no guarantee you will never lose any message.<br /><br />Like for inbound messages, there was no built-in feature of Oracle you could use to guarantee a message from inside a database could be sent to a destination outside the database without turning it into a persistent message on the staging database.<br /><a name="q3"></a><h2>What problem does it solve?</h2>As you can now guess, XStream enables you to propagate Streams messages between Oracle databases and the outside world in an extremely efficient way. It allows you to propagate messages in memory only (in most cases!) and, at the same time, it guarantees messages will be delivered to their destination.<br /><a name="q4"></a><h2>Why is it cool?</h2>Because Streams is cool! And providing the power of Streams to the outside world is VERY cool, don't you think? ;-)<br /><a name="q5"></a><h2>How does XStream relate to GoldenGate?</h2>XStream, like Oracle Active Data Guard, is included in the GoldenGate license. 
But unlike Active Data Guard, XStream cannot be licensed separately from GoldenGate. For now that's the only link between XStream and GoldenGate, even if we can guess GoldenGate could leverage the XStream feature sooner or later.<br /><a name="q6"></a><h2>What does Oracle want to do with XStream?</h2> That's actually a very good question and I assume we can only guess for now. There are a bunch of things Oracle could do with XStream, like the ones below:<br /><ul><li>leveraging the technology in GoldenGate more than just as a bundle</li><li>leveraging the technology for existing features of the Oracle Database like the In-Memory Database Cache or Oracle Coherence<br /></li><li>leveraging the technology to build new features like file-to-database integration or MySQL-to-Oracle integration (if we consider MySQL could turn into an Oracle offering one day soon).<br /></li><li>probably many more things I cannot even think about...<br /></li></ul><a name="q7"></a><h2>How does XStream work?</h2>XStream relies on a few facts:<br /><ul><li>Messages are consumed once on the destination; whether they are made persistent or not doesn't actually matter. Once consumed, a message on the destination will never be requested again.<br /></li><li>There is an order to the messages; in addition, there is a raw value that strictly increases and uniquely identifies the message positions in that order<br /></li><li>If a message is lost in the chain, that's because the chain has broken; and, in that case, all the subsequent messages are lost too.<br /></li><li>The message order is kept during the propagation from the source to the destination<br /></li><li>Messages are persistent on the source; or, to say it another way, if the destination doesn't get a message, the source will be able to resend it, in the right order... 
Assuming it is given the message position to restart from!<br /></li></ul>As a result, an XStream program deals with its server as follows:<br /><ul><li>In the case of XStream Inbound, the program gets the last position from the server and then enqueues its messages from that point. On a regular basis, it gets some feedback from the server saying which messages have been consumed on the destination, so it can release those messages on the source.</li><li>In the case of XStream Outbound, the program sends the last position of the applied messages and then dequeues messages from that point. On a regular basis, it provides some feedback to the server saying which messages have been consumed, to allow the server to release them from its source.<br /></li></ul><a name="q8"></a><h2>How can I know more about XStream?</h2>To know everything about XStream, read the following documentation:<br /><ul><li><a href="http://download.oracle.com/docs/cd/E11882_01/doc.112/e15874/toc.htm">Oracle® Database XStream Guide</a></li><li><a href="http://download.oracle.com/docs/cd/E11882_01/appdev.112/e11045/toc.htm">Oracle® Database XStream Java API Reference</a></li><li><a href="http://download.oracle.com/docs/cd/E15881_01/doc.104/e15889.pdf">Oracle GoldenGate 10.4 Licensing Guide</a></li></ul></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-20687968257199361582009-10-04T14:52:00.000-07:002009-10-04T15:42:53.937-07:00XStream Outbound... A sample Java ProgramLike with <a href="http://wedostreams.blogspot.com/2009/10/xstream-inbound-sample-java-program.html">XStream InBound in my previous post</a>, you'll find below a simple Java program for XStream Outbound. This new program subscribes to Streams captured changes. Does it sound easy?
It is, as you'll see below.<span class="fullpost"><br /><h2>Step 1: Create a Sample Schema</h2>For this sample program, create a schema and a table:<br /><pre>connect / as sysdba<br /><br />create user source<br /> identified by source<br /> default tablespace users<br /> temporary tablespace temp;<br /><br />grant connect,resource to source;<br /><br />col dbname new_value dbname<br /><br />select value dbname<br /> from v$parameter<br />where name='db_unique_name';<br /><br />prompt &&dbname<br /><br />connect source/source<br /><br />create table t7(<br /> id number primary key,<br /> text1 varchar2(80),<br /> text2 varchar2(80));<br /><br />insert into t7(id, text1, text2)<br /> values (1,'Text 1','Text 1');<br /><br />insert into t7(id, text1, text2)<br /> values (2,'Text 2','Text 2');<br /><br />commit;</pre><h2>Step 2: Create a Streams Administrator</h2>To subscribe to Streams changes, you must create a Streams administrator:<br /><pre>connect / as sysdba<br /><br />create tablespace streams_tbs<br /> datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'<br /> size 25M autoextend on maxsize 256M;<br /><br />CREATE USER strmadmin<br /> IDENTIFIED BY strmadmin<br /> DEFAULT TABLESPACE streams_tbs<br /> QUOTA UNLIMITED ON streams_tbs<br /> temporary tablespace temp;<br /><br />grant dba to strmadmin;<br /><br />begin<br /> dbms_streams_adm.set_up_queue(<br /> queue_table => 'strmadmin.streams_queue_table',<br /> queue_name => 'strmadmin.streams_queue');<br />end;<br />/<br /><br />exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);<br /><br />select *<br /> from dba_streams_administrator;</pre><h2>Step 3: Create a Simple XStream Out Configuration</h2>For our Java program to dequeue messages, you must create an XStream Outbound server:<br /><pre>connect strmadmin/strmadmin<br /><br />begin<br /> dbms_xstream_adm.create_outbound(<br /> server_name => 'DEMO_SERVER',<br /> source_database => '&&dbname',<br /> table_names =>
'source.t7',<br /> schema_names => null,<br /> capture_user => null,<br /> connect_user => null,<br /> comment => 'XStream OutBound Server Demonstration');<br />end;<br />/<br /><br />SELECT *<br /> FROM DBA_XSTREAM_OUTBOUND;<br /><br />select *<br /> from dba_apply<br />where purpose='XSTREAM OUT';<br /></pre><h2>Step 4: Create a JAVA XStream Outbound Client</h2>Create a <code>XStreamOutDemo.java</code> file, like below. Change the strings in red to meet your own configuration:<br /><pre>import java.sql.Connection;<br />import java.sql.DriverManager;<br /><br />import oracle.jdbc.internal.OracleConnection;<br /><br />import oracle.streams.XStreamOut;<br />import oracle.streams.ChunkColumnValue;<br />import oracle.streams.ColumnValue;<br />import oracle.streams.DefaultRowLCR;<br />import oracle.streams.LCR;<br />import oracle.streams.RowLCR;<br /><br />public class XStreamOutDemo {<br /><br />public static void main(String args[])<br />{<br />String out_url = "jdbc:oracle:oci:@<span style="font-weight: bold; color: rgb(255, 0, 0);">arkzoyd-easyteam:1521:BLACK</span>";<br /><br />/*<br />* Connect to the Database<br />*/<br />Connection out_conn = null;<br />try<br />{<br /> DriverManager.registerDriver(new oracle.jdbc.OracleDriver());<br /> out_conn=DriverManager.getConnection(out_url, "<span style="color: rgb(255, 0, 0); font-weight: bold;">strmadmin</span>", "<span style="color: rgb(255, 0, 0); font-weight: bold;">strmadmin</span>");<br />}<br />catch(Exception e)<br />{<br /> System.out.println("DB Connection Failed: " + out_url);<br /> e.printStackTrace();<br />}<br /><br />/*<br />* Get a XStream Out Handler<br />*/<br />XStreamOut xsOut=null;<br />byte[] lastPosition = null;<br /><br />try<br />{<br />// when attach to an outbound server, client needs to tell outbound<br />// server the last position.<br />xsOut = XStreamOut.attach((OracleConnection) out_conn, "DEMO_SERVER",<br /> lastPosition, XStreamOut.DEFAULT_MODE);<br />System.out.println("Attached to 
outbound server: DEMO_SERVER");<br />System.out.print("Last Position is: ");<br />if (lastPosition != null) { printHex(lastPosition); }<br /> else { System.out.println("NULL");}<br />}<br />catch(Exception e)<br />{<br />System.out.println("cannot attach to outbound server: DEMO_SERVER");<br />System.out.println(e.getMessage());<br />e.printStackTrace();<br />}<br /><br />byte[] processedLowPosition = null;<br />try<br />{<br />while(true)<br />{<br /> // receive an LCR from outbound server<br /> LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);<br /><br /> if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active<br /> {<br /> assert alcr != null;<br /><br /> // also get chunk data for this LCR if any<br /> if (alcr instanceof RowLCR)<br /> {<br /> // receive chunk from outbound then send to inbound<br /> if (((RowLCR)alcr).hasChunkData())<br /> {<br /> ChunkColumnValue chunk = null;<br /> do<br /> {<br /> chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);<br /> } while (!chunk.isEndOfRow());<br /> }<br /> }<br /><br /> String command=alcr.getCommandType();<br /> if (!command.equals("COMMIT"))<br /> System.out.print(command+" on "+ alcr.getObjectOwner()+"."+<br /> alcr.getObjectName()+"\n");<br /> else System.out.print(command+"\n");<br /> if (command.equals("INSERT") || command.equals("UPDATE")) {<br /> System.out.print(" -- NEW VALUES ----------\n"); <br /> for (int i=0;i<((RowLCR) alcr).getNewValues().length;i++) {<br /> System.out.print(" Column:"+<br /> fixsize(((ColumnValue) ((RowLCR) alcr).<br /> getNewValues()[i]).getColumnName(),10)+<br /> "Value:"+<br /> fixsize(((ColumnValue) ((RowLCR) alcr).<br /> getNewValues()[i]).getColumnData().<br /> stringValue(),30)+"\n");<br /> }<br /> }<br /> if (command.equals("UPDATE") || command.equals("DELETE")) {<br /> System.out.print(" -- OLD VALUES ----------\n");<br /> for (int i=0;i<((RowLCR) alcr).getOldValues().length;i++) {<br /> System.out.print(" Column:"+<br /> fixsize(((ColumnValue) 
((RowLCR) alcr).<br /> getOldValues()[i]).getColumnName(),10)+<br /> "Value:"+<br /> fixsize(((ColumnValue) ((RowLCR) alcr).<br /> getOldValues()[i]).getColumnData().<br /> stringValue(),30)+"\n");<br /> }<br /> }<br /> System.out.print(" -- DML ----------\n");<br /> System.out.println(" "+((DefaultRowLCR) alcr).getStatement(false));<br /> processedLowPosition = alcr.getPosition();<br /> if (null != processedLowPosition)<br /> xsOut.setProcessedLowWatermark(processedLowPosition,<br /> XStreamOut.DEFAULT_MODE);<br /> System.out.print("Last Position is: ");<br /> printHex(processedLowPosition);<br /> } else // batch is end<br /> { assert alcr == null; }<br /> }<br />} catch(Exception e) {<br /> System.out.println("exception when processing LCRs");<br /> System.out.println(e.getMessage());<br /> e.printStackTrace();<br />}<br />}<br /><br />// print a position (raw byte array) as hexadecimal<br />public static void printHex(byte[] b) {<br />for (int i = 0; i < b.length; ++i) {<br /> System.out.print(<br /> Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));<br />}<br />System.out.println("");<br />}<br /><br />// pad or truncate a string to a fixed display width<br />public static String fixsize(String text, int length) {<br />String output;<br />if (text.length()>length) {<br />output=text.substring(0,length);<br />} else {<br />output=text;<br />for (int i=0; i<(length-text.length()); i++) {<br /> output=output+" ";<br /> }<br />}<br />return output;<br />}<br />}</pre>To compile and execute the program, run the script below:<br /><pre>export CLASSPATH=.:$CLASSPATH<br />export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH<br />export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH<br />export JAVA_HOME=/opt/jdk1.6.0_13<br />export PATH=$JAVA_HOME/bin:$PATH<br /><br />javac XStreamOutDemo.java<br />java XStreamOutDemo</pre>To test the program, open another session to the database and enqueue messages in the table like below:<br /><pre>insert into source.t7 values (3,'X','X');<br />commit;<br />update source.t7 set text2='Y' where id=3;<br />commit;<br />delete from source.t7 where id=3;<br />commit;</pre>The output looks like below:<br /><pre>INSERT on SOURCE.T7<br />-- NEW VALUES ----------<br />Column:ID Value:3 <br />Column:TEXT1 Value:X <br />Column:TEXT2 Value:X <br />Last Position is:
0000001d856d00000001000000010000001d856c000000010000000101<br />COMMIT<br />Last Position is: 0000001d856d00000001000000010000001d856d000000010000000101<br />UPDATE on SOURCE.T7<br />-- NEW VALUES ----------<br />Column:TEXT2 Value:Y <br />-- OLD VALUES ----------<br />Column:ID Value:3 <br />Column:TEXT2 Value:X <br />Last Position is: 0000001d856f00000001000000010000001d856e000000010000000101<br />COMMIT<br />Last Position is: 0000001d856f00000001000000010000001d856f000000010000000101<br />DELETE on SOURCE.T7<br />-- OLD VALUES ----------<br />Column:ID Value:3 <br />Column:TEXT1 Value:X <br />Column:TEXT2 Value:Y <br />Last Position is: 0000001d857200000001000000010000001d8570000000010000000101<br />COMMIT<br />Last Position is: 0000001d857200000001000000010000001d8572000000010000000101</pre>To exit the program, type:<br /><pre>CTRL+C</pre><h2>Step 5: Drop the Configuration</h2>I hope you've found this example useful. To drop the configuration, run the script below:<br /><pre>connect strmadmin/strmadmin<br /><br />begin<br /> dbms_xstream_adm.drop_outbound(<br /> server_name=>'DEMO_SERVER');<br />end;<br />/<br /><br />SELECT *<br /> FROM DBA_XSTREAM_OUTBOUND;<br /><br />select * from dba_apply;<br />select * from dba_capture;<br />select * from dba_streams_table_rules;<br /><br />connect / as sysdba<br /><br />drop user source cascade;<br />drop user strmadmin cascade;</pre>You're done! It's nice to enter the new Oracle Data Integration generation with GoldenGate, don't you think?</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com1tag:blogger.com,1999:blog-1041798052670608448.post-91842907414857767402009-10-04T13:45:00.000-07:002009-10-04T14:48:10.592-07:00XStream Inbound... A Sample Java ProgramOracle XStream is a new feature of Oracle Database 11g Release 2. Though it has been developed by the Streams team and is part of the database, XStream is part of GoldenGate! 
And now that the GoldenGate deal has been closed, 7 days before Oracle OpenWorld, it's easy to guess why XStream came out a few hours ago.<br /><br />This post is about XStream; it provides a Java program that uses "XStream In" to enqueue messages to a buffered queue. A Streams apply process applies them to a table named <code>SOURCE.T8</code>.<span class="fullpost"><br /><br />To know everything about XStream, read the following documentation:<br /><ul><li><a href="http://download.oracle.com/docs/cd/E11882_01/doc.112/e15874/toc.htm">Oracle® Database XStream Guide</a></li><li><a href="http://download.oracle.com/docs/cd/E11882_01/appdev.112/e11045/toc.htm">Oracle® Database XStream Java API Reference</a></li><li><a href="http://download.oracle.com/docs/cd/E15881_01/doc.104/e15889.pdf">Oracle GoldenGate 10.4 Licensing Guide</a></li></ul><br /><h2>Step 1: Create a Sample Schema</h2>For this example, you'll need a schema and a table:<br /><pre>sqlplus / as sysdba<br /><br />create user source<br /> identified by source<br /> default tablespace users<br /> temporary tablespace temp;<br /><br />grant connect,resource to source;<br /><br />col dbname new_value dbname<br /><br />select value dbname<br /> from v$parameter<br />where name='db_unique_name';<br /><br />prompt &&dbname<br /><br />connect source/source<br /><br />create table t8(<br /> id number primary key,<br /> text1 varchar2(80),<br /> text2 varchar2(80));<br /><br />commit;<br /></pre><h2>Step 2: Create a Streams Administrator</h2>To use XStream, you'll also need a Streams administrator and a queue to stage buffered messages:<br /><pre>connect / as sysdba<br /><br />create tablespace streams_tbs<br /> datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'<br /> size 25M autoextend on maxsize 256M;<br /><br />CREATE USER strmadmin<br /> IDENTIFIED BY strmadmin<br /> DEFAULT TABLESPACE streams_tbs<br /> QUOTA UNLIMITED ON streams_tbs<br /> temporary tablespace temp;<br /><br />grant dba to strmadmin;<br /><br />begin<br /> dbms_streams_adm.set_up_queue(<br /> queue_table => 'strmadmin.streams_queue_table',<br /> queue_name => 'strmadmin.streams_queue');<br />end;<br />/<br /><br />exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);<br /><br />select *<br /> from dba_streams_administrator;<br /></pre><h2>Step 3: Create a Simple XStream In Configuration</h2>You must create and start an XStream In configuration before you use a Java or OCI client to enqueue messages:<br /><pre>connect strmadmin/strmadmin<br /><br />BEGIN<br /> DBMS_XSTREAM_ADM.CREATE_INBOUND(<br /> server_name => 'xin',<br /> queue_name => 'xin_queue');<br />END;<br />/<br /><br />SELECT *<br /> FROM DBA_XSTREAM_INBOUND;<br /><br />set pages 1000<br />select *<br /> from dba_apply<br />where purpose='XSTREAM IN';<br /><br />exec DBMS_APPLY_ADM.START_APPLY('xin');<br /></pre><h2>Step 4: Create a JAVA XStream Inbound Client</h2>Create a <code>XStreamInDemo.java</code> file that enqueues messages to the buffered queue.
Change the part in red to match your environment:<br /><pre>import java.sql.Connection;<br />import java.sql.DriverManager;<br />import java.sql.ResultSet;<br />import java.sql.Statement;<br /><br />import java.util.Date;<br /><br />import oracle.jdbc.internal.OracleConnection;<br /><br />import oracle.sql.CHAR;<br />import oracle.sql.DATE;<br />import oracle.streams.ColumnValue;<br />import oracle.streams.DefaultColumnValue;<br />import oracle.streams.DefaultRowLCR;<br />import oracle.streams.RowLCR;<br />import oracle.streams.XStreamIn;<br /><br />public class XStreamInDemo {<br /><br />public static void main(String args[])<br />{<br />String in_url = "jdbc:oracle:oci:@<span style="font-weight: bold; color: rgb(255, 102, 102);">arkzoyd-easyteam:1521:BLACK</span>";<br /><br />/*<br />* Connect to the Database<br />*/<br />Connection in_conn = null;<br />try<br />{<br />DriverManager.registerDriver(new oracle.jdbc.OracleDriver());<br />in_conn=DriverManager.getConnection(in_url, "<span style="font-weight: bold; color: rgb(255, 102, 102);">strmadmin</span>", "<span style="font-weight: bold; color: rgb(255, 102, 102);">strmadmin</span>");<br />}<br />catch(Exception e)<br />{<br />System.out.println("DB Connection Failed: " + in_url);<br />e.printStackTrace();<br />}<br /><br />/*<br />* Get a XStream In Handler<br />*/<br />XStreamIn xsIn=null;<br />String xsinName="XIN";<br />byte[] lastPosition = null;<br />int transaction=0;<br />int rank=0;<br />try<br />{<br />xsIn = XStreamIn.attach ((OracleConnection)in_conn, xsinName,<br /> "HI2" , XStreamIn.DEFAULT_MODE);<br /><br />// use last position to decide where should we start sending LCRs<br />System.out.println("Attached to inbound server:"+xsinName);<br />System.out.print("Inbound Server Last Position is: ");<br />lastPosition = xsIn.getLastPosition();<br />if (null == lastPosition)<br />{<br /> System.out.println("null"); <br /> transaction=1;<br /> rank=1;<br />}<br />else {<br /> printHex(lastPosition); 
System.out.println("");<br /> transaction = getTransaction(lastPosition);<br /> rank = getRank(lastPosition);<br /> if (rank==1) rank=2; else { rank=1; transaction++; }<br />}<br />}<br />catch(Exception e)<br />{<br />System.out.println("cannot attach to inbound server: "+xsinName);<br />System.out.println(e.getMessage());<br />e.printStackTrace();<br />} <br /><br />/*<br />* Create and send LCRs in an infinite loop<br />*/<br /><br />try {<br />DATE mydate;<br />DefaultRowLCR alcr;<br />byte[] processedLowPosition;<br /><br />while(true) {<br /> if (rank==1) {<br /> mydate = new DATE();<br /> System.out.println("-- " +<br /> Integer.toString(transaction) +<br /> " -------------------------");<br /> alcr=new DefaultRowLCR(<br /> "BLACK", RowLCR.INSERT, "SOURCE", "T8",<br /> "X."+Integer.toString(transaction), null,<br /> encode2bytes(transaction, rank), mydate);<br /> ColumnValue[] newcolumn= new ColumnValue[3];<br /> newcolumn[0]= new DefaultColumnValue("ID",<br /> new oracle.sql.NUMBER(transaction));<br /> newcolumn[1]= new DefaultColumnValue("TEXT1",<br /> new CHAR("Hello2", CHAR.DEFAULT_CHARSET));<br /> newcolumn[2]= new DefaultColumnValue("TEXT2",<br /> new CHAR("Hello2", CHAR.DEFAULT_CHARSET));<br /> alcr.setNewValues(newcolumn);<br /> xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);<br /> System.out.println(" " +alcr.getStatement(false));<br /> xsIn.flush(XStreamIn.DEFAULT_MODE);<br /> rank++;<br /> Thread.sleep(500);<br /> } else {<br /> mydate = new DATE();<br /> alcr=new DefaultRowLCR(<br /> "BLACK", RowLCR.COMMIT, null, null,<br /> "X."+Integer.toString(transaction), null,<br /> encode2bytes(transaction, rank), mydate);<br /> xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);<br /> xsIn.flush(XStreamIn.DEFAULT_MODE); <br /> System.out.println(" " +alcr.getStatement(false));<br /> rank=1;<br /> transaction++;<br /> Thread.sleep(500);<br /> }<br /> System.out.print(" ");<br /> processedLowPosition =<br /> xsIn.getProcessedLowWatermark();<br /> if (processedLowPosition != null) {<br />
System.out.print("processedLowPosition: ");<br /> printHex(processedLowPosition);<br /> System.out.print(" (" +<br /> Integer.toString(getTransaction(processedLowPosition))+<br /> ", " +<br /> Integer.toString(getRank(processedLowPosition)) +<br /> ")");<br /> }<br /> else {<br /> System.out.print("processedLowPosition: null");<br /> }<br /> lastPosition =<br /> xsIn.getLastPosition();<br /> System.out.println("");<br />}<br />} catch (Exception e) {<br />System.out.println("exception when processing LCRs");<br />System.out.println(e.getMessage());<br />e.printStackTrace();<br />}<br />}<br /><br />public static void printHex(byte[] b)<br />{<br />for (int i = 0; i < b.length; ++i)<br />{<br />System.out.print(<br /> Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));<br />}<br />}<br /><br />public static byte[] encode2bytes(int transaction, int rank)<br />{<br />byte[] mybyte= new byte[5];<br />mybyte[0] =(byte)( transaction >> 24 );<br />mybyte[1] =(byte)( (transaction << 8) >> 24 );<br />mybyte[2] =(byte)( (transaction << 16) >> 24 );<br />mybyte[3] =(byte)( (transaction << 24) >> 24 );<br />mybyte[4] =(byte)( rank ) ;<br />return mybyte;<br />}<br /><br />public static int getTransaction(byte[] mybyte)<br />{<br />int i = 0;<br />int pos = 0;<br />i += ((int) mybyte[pos++] & 0xFF) << 24;<br />i += ((int) mybyte[pos++] & 0xFF) << 16;<br />i += ((int) mybyte[pos++] & 0xFF) << 8;<br />i += ((int) mybyte[pos] & 0xFF);<br />return i;<br />}<br /><br />public static int getRank(byte[] mybyte)<br />{<br /> int foo;<br /> foo =((int)mybyte[4] & 0xFF);<br /> return foo;<br />}<br />}</pre><blockquote>Notes:<br /><ul><li>getLastPosition should be used once after you attach the server to get the last message handled by XStream In</li><li>The client has to be OCI or Java/OCI. You must include the xstreams.jar library in the classpath</li></ul></blockquote><br />To compile the code, set the <code>CLASSPATH</code>, the <code>PATH</code> and the <code>JAVA_HOME</code>. 
Once done, you can simply run it:<br /><pre>export CLASSPATH=.:$CLASSPATH<br />export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH<br />export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH<br />export JAVA_HOME=/opt/jdk1.6.0_13<br />export PATH=$JAVA_HOME/bin:$PATH<br /><br />javac XStreamInDemo.java<br />java XStreamInDemo</pre><h2>Check messages are applied to the <code>SOURCE.T8</code> table</h2>The program enqueues messages. You can verify messages are applied to the table like in the script below:<br /><pre>sqlplus / as sysdba<br /><br />select count(*)<br /> from source.T8;<br /><br /><em> COUNT(*)<br />----------<br /> 1073</em><br /><br />/<br /><br /><em> COUNT(*)<br />----------<br /> 1076</em></pre><h2>Clean Up the Environment</h2>To clean up the environment, execute the script below:<br /><pre>connect strmadmin/strmadmin<br /><br />exec DBMS_APPLY_ADM.STOP_APPLY('xin');<br /><br />BEGIN<br /> DBMS_XSTREAM_ADM.DROP_INBOUND(<br /> server_name => 'xin');<br />END;<br />/<br /><br />select *<br /> from DBA_XSTREAM_INBOUND;<br /><br />select *<br /> from dba_apply<br />where purpose='XSTREAM IN';<br /><br />connect / as sysdba<br /><br />drop user source cascade;<br />drop user strmadmin cascade;<br /></pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com2tag:blogger.com,1999:blog-1041798052670608448.post-2829050816020318382009-09-21T09:02:00.000-07:002009-09-21T14:40:04.002-07:00Triggers and LOBs: Synchronous CDC and Synchronous Streams CaptureJust a quick post to outline some of the limits of using LOBs and triggers together; there is no secret, it's written (black on white) in the manual: <a href="http://download.oracle.com/docs/cd/E11882_01/appdev.112/e10765/lobs.htm#ADDCI4383">depending on how a LOB column is updated, triggers on the table will fire... or NOT</a>!
The reason is indeed related to the design of the LOB locator, which allows data chunk updates without any reference to the changed row.<br /><br />Anyway, I don't want to dig too much into the details of why things are the way they are. Instead, you'll find below some PL/SQL code samples. Those pieces of code show that, if you work with the LOB locator, the triggers won't fire. On the other hand, if you update the LOB columns directly (rather than through a locator), the triggers will fire.<span class="fullpost"><br /><h2>Create a table and a "for each row" trigger</h2>To begin, we'll create a table with a trigger. That trigger increments the number of times it fires in a separate table named <code>xlog</code>:<br /><pre>create table x (id number, text clob)<br /> lob(text) store as securefiles;<br /><br />create table xlog (id number, call number);<br /><br />create or replace trigger x_trigger<br /> after insert or update or delete on x<br /> for each row<br />begin<br /> if inserting then<br /> insert into xlog (id, call) values (:new.id, 1);<br /> else<br /> update xlog set call=call+1 where id=:old.id;<br /> end if;<br />end;<br />/</pre><blockquote><span style="font-weight: bold; font-style: italic;">Note:</span><br />To test Oracle Database 11g Release 2 Streams new features (in a later post), I've used a securefile LOB.</blockquote><h2>Direct changes to LOBs do fire triggers</h2>To begin, we change row data without using any persistent LOB locator.
As you'll discover in the <code>xlog</code> table, each row change fires the trigger:<br /><pre>insert into x values (1,null);<br />update x set text='X' where id=1;<br />update x set text='XY' where id=1;<br />commit;<br /><br />select x.text, xlog.call<br /> from x, xlog<br /> where x.id=1<br /> and xlog.id=1;<br /><br />TEXT CALL<br />---- ----<br /> XY 3</pre><h2>Changes through the LOB locator don't fire triggers</h2>On the other hand, if you change the row data with the persistent LOB locator like below, you'll find that the trigger doesn't fire:<br /><pre>declare<br /> my_x clob;<br />begin<br /> select text into my_x<br /> from x where id=1<br /> for update;<br /> dbms_lob.append(my_x,'Z');<br /> dbms_lob.append(my_x,'Z');<br /> commit;<br />end;<br />/<br /><br />select x.text, xlog.call<br /> from x, xlog<br /> where x.id=1<br /> and xlog.id=1;<br /><br />TEXT CALL<br />---- ----<br />XYZZ 3<br /></pre> <h2>Conclusion & Cleanup<br /></h2>If you just think about it, this may feed your thoughts about the LOB support (or actually non-support!) with Synchronous Change Data Capture (CDC) and Synchronous Streams Capture. But enough of conclusions for today, let's just drop the tables and the trigger:<br /><pre>drop table x purge;<br />drop table xlog purge;</pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com3tag:blogger.com,1999:blog-1041798052670608448.post-47479599521864814102009-09-06T14:33:00.000-07:002009-09-06T15:56:56.832-07:00The New Streams 11.2 SQL Generation FacilityI must admit, I'm kind of disappointed by the Oracle 11.2 Streams new features. Not that there is nothing important: the support for SecureFile LOBs and compressed tables are really things I've been eagerly waiting for. This new release is huge and looks promising! However, I'm disappointed for the following three reasons:<br /><ul><li>I did not manage to use statement handlers yet, though I find them very easy to use for some specific use cases.
Whether by hand or with <code>dbms_streams_adm.maintain_change_table</code>, they've always failed with <code>ORA-01008: not all variables bound</code></li><li>I was kind of hoping for the following restriction to be handled: "The <code>DBMS_STREAMS_ADVISOR_ADM</code> package does not gather information about synchronous captures or messaging clients.".</li><li>Last but not least, there are obvious hidden and undocumented features which raise my level of frustration to its maximum.</li></ul>Anyway, I imagine I have no other choice than to be patient and hope those features will be fixed and documented, hopefully, as soon as possible. In the meantime, I propose we explore one of those new features called SQL Generation.<span class="fullpost"><br /><br />SQL Generation is the ability you get, in a custom DML handler or any piece of code that deals with LCRs, to transform an LCR into its canonical SQL command. What we'll do is write a very simple example that will use a DML handler to store that SQL command in a table instead of applying it to the destination schema.<h2>Step 1: Create a Sample Schema</h2>To begin with the sample, create a source schema with a table and a few rows:<br /><pre>connect / as sysdba<br /><br />create user source<br /> identified by source<br /> default tablespace users<br /> temporary tablespace temp;<br /><br />grant connect,resource to source;<br /><br />col dbname new_value dbname<br /><br />select value dbname<br /> from v$parameter<br />where name='db_unique_name';<br /><br />prompt &&dbname<br /><br />connect source/source<br /><br />create table t5(<br /> id number primary key,<br /> text1 varchar2(80),<br /> text2 varchar2(80));<br /><br />insert into t5(id, text1, text2)<br /> values (1,'Text 1','Text 1');<br /><br />insert into t5(id, text1, text2)<br /> values (2,'Text 2','Text 2');<br /><br />commit;</pre><h2>Step 2: Create the Streams administrator and a queue</h2>Once done, you can create a Streams Administrator and a queue to use in your configuration:<br /><pre>connect / as sysdba<br /><br />create tablespace streams_tbs<br /> datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'<br /> size 25M autoextend on maxsize 256M;<br /><br />CREATE USER strmadmin<br /> IDENTIFIED BY strmadmin<br /> DEFAULT TABLESPACE streams_tbs<br /> QUOTA UNLIMITED ON streams_tbs<br /> temporary tablespace temp;<br /><br />grant dba to strmadmin;<br /><br />begin<br /> dbms_streams_adm.set_up_queue(<br /> queue_table =>'strmadmin.streams_queue_table',<br /> queue_name =>'strmadmin.streams_queue');<br />end;<br />/<br /><br />exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);<br /><br />select *<br /> from dba_streams_administrator;<br /><br />USERNAME LOC ACC<br />--------- --- ---<br />STRMADMIN YES YES</pre><h2>Step 3: Create a Capture Process</h2>Then create a capture process on the source schema:<br /><pre>connect / as sysdba<br /><br />var first_scn number;<br />set serveroutput on<br /><br />DECLARE<br /> scn NUMBER;<br />BEGIN<br /> DBMS_CAPTURE_ADM.BUILD(<br /> first_scn => scn);<br /> DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);<br /> :first_scn := scn;<br />END;<br />/<br /><br />col first_scn new_value first_scn<br />select :first_scn first_scn<br /> from dual;<br /><br />connect strmadmin/strmadmin<br /><br />prompt &&first_scn<br />prompt &&dbname<br /><br />BEGIN<br /> DBMS_CAPTURE_ADM.CREATE_CAPTURE(<br /> queue_name => 'strmadmin.streams_queue',<br /> capture_name => 'SQLGEN_CAPTURE',<br /> rule_set_name => NULL,<br /> source_database => '&&dbname',<br /> use_database_link => false,<br /> first_scn => &&first_scn,<br /> logfile_assignment => 'implicit');<br />END;<br />/<br /><br />col capture_name format a15<br />col queue_name format a13<br />col first_scn format 999999999999<br />col start_scn format 999999999999<br />col
rule_set_name format a11<br /><br />select capture_name,<br /> queue_name,<br /> first_scn,<br /> start_scn,<br /> rule_set_name<br /> from dba_capture;<br /><br />BEGIN<br /> DBMS_STREAMS_ADM.ADD_TABLE_RULES(<br /> table_name => 'source.t5',<br /> streams_type => 'capture',<br /> streams_name => 'SQLGEN_CAPTURE',<br /> queue_name => 'strmadmin.streams_queue',<br /> include_dml => true,<br /> include_ddl => false,<br /> include_tagged_lcr => true,<br /> source_database => '&&dbname',<br /> inclusion_rule => true);<br />END;<br />/<br /><br />set lines 120<br />col streams_name format a16<br />col streams_type format a9<br />col table format a15<br />col rule_type format a8<br />col rule_name format a15<br />col rule_condition format a60 wor wra<br /><br />select streams_name,<br /> streams_type,<br /> table_owner||'.'||table_name "TABLE",<br /> rule_type,<br /> rule_name,<br /> rule_condition<br /> from dba_streams_table_rules<br />where streams_name='SQLGEN_CAPTURE'<br /> and table_owner='SOURCE'<br /> and table_name='T5';</pre><h2>Step 4: Create a procedure and a table to store the SQL of the LCR</h2>Don't apply the LCR on the destination; instead, create a table and a procedure that will use SQL Generation to store the change vector as a DML command in that table:<br /><pre>connect strmadmin/strmadmin<br /><br />create table mydml_tab(<br /> id number,<br /> sqltext clob);<br /><br />create sequence mydml_seq;<br /><br />create or replace procedure mydml_handler(in_any in anydata)<br />is<br /> lcr sys.lcr$_row_record;<br /> v_sqltext clob:=' /* COMMENT */';<br /> rc number;<br />begin<br /> rc := in_any.GETOBJECT(lcr);<br /><span class="Apple-style-span" style="color:#FF0000;"><b> lcr.get_row_text(v_sqltext);</b></span><br /> insert into mydml_tab<br /> values (mydml_seq.nextval,v_sqltext);<br />end;<br />/</pre><h2>Step 5: Create an Apply process with a DML handler</h2>Create an apply process and add the DML handler to it:<br /><pre>connect 
strmadmin/strmadmin<br /><br />BEGIN<br /> DBMS_STREAMS_ADM.ADD_TABLE_RULES(<br /> table_name => 'source.t5',<br /> streams_type => 'apply',<br /> streams_name => 'SQLGEN_APPLY',<br /> queue_name => 'strmadmin.streams_queue',<br /> include_dml => true,<br /> include_ddl => false,<br /> include_tagged_lcr => true,<br /> source_database => '&&dbname',<br /> inclusion_rule => true);<br />END;<br />/<br />col apply_name format a13<br />col queue_name format a13<br />col rule_set_name format a11<br /><br />select apply_name,<br /> queue_name,<br /> rule_set_name,<br /> status<br /> from dba_apply;<br /><br />set lines 120<br />col streams_name format a16<br />col streams_type format a9<br />col table_owner format a11<br />col table_name format a15<br />col rule_type format a8<br />col rule_name format a15<br /><br />select STREAMS_NAME,<br /> STREAMS_TYPE,<br /> TABLE_OWNER,<br /> TABLE_NAME,<br /> RULE_TYPE,<br /> RULE_NAME<br /> from DBA_STREAMS_TABLE_RULES<br />where STREAMS_NAME='SQLGEN_APPLY';<br /><br />begin<br /> dbms_apply_adm.set_dml_handler(<br /> object_name => 'SOURCE.T5',<br /> object_type => 'TABLE',<br /> operation_name => 'INSERT',<br /> error_handler => false,<br /> user_procedure => 'strmadmin.mydml_handler',<br /> apply_name => 'SQLGEN_APPLY');<br />end;<br />/<br /><br />begin<br /> dbms_apply_adm.set_dml_handler(<br /> object_name => 'SOURCE.T5',<br /> object_type => 'TABLE',<br /> operation_name => 'UPDATE',<br /> error_handler => false,<br /> user_procedure => 'strmadmin.mydml_handler',<br /> apply_name => 'SQLGEN_APPLY');<br />end;<br />/<br /><br />begin<br /> dbms_apply_adm.set_dml_handler(<br /> object_name => 'SOURCE.T5',<br /> object_type => 'TABLE',<br /> operation_name => 'DELETE',<br /> error_handler => false,<br /> user_procedure => 'strmadmin.mydml_handler',<br /> apply_name => 'SQLGEN_APPLY');<br />end;<br />/<br /><br />begin<br /> dbms_apply_adm.set_table_instantiation_scn(<br /> source_object_name => 'source.t5',<br /> 
source_database_name => '&&dbname',<br /> instantiation_scn => &&first_scn);<br />end;<br />/<br /><br />col "OBJECT" format a15<br />col operation_name format a8<br />col user_procedure format a28<br />col apply_name format a13<br /><br />select object_owner||'.'||object_name "OBJECT",<br /> operation_name,<br /> user_procedure,<br /> apply_name<br /> from dba_apply_dml_handlers;<br /><br />col source_database format a10<br />col "OBJECT" format a15<br />set numwidth 15<br />select source_database,<br /> SOURCE_OBJECT_OWNER||'.'||SOURCE_OBJECT_NAME "OBJECT",<br /> instantiation_scn<br /> from dba_apply_instantiated_objects;</pre><h2>Step 6: Start the Apply and Capture processes</h2>Now start the apply and the capture processes and wait for them to keep up with the current position of the log writer:<br /><pre>exec dbms_apply_adm.start_apply('SQLGEN_APPLY')<br />exec dbms_capture_adm.start_capture('SQLGEN_CAPTURE')</pre><h2>Step 7: Test the SQL Generation</h2>You can test your settings by inserting or updating a row in the source table; after some time, query the table that stores the generated SQL:<br /><pre>insert into source.t5 values (3,'Hello','Hello');<br />commit;<br /><br />-- Wait a few minutes<br />col id format 99<br />col sqltext format a50<br />set long 1000<br />set longchunksize 1000<br />select * from mydml_tab;</pre>The table content should look like this:<br /><pre> ID SQLTEXT<br />--- --------------------------------------------------<br />1 /* COMMENT */ INSERT INTO "SOURCE"."T5"("ID","TEX<br /> T1","TEXT2" ) VALUES ( 3,'Hello','Hello')</pre>You can perform more tests with <code>UPDATE</code> or <code>DELETE</code>.<br /><h2>Step 8: Drop the test environment</h2>As always with my examples, I suggest you drop the whole configuration before you leave:<br /><pre>connect / as sysdba<br /><br />exec dbms_apply_adm.stop_apply('SQLGEN_APPLY')<br />exec dbms_capture_adm.stop_capture('SQLGEN_CAPTURE')<br /><br />exec
dbms_apply_adm.delete_all_errors('SQLGEN_APPLY');<br />exec dbms_apply_adm.drop_apply('SQLGEN_APPLY')<br />exec dbms_capture_adm.drop_capture('SQLGEN_CAPTURE')<br /><br />begin<br />for i in (select source_object_owner||'.'||<br /> source_object_name name<br /> from dba_apply_instantiated_objects<br /> where source_object_owner in ('SOURCE'))<br />loop<br />dbms_apply_adm.set_table_instantiation_scn(<br /> source_object_name => i.name,<br /> source_database_name => '&&dbname',<br /> instantiation_scn => null);<br />end loop;<br />end;<br />/<br /><br />begin<br />for i in (select object_owner||'.'||<br /> object_name name,<br /> operation_name,<br /> apply_name<br /> from dba_apply_dml_handlers<br /> where object_owner in ('SOURCE'))<br />loop<br />dbms_apply_adm.set_dml_handler(<br /> object_name => i.name,<br /> object_type => 'TABLE',<br /> operation_name=> i.operation_name,<br /> user_procedure=> null,<br /> apply_name => i.apply_name);<br />end loop;<br />end;<br />/<br /><br />exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);<br /><br />drop user strmadmin cascade;<br /><br />drop tablespace streams_tbs<br />including contents and datafiles;<br /><br />drop user source cascade;</pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com1tag:blogger.com,1999:blog-1041798052670608448.post-14817003749196242632009-08-27T15:00:00.000-07:002009-08-27T16:55:52.203-07:00Tracking Streams Changes with V$STREAMS_MESSAGE_TRACKING<code>V$STREAMS_MESSAGE_TRACKING</code> is one of the best new features of Oracle Streams 11g. For a full description and an example, check the <a href="http://download.oracle.com/docs/cd/B28359_01/server.111/b28322/mon_rep.htm#BABBFAJD">"Tracking LCRs Through a Stream"</a> section of the documentation. In this post, we'll be creating a procedure after setting a 1-way Streams replication and the message tracking. You'll see how easy to use it is. 
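<br /><br />Before we dive in, a quick reference: message tracking is a per-session setting managed with <code>DBMS_STREAMS_ADM</code>. The sketch below shows the basic calls; per my reading of the 11g documentation, passing a <code>NULL</code> label stops the tracking, and the <code>GET_MESSAGE_TRACKING</code> function returns the label currently in use — verify the exact behavior on your release:<pre>begin
  -- enable tracking: changes made in this session will carry the label
  dbms_streams_adm.set_message_tracking(
    tracking_label => 'ARKZOYD',
    actions        => dbms_streams_adm.action_memory);
end;
/

-- display the label currently set for the session
select dbms_streams_adm.get_message_tracking from dual;

begin
  -- stop tracking by setting a NULL label
  dbms_streams_adm.set_message_tracking(tracking_label => null);
end;
/</pre>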
To keep this post short, I won't describe how to configure the replication. The two attached scripts should help you get there quickly:<ul><li><a href="http://arkzoyd.free.fr/wedostreams/05_StreamsSchema1WayReplication.sql">05_StreamsSchema1WayReplication.sql</a>, assuming you have 2 databases and you can connect with SQL*Plus as oracle, creates a 1-way streams replication of the DEMO schema</li><li><a href="http://arkzoyd.free.fr/wedostreams/05_StreamsSchema1WayReplication_CleanUp.sql">05_StreamsSchema1WayReplication_CleanUp.sql</a> drops the whole configuration.</li></ul><span class="fullpost">Connect to the source database (i.e. <code>BLACK</code>), set the label and create a procedure in the <code>DEMO</code> schema:<pre>. oraenv<br />BLACK<br /><br />sqlplus / as sysdba<br /><br />set serveroutput on<br /><br />begin<br />dbms_streams_adm.set_message_tracking(<br />tracking_label => 'ARKZOYD',<br />actions => DBMS_STREAMS_ADM.ACTION_MEMORY);<br />end;<br />/<br /><br />create or replace procedure demo.hizoyd is<br />begin<br />null;<br />end;<br />/</pre>Check the view to see how the message has been managed by Streams:<pre>set lines 120<br />col component_name format a19<br />col component_type format a18<br />col action format a23<br />col object_owner format a14<br />col object_name format a14<br />col command_type format a16<br />select component_name,<br /> component_type,<br /> action,<br /> object_owner,<br /> object_name,<br /> command_type<br />from V$STREAMS_MESSAGE_TRACKING<br />where tracking_label='ARKZOYD'<br />order by timestamp;</pre>Here is the result:<pre>COMPONENT_NAME COMPONENT_TYPE ACTION OBJECT_OWNER OBJECT_NAME COMMAND_TYPE<br />----------------- ----------------- ----------------------- ------------- -------------- ----------------<br />STREAMS_CAPTURE CAPTURE Created Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_CAPTURE CAPTURE Rule evaluation Missing MVDD:0 Missing
MVDD:0 CREATE PROCEDURE<br />STREAMS_CAPTURE CAPTURE Enqueue Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_CAPTURE CAPTURE Created COMMIT<br />STREAMS_CAPTURE CAPTURE Enqueue COMMIT<br />STREAMS_PROPAGATION PROPAGATION SENDER Dequeued Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_PROPAGATION PROPAGATION SENDER Propagation Sender sent Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_PROPAGATION PROPAGATION SENDER Dequeued COMMIT<br />STREAMS_PROPAGATION PROPAGATION SENDER Propagation Sender sent COMMIT</pre>The tracking is propagated to the destination database (i.e. <code>WHITE</code>) like you can see below :<pre>exit<br /><br />. oraenv<br />WHITE<br /><br />sqlplus / as sysdba<br /><br />set pages 20<br />set lines 120<br />col component_name format a19<br />col component_type format a20<br />col action format a28<br />col object_owner format a14<br />col object_name format a14<br />col command_type format a16<br />select component_name,<br /> component_type,<br /> action,<br /> object_owner,<br /> object_name,<br /> command_type<br />from V$STREAMS_MESSAGE_TRACKING<br />where tracking_label='ARKZOYD'<br />order by timestamp;</pre>Here is the result:<pre>COMPONENT_NAME COMPONENT_TYPE ACTION OBJECT_OWNER OBJECT_NAME COMMAND_TYPE<br />----------------- ------------------ ------------------------- ------------- -------------- ----------------<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue COMMIT<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Rule 
evaluation COMMIT<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation COMMIT<br />STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue COMMIT<br />STREAMS_APPLY APPLY READER Dequeued Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_APPLY APPLY READER Dequeued COMMIT<br />STREAMS_APPLY APPLY SERVER Apply executed Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE<br />STREAMS_APPLY APPLY SERVER Commit COMMIT</pre>And you can check that the procedure has been replicated to the <code>WHITE</code> database:<pre>select text <br /> from dba_source<br /> where owner='DEMO'<br /> and name='HIZOYD'<br /> order by line;<br /><br />TEXT<br />-------------------<br />procedure hizoyd is<br /> begin<br /> null;<br /> end;</pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-72494404499506736572009-08-27T08:53:00.000-07:002009-08-27T08:54:19.552-07:00Are your triggers triggered by Streams?That sounds like a fair question, don't you think? As often with Oracle, the answer is split between "It depends" and "That's not as easy as I first thought". Let's start with how it's supposed to work. According to the documentation, how an apply process behaves depends on the <code>fire_once</code> property of the trigger:<ul><li>If the trigger's <code>fire_once</code> property is <code>TRUE</code>, then the trigger is not executed by an apply process</li><li>If the trigger's <code>fire_once</code> property is <code>FALSE</code>, then the trigger is supposed to always be executed, whether the change is performed by a regular DML/DDL command or by a Streams apply process</li></ul>That sounds easy. Let's have a look at a concrete example!<span class="fullpost"><br /><br />To build the test case, we'll implement a simple Streams One-Way Replication.
To do it in 10 minutes, assuming your database runs in ARCHIVELOG mode, follow Step 1 to Step 9 of the "<a href="http://wedostreams.blogspot.com/2009/01/oracle-streams-101.html">Oracle Streams One Way Table Replication 101</a>" post. Ready?<h2>Test when <code>fire_once</code> is <code>TRUE</code> (i.e. Default)</h2>To begin, we'll create a table and a trigger in the <code>DESTINATION</code> schema; we'll check whether the trigger is supposed to fire ONCE or ALWAYS:<pre>connect / as sysdba<br /><br />create table destination.t1_x<br /> (id number primary key,<br /> text varchar2(80));<br /><br />create or replace trigger destination.t1_x_trigger<br /> after insert on destination.T1 for each row <br /> begin<br /> insert into T1_X(id, text)<br /> values (:new.id, :new.text);<br /> end;<br />/<br /><br />set serveroutput on<br />begin<br /> if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))<br /> then<br /> dbms_output.put_line('Trigger FIRE_ONCE=TRUE');<br /> else<br /> dbms_output.put_line('Trigger FIRE_ONCE=FALSE');<br /> end if;<br />end;<br />/<br /><br /><strong>Trigger FIRE_ONCE=TRUE</strong></pre>So that's really the default, and this trigger is not supposed to be executed by an apply process. But first, let's check the trigger works by manually inserting a row into the <code>DESTINATION.T1</code> table (that will make the 2 tables diverge, but who cares?):<pre>insert into DESTINATION.T1<br /> values (9999, 'Text 9999');<br />commit;<br /><br />select * <br /> from DESTINATION.T1_X;<br /><br /> ID TEXT<br />---- ---------<br />9999 Text 9999</pre>Now that we are sure the trigger works, we can verify it is not executed by the apply process.
This time we'll change the <code>SOURCE.T1</code> table and check that the change shows up in <code>DESTINATION.T1</code> but not in <code>DESTINATION.T1_X</code>, as expected:<pre>insert into SOURCE.T1<br /> values (4, 'Text 4');<br />commit;<br /><br /><strong>pause</strong><br /><br />select * <br /> from DESTINATION.T1;<br /><br /> ID TEXT<br />---- ---------<br /> 1 Text 1<br /> 2 Text 2<br /> 3 Text 3<br />9999 Text 9999<br /> 4 Text 4<br /><br />select * <br /> from DESTINATION.T1_X;<br /><br /> ID TEXT<br />---- ---------<br />9999 Text 9999</pre>Very good!<h2>Test when <code>fire_once</code> is <code>FALSE</code></h2>Now we'll change the trigger's behavior and perform another test to see if the trigger is executed when the change is applied to the <code>DESTINATION.T1</code> table:<pre>begin<br /> DBMS_DDL.SET_TRIGGER_FIRING_PROPERTY(<br /> trig_owner => 'DESTINATION', <br /> trig_name => 'T1_X_TRIGGER',<br /> fire_once => false);<br />end;<br />/<br /><br />set serveroutput on<br />begin<br /> if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))<br /> then<br /> dbms_output.put_line('Trigger FIRE_ONCE=TRUE');<br /> else<br /> dbms_output.put_line('Trigger FIRE_ONCE=FALSE');<br /> end if;<br />end;<br />/<br /><br /><strong>Trigger FIRE_ONCE=FALSE</strong><br /><br />insert into SOURCE.T1<br /> values (5, 'Text 5');<br />commit;<br /><br /><strong>pause</strong><br /><br />select * <br /> from DESTINATION.T1;<br /><br /> ID TEXT<br />---- ---------<br /> 1 Text 1<br /> 2 Text 2<br /> 3 Text 3<br />9999 Text 9999<br /> 4 Text 4<br /> 5 Text 5<br /><br />select * <br /> from DESTINATION.T1_X;<br /><br /> ID TEXT<br />---- ---------<br />9999 Text 9999</pre>But wait a minute! Wasn't that trigger supposed to be executed?
Let's restart all the processes to see if anything changes:<pre>exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE');<br />exec dbms_apply_adm.stop_apply('STREAMS_APPLY');<br />exec dbms_capture_adm.start_capture('STREAMS_CAPTURE');<br />exec dbms_apply_adm.start_apply('STREAMS_APPLY');<br /><br />set serveroutput on<br />begin<br /> if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))<br /> then<br /> dbms_output.put_line('Trigger FIRE_ONCE=TRUE');<br /> else<br /> dbms_output.put_line('Trigger FIRE_ONCE=FALSE');<br /> end if;<br />end;<br />/<br /><br /><strong>Trigger FIRE_ONCE=FALSE</strong><br /><br />insert into SOURCE.T1<br /> values (6, 'Text 6');<br /><br />commit;<br /><br />select * <br /> from DESTINATION.T1;<br /> 2 <br /> ID TEXT<br />---- ----------<br /> 1 Text 1<br /> 2 Text 2<br /> 3 Text 3<br />9999 Text 9999<br /> 4 Text 4<br /> 5 Text 5<br /> 6 Text 6<br /><br />select * <br /> from DESTINATION.T1_X;<br /><br /> ID TEXT<br />---- ----------<br />9999 Text 9999</pre>No, Same Story! 
Actually, I had to recreate the trigger to make it work:<pre>exec dbms_apply_adm.stop_apply('STREAMS_APPLY');<br />drop trigger destination.t1_x_trigger;<br /><br />create or replace trigger destination.t1_x_trigger<br /> after insert on destination.T1 for each row <br /> begin<br /> insert into T1_X(id, text)<br /> values (:new.id, :new.text);<br /> end;<br />/<br /><br />begin<br /> DBMS_DDL.SET_TRIGGER_FIRING_PROPERTY(<br /> trig_owner => 'DESTINATION', <br /> trig_name => 'T1_X_TRIGGER',<br /> fire_once => false);<br />end;<br />/<br /><br />set serveroutput on<br />begin<br /> if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))<br /> then<br /> dbms_output.put_line('Trigger FIRE_ONCE=TRUE');<br /> else<br /> dbms_output.put_line('Trigger FIRE_ONCE=FALSE');<br /> end if;<br />end;<br />/<br /><br /><strong>Trigger FIRE_ONCE=FALSE</strong><br /><br />exec dbms_apply_adm.start_apply('STREAMS_APPLY');<br /><br />insert into SOURCE.T1<br /> values (7, 'Text 7');<br /><br />commit;<br /><br />select * <br /> from DESTINATION.T1;<br /><br /> ID TEXT<br />---- ----------<br /> 1 Text 1<br /> 2 Text 2<br /> 3 Text 3<br />9999 Text 9999<br /> 4 Text 4<br /> 5 Text 5<br /> 6 Text 6<br /> 7 Text 7<br /><br />select * <br /> from DESTINATION.T1_X;<br /><br /> ID TEXT<br />---- ----------<br />9999 Text 9999<br /> 7 Text 7</pre>Expected Behavior? Bug? I'm still split! What do you think? btw, don't forget <a href="http://wedostreams.blogspot.com/2009/01/oracle-streams-101.html#streams101p10">to clean up your Streams configuration</a>.</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com2tag:blogger.com,1999:blog-1041798052670608448.post-55295650545211869802009-08-27T01:17:00.000-07:002009-08-27T04:26:27.958-07:00Data Comparison with DBMS_COMPARISONThe new 11g <code>DBMS_COMPARISON</code> package helps to detect how data of 2 tables differ. The interface is easy to use and quite powerful. 
Not only does it allow you to compare and converge the whole content of 2 tables, but you can also work with data samples and views. This post shows how to use this feature together with a flashback query; that combination can be very useful with the constantly changing data of a production system, which you can only block for a few seconds.<span class="fullpost"><br /><h2>Limits and Requirements of DBMS_COMPARISON</h2>Before we start, refer to the limits of <code>DBMS_COMPARISON</code>:<ul><li><a href="http://download.oracle.com/docs/cd/B28359_01/appdev.111/b28419/d_comparison.htm#CJHDJJDF">Database Character Set Requirements for the DBMS_COMPARISON Package</a></li><li><a href="http://download.oracle.com/docs/cd/B28359_01/appdev.111/b28419/d_comparison.htm#CJHCFIDI">Database Object Requirements for the DBMS_COMPARISON Package</a></li><li><a href="http://download.oracle.com/docs/cd/B28359_01/appdev.111/b28419/d_comparison.htm#CJHFEEGA">Index Column Requirements for the DBMS_COMPARISON Package</a></li><li><a href="http://download.oracle.com/docs/cd/B28359_01/appdev.111/b28419/d_comparison.htm#CJHHBEBE">Datatype Requirements for the DBMS_COMPARISON Package</a></li></ul><h2>DBMS_COMPARISON and FLASHBACK Query</h2>We'll compare the content of a table with a view that contains a flashback clause.
In that case, the view is on the table itself; here is the script to create our example: <pre>create user demo<br />identified by demo<br />default tablespace users<br />temporary tablespace temp;<br /><br />grant connect, resource to demo;<br /><br />create table DEMO.T1 (<br />id number,<br />field1 varchar2(1000),<br />field2 varchar2(1000),<br />constraint T1_PK primary key(id));<br /><br />insert into demo.T1<br />(select rownum, to_char(rownum),to_char(rownum)<br /> from dual<br />connect by level <=10000); <br /><br />commit; <br /><br />col scn new_value scn<br /><br />select dbms_flashback.get_system_change_number scn<br /> from dual;<br /><br />insert into demo.T1<br /> (select 10000+rownum, to_char(rownum),to_char(rownum)<br /> from dual connect by level <=10000);<br /><br />commit;<br /><br />create view demo.t1v<br /> as select * from t1 as of scn &&scn;</pre>Once the schema created, we can use <code>create_comparison</code> and <code>compare</code> to find the differences between the table and the view:<pre>begin<br />dbms_comparison.create_comparison(<br /> COMPARISON_NAME => 'mycomparison',<br /> SCHEMA_NAME => 'demo', <br /> OBJECT_NAME => 't1v',<br /> DBLINK_NAME => null, <br /> REMOTE_SCHEMA_NAME => 'demo', <br /> REMOTE_OBJECT_NAME => 'T1',<br /> COMPARISON_MODE =><br /> DBMS_COMPARISON.CMP_COMPARE_MODE_OBJECT,<br /> COLUMN_LIST => '*',<br /> SCAN_MODE =><br /> DBMS_COMPARISON.CMP_SCAN_MODE_FULL);<br />end;<br />/<br /><br />var scanid number;<br /><br />SET SERVEROUTPUT ON<br />DECLARE<br />consistent BOOLEAN;<br />scan_info DBMS_COMPARISON.COMPARISON_TYPE;<br />BEGIN<br />consistent := DBMS_COMPARISON.COMPARE(<br /> comparison_name => 'mycomparison',<br /> scan_info => scan_info,<br /> perform_row_dif => TRUE);<br />:scanid:=scan_info.scan_id;<br />DBMS_OUTPUT.PUT_LINE('Scan ID: '||scan_info.scan_id);<br />IF consistent=TRUE THEN<br /> DBMS_OUTPUT.PUT_LINE('No differences were found.');<br />ELSE<br /> DBMS_OUTPUT.PUT_LINE('Differences were 
found.');<br />END IF;<br />END;<br />/<br /><br />COLUMN OWNER HEADING 'Comparison Owner' FORMAT A16<br />COLUMN COMPARISON_NAME HEADING 'Comparison Name' FORMAT A20<br />COLUMN SCHEMA_NAME HEADING 'Schema Name' FORMAT A11<br />COLUMN OBJECT_NAME HEADING 'Object Name' FORMAT A11<br />COLUMN CURRENT_DIF_COUNT HEADING 'Differences' FORMAT 9999999<br /><br />SELECT c.OWNER,<br />c.COMPARISON_NAME,<br />c.SCHEMA_NAME,<br />c.OBJECT_NAME,<br />s.CURRENT_DIF_COUNT<br />FROM DBA_COMPARISON c, DBA_COMPARISON_SCAN_SUMMARY s<br />WHERE c.COMPARISON_NAME = s.COMPARISON_NAME AND<br /> c.OWNER = s.OWNER AND<br /> s.SCAN_ID = :scanid;<br /><br />Comparison Owner Comparison Name Schema Name Object Name Differences<br />---------------- -------------------- ----------- ----------- -----------<br />SYS MYCOMPARISON DEMO T1V 10000<br /></pre>We can even make the table converge with the view:<pre>SET SERVEROUTPUT ON<br />DECLARE<br />scan_info DBMS_COMPARISON.COMPARISON_TYPE;<br />BEGIN<br />DBMS_COMPARISON.CONVERGE(<br /> comparison_name => 'mycomparison',<br /> scan_id => :scanid,<br /> scan_info => scan_info,<br /> converge_options => DBMS_COMPARISON.CMP_CONVERGE_LOCAL_WINS);<br />DBMS_OUTPUT.PUT_LINE('Local Rows Merged: '||scan_info.loc_rows_merged);<br />DBMS_OUTPUT.PUT_LINE('Remote Rows Merged: '||scan_info.rmt_rows_merged);<br />DBMS_OUTPUT.PUT_LINE('Local Rows Deleted: '||scan_info.loc_rows_deleted);<br />DBMS_OUTPUT.PUT_LINE('Remote Rows Deleted: '||scan_info.rmt_rows_deleted);<br />END;<br />/<br /><br />Local Rows Merged: 0<br />Remote Rows Merged: 0<br />Local Rows Deleted: 0<br />Remote Rows Deleted: 10000<br /><br />select count(*) from demo.t1;<br /><br />COUNT(*)<br />----------<br /> 10000<br /><br />select max(id) from demo.t1;<br /><br /> MAX(ID)<br />----------<br /> 10000<br /></pre>You'll be able to do much more from this example; to learn more, check:<br /><ul><li><a 
href="http://download.oracle.com/docs/cd/B28359_01/appdev.111/b28419/d_comparison.htm">DBMS_COMPARISON</a></li><li><a href="http://download.oracle.com/docs/cd/B28359_01/server.111/b28322/man_comp.htm#STREP146">Oracle Streams Replication Administrator's Guide - 12 Comparing and Converging Data</a>.</li></ul> You can drop the comparison and the demo schema for your next test:<br /><pre>exec dbms_comparison.drop_comparison('mycomparison');<br />drop user demo cascade;</pre><h2>How does DBMS_COMPARISON work?</h2>The chapter of the Streams Replication Administrator's Guide explains how the comparison is performed and that <code>ora_hash</code> is used. To get more details, I've traced the <code>compare</code> procedure and the main query look like that:<pre>SELECT ll.l_rowid, rr.r_rowid, NVL(ll."ID", rr."ID") idx_val<br /> FROM<br />(SELECT l.rowid l_rowid, l."ID",<br /> ora_hash(<br /> NVL(to_char(l."ID"),'ORA$STREAMS$NV'),<br /> 4294967295,<br /> ora_hash(<br /> NVL((l."FIELD1"), 'ORA$STREAMS$NV'),<br /> 4294967295,<br /> ora_hash(<br /> NVL((l."FIELD2"), 'ORA$STREAMS$NV'),<br /> 4294967295,<br /> 0<br /> )<br /> )<br /> ) l_hash <br /> FROM "DEMO"."T1" l<br /> WHERE l."ID">=:scan_min1 AND l."ID"<=:scan_max1 ) ll <br />FULL OUTER JOIN<br />(SELECT /*+ NO_MERGE REMOTE_MAPPED */ r.rowid r_rowid, r."ID",<br /> ora_hash(<br /> NVL(to_char(r."ID"), 'ORA$STREAMS$NV'),<br /> 4294967295,<br /> ora_hash(<br /> NVL((r."FIELD1"), 'ORA$STREAMS$NV'),<br /> 4294967295,<br /> ora_hash(<br /> NVL((r."FIELD2"), 'ORA$STREAMS$NV'),<br /> 4294967295,<br /> 0<br /> )<br /> )<br /> ) r_hash<br /> FROM "DEMO"."T2" r<br /> WHERE r."ID">=:scan_min1 <br /> AND r."ID"<=:scan_max1 ) rr <br />ON ll."ID"=rr."ID"<br />WHERE ll.l_hash IS NULL<br /> OR rr.r_hash IS NULL<br /> OR ll.l_hash <> rr.r_hash;</pre>It gives a good idea of what we can expect from the tool, 
no?</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-19362826819555857102009-08-26T05:33:00.000-07:002009-08-26T05:41:11.306-07:00Oracle AQ Buffered Queues 101 (Part 2)I would like to get back to the <a href="http://wedostreams.blogspot.com/2009/01/oracle-aq-buffered-queues-101.html">Oracle AQ Buffered Queues 101</a> post I wrote a few months back. What you'll find below is an enhanced example that may better suit your needs. As a matter of fact, this is something I've recently used to study the impact of the Oracle Net SDU, the buffer size and the <code>latency</code> parameter on propagation performance. But that's another story...<br /><br />I strongly invite you to read the original post. It shows interesting things like "queue paused in flow control", "spilled messages" or how to lose buffered messages. Nevertheless, the previous example uses only one table. The one below exchanges messages between 2 queues in different databases.
You'll see how to set up subscribers and schedule propagation to share your messages.<span class="fullpost"> To install it, assuming you have two 11.1 databases (or very likely 10.2), follow these steps:<ul><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue01">Set up the environment to use SQL*Plus and to connect as SYSDBA</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue02">Create a DEMO user and a User Defined Type in the 2 databases</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue03">Create a database link between the source and the destination databases</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue04">Create and start queues</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue05">Create subscribers on the source queue and schedule propagation to the destination queue</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue06">Create an enqueue procedure in the source database</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue07">Create a dequeue procedure in the destination database</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue08">Enqueue a message on one end and dequeue it on the other end</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/oracle-aq-buffered-queues-101-part-2.html#bufferqueue09">Clean Up the environment</a></li></ul>You can start right away...<h2><a name="bufferqueue01"> </a>Set Up the environment to use SQL*Plus and to connect as SYSDBA</h2>To begin, set the following SQL*Plus variables:<ul><li><strong>sourcegdb</strong>
defines the global_name AND the network alias of the source database. We assume <code>global_names=true</code>, even if that's not strictly mandatory; it helps to make things more readable. Make sure you have the aliases set up everywhere and that they match the database global names.</li><li><strong>destgdb</strong> defines the global_name AND the network alias of the destination database.</li><li><strong>source_user</strong> and <strong>dest_user</strong> define the names of <code>SYSDBA</code> users on the source and on the destination databases.</li><li><strong>source_pwd</strong> and <strong>dest_pwd</strong> define the passwords of the corresponding <strong>source_user</strong> and <strong>dest_user</strong>.</li></ul><pre>accept sourcegdb default 'BLACK' -<br />prompt "Enter the Source Database Global Name [BLACK]: "<br /><br />accept destgdb default 'WHITE' -<br />prompt "Enter the Destination Database Global Name [WHITE]: "<br /><br />accept source_user default 'sys' -<br />prompt "Enter the Source SYSDBA user [sys]: "<br /><br />accept source_pwd default 'change_on_install' -<br />prompt "Enter the Source SYSDBA password [change_on_install]: "<br /><br />accept dest_user default 'sys' -<br />prompt "Enter the Destination SYSDBA user [sys]: "<br /><br />accept dest_pwd default 'change_on_install' -<br />prompt "Enter the Destination SYSDBA password [change_on_install]: "<br /></pre><h2><a name="bufferqueue02"> </a>Create a User DEMO and a User Defined Type in the 2 databases</h2>To go on with the example, you'll need to create the <code>DEMO</code> user in the 2 databases and create a type you'll use to send and receive messages. The script below creates those users and types.<blockquote>Note:<br />For this example, we assume there is no user named <code>DEMO</code>. We also assume the 2 tablespaces <code>USERS</code> and <code>TEMP</code> exist in the databases.
There is no need for either of the databases to run in ARCHIVELOG mode.</blockquote><br /><pre>connect &&source_user/&&source_pwd@&&sourcegdb as sysdba<br /><br />create user demo<br />identified by demo<br />default tablespace users<br />temporary tablespace temp;<br /><br />grant connect, resource, dba to demo;<br />grant execute on dbms_aq to demo;<br />grant execute on dbms_aqadm to demo;<br /><br />connect &&dest_user/&&dest_pwd@&&destgdb as sysdba<br /><br />create user demo<br />identified by demo<br />default tablespace users<br />temporary tablespace temp;<br /><br />grant connect, resource, dba to demo;<br />grant execute on dbms_aq to demo;<br />grant execute on dbms_aqadm to demo;<br /><br />connect demo/demo@&&sourcegdb<br /><br />create type mytype as object (<br /> id number<br /> , field1 varchar2(4000)<br /> , field2 varchar2(4000));<br />/<br /><br />connect demo/demo@&&destgdb<br /><br />create type mytype as object (<br /> id number<br /> , field1 varchar2(4000)<br /> , field2 varchar2(4000));<br />/</pre><h2><a name="bufferqueue03"> </a>Create a database link between the source and the destination databases</h2>To start the propagation job, you'll need the source database to connect to the destination database.
Create and test a database link for that purpose:<pre>connect demo/demo@&&sourcegdb<br /><br />create database link &&destgdb<br /> connect to demo<br /> identified by demo<br /> using '&&destgdb';<br /><br />select * from dual@&&destgdb;</pre><h2><a name="bufferqueue04"> </a>Create and start queues</h2>Then, create and start the queues:<pre>connect demo/demo@&&destgdb<br /><br />begin<br />dbms_aqadm.create_queue_table(<br /> 'myqueue_table'<br /> , 'mytype'<br /> , multiple_consumers => true);<br />end;<br />/<br /><br />begin<br />dbms_aqadm.create_queue(<br /> 'myqueue'<br /> , 'myqueue_table');<br />end;<br />/<br /><br />begin<br />dbms_aqadm.start_queue('myqueue');<br />end;<br />/<br /><br />connect demo/demo@&&sourcegdb<br /><br />begin<br />dbms_aqadm.create_queue_table(<br /> 'myqueue_table'<br /> , 'mytype'<br /> , multiple_consumers => true);<br />end;<br />/<br /><br />begin<br />dbms_aqadm.create_queue(<br /> 'myqueue'<br /> , 'myqueue_table');<br />end;<br />/<br /><br />begin<br />dbms_aqadm.start_queue('myqueue');<br />end;<br />/</pre><h2><a name="bufferqueue05"> </a>Create subscribers on the source queue and schedule propagation to the destination queue</h2>The next step consists of adding the subscribers that reference the destination queue to the source queue. In this example, we add 2 subscribers because we will eventually dequeue the messages from 2 separate programs (or for 2 distinct purposes). Once done, check the queues are compatible and schedule the QUEUE to QUEUE propagation.
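<br /><br />A related knob worth knowing about: once the propagation is scheduled (the script below does it with <code>dbms_aqadm.schedule_propagation</code>), its <code>latency</code> can be adjusted without dropping the schedule. Here is a hedged sketch using <code>DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE</code>; the 5-second value is only an illustration:<pre>connect demo/demo@&&sourcegdb

begin
  -- lower the propagation latency from the 60-second default to 5 seconds
  dbms_aqadm.alter_propagation_schedule(
      queue_name        => 'myqueue'
    , destination       => '&&destgdb'
    , destination_queue => 'demo.myqueue'
    , latency           => 5);
end;
/</pre>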
<code>dba_queue_schedules</code> provides detailed information about what is scheduled:<pre>connect demo/demo@&&sourcegdb<br /><br />begin<br />dbms_aqadm.add_subscriber(<br /> queue_name => 'myqueue'<br /> , subscriber => sys.aq$_agent('RED','demo.myqueue@&&destgdb',null)<br /> , queue_to_queue => true<br /> , delivery_mode => dbms_aqadm.buffered);<br />end;<br />/<br /><br />begin<br />dbms_aqadm.add_subscriber(<br /> queue_name => 'myqueue'<br /> , subscriber => sys.aq$_agent('BLUE','demo.myqueue@&&destgdb',null)<br /> , queue_to_queue => true<br /> , delivery_mode => dbms_aqadm.buffered);<br />end;<br />/<br /><br />set serveroutput on<br /><br />declare<br />rc binary_integer;<br />begin<br />dbms_aqadm.verify_queue_types(<br /> src_queue_name => 'myqueue'<br /> , dest_queue_name => 'demo.myqueue'<br /> , destination => '&&destgdb'<br /> , rc => rc);<br />dbms_output.put_line('If result is 1, it''s OKAY: '||rc);<br />end;<br />/<br /><br />begin<br />dbms_aqadm.schedule_propagation(<br /> queue_name => 'myqueue'<br /> , destination => '&&destgdb'<br /> , destination_queue => 'demo.myqueue');<br />end;<br />/<br /><br />set pages 1000<br />select schema<br /> , qname<br /> , destination<br /> , start_time<br /> , latency<br /> , schedule_disabled<br /> , session_id<br /> , total_number<br /> , failures<br /> , last_error_msg<br /> , message_delivery_mode<br />from dba_queue_schedules;</pre><h2><a name="bufferqueue06"> </a>Create an enqueue procedure in the source database</h2>Create an enqueue procedure, <code>demo_enqueue</code>, which enqueues a message in the buffered part of the queue:<pre>connect demo/demo@&&sourcegdb<br /><br />create or replace procedure demo_enqueue(p_mytype mytype) is<br />enqueue_options DBMS_AQ.enqueue_options_t;<br />message_properties DBMS_AQ.message_properties_t;<br />recipients DBMS_AQ.aq$_recipient_list_t;<br />message_handle RAW(16);<br />begin<br />enqueue_options.visibility := dbms_aq.immediate;<br />enqueue_options.delivery_mode :=
dbms_aq.buffered;<br />dbms_aq.enqueue(<br /> queue_name => 'MYQUEUE',<br /> enqueue_options => enqueue_options,<br /> message_properties => message_properties,<br /> payload => p_mytype,<br /> msgid => message_handle);<br />commit;<br />end;<br />/</pre><h2><a name="bufferqueue07"> </a>Create a dequeue procedure in the destination database</h2><code>demo_dequeue</code> dequeues messages from the destination queue based on the consumer name:<pre>connect demo/demo@&&destgdb<br /><br />select * from aq$myqueue_table_S;<br /><br />set serveroutput on<br /><br />create or replace procedure demo_dequeue(p_consumer varchar2)<br />is<br />dequeue_options dbms_aq.dequeue_options_t;<br />message_properties dbms_aq.message_properties_t;<br />message_handle RAW(16);<br />v_mytype mytype;<br />no_messages exception;<br />pragma exception_init(no_messages, -25228);<br />begin<br />dequeue_options.wait := dbms_aq.no_wait;<br />dequeue_options.consumer_name := p_consumer;<br />dequeue_options.navigation := dbms_aq.first_message;<br />dequeue_options.visibility := dbms_aq.immediate;<br />dequeue_options.delivery_mode := dbms_aq.buffered;<br />loop<br />begin<br />dbms_aq.dequeue(<br /> queue_name => 'myqueue',<br /> dequeue_options => dequeue_options,<br /> message_properties => message_properties,<br /> payload => v_mytype,<br /> msgid => message_handle);<br />dbms_output.put_line('---------------------------------------------------------');<br />dbms_output.put_line('Message for Consumer "'||p_consumer||'": ');<br />dbms_output.put_line('ID :'||to_char(v_mytype.id));<br />dbms_output.put_line('FIELD1:'||v_mytype.field1);<br />dbms_output.put_line('FIELD2:'||v_mytype.field2);<br />dbms_output.put_line('---------------------------------------------------------');<br />dequeue_options.navigation := dbms_aq.next_message;<br />end;<br />end loop;<br />exception<br />when no_messages then<br /> dbms_output.put_line('No more messages');<br /> commit;<br />end;<br />/</pre><h2><a 
name="bufferqueue08"> </a>Enqueue a message on one end and dequeue it on the other end</h2>You are ready to test your case. Enqueue a message and check that you get the messages for the 2 subscribers:<pre>connect demo/demo@&&sourcegdb<br />set serveroutput on<br />declare<br />v_mytype mytype;<br />begin<br />v_mytype := mytype(1, 'BLUE AND RED','Red And Blue');<br />demo_enqueue(v_mytype);<br />end;<br />/<br /><br />select * from aq$myqueue_table;<br /><br /><br />connect demo/demo@&&destgdb<br /><br />select * from aq$myqueue_table;<br /><br />set serveroutput on<br />exec demo_dequeue('BLUE')<br />exec demo_dequeue('RED')</pre><blockquote>Note:<br /><code>aq$myqueue_table</code> helps you monitor the messages. The propagation has been scheduled without any time window, which means the messages are always sent from the source to the destination; the <code>latency</code> (which defaults to 60 seconds in that case) is the only thing that can slightly delay the moment a message becomes available to its consumers.</blockquote><h2><a name="bufferqueue09"> </a>Clean Up the environment</h2>You are done! 
Before you leave, remove the AQ propagation schedule, the queues, and the <code>DEMO</code> users:<pre>connect &&source_user/&&source_pwd@&&sourcegdb as sysdba<br /><br />begin<br />dbms_aqadm.UNSCHEDULE_PROPAGATION(<br /> queue_name => 'demo.myqueue'<br /> , destination => '&&destgdb'<br /> , destination_queue => 'DEMO.MYQUEUE');<br />end;<br />/<br /><br />exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)<br /><br />drop user demo cascade;<br /><br />select * from dba_queue_schedules;<br /><br />connect &&dest_user/&&dest_pwd@&&destgdb as sysdba<br /><br />exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)<br /><br />drop user demo cascade;</pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com4tag:blogger.com,1999:blog-1041798052670608448.post-86309343903313250672009-08-24T23:49:00.000-07:002009-08-25T00:23:26.641-07:00Googling The Oracle DictionaryThis doesn't sound like it has much to do with Oracle Streams, but it has helped me to speed up my understanding of a few internals and to quickly get to a better reading of traces. "It" is actually a procedure. I've named it <code>findString</code>. It detects patterns stored in Oracle tables. It works with <code>VARCHAR2</code> only but you can easily make it work with <code>NUMBER</code> or whatever you need... Obviously, don't use it on anything that is not a small TEST database or with non-selective patterns.<br /><br />Remember, it's just a tool, it won't explain anything about how things work. Also keep in mind that some data can be held in sessions; this procedure won't find it because it doesn't read temporary data. 
Enough talk, here is the code:<span class="fullpost"><pre>create or replace procedure findString(pattern varchar2)<br />is<br /> cnt number:=0;<br />begin<br /> for i in (select c.owner, c.table_name, c.column_name<br /> from DBA_TAB_COLS c, dba_tables t<br /> where c.data_type in ('VARCHAR2','NVARCHAR2')<br /> and c.owner=t.owner<br /> and c.table_name=t.table_name) loop<br /> begin<br /> execute immediate 'select count(*) from "'<br /> ||i.owner||'"."'||i.table_name||<br /> '" where "'||i.column_name||'" like :x'<br /> into cnt using pattern;<br /> exception when others then<br /> dbms_output.put_line('Error in "'||i.owner||<br /> '"."'||i.table_name||'" column "'||<br /> i.column_name||'"');<br /> dbms_output.put_line(DBMS_UTILITY.<br /> FORMAT_ERROR_STACK);<br /> end;<br /> if (cnt>0) then<br /> dbms_output.put_line('Pattern Detected in "'||<br /> i.owner||'"."'||i.table_name||<br /> '" column "'||i.column_name||'"');<br /> end if;<br /> end loop;<br />end;<br />/</pre>To use it with SQL*Plus, set <code>serveroutput</code> to <code>on</code> and pass your search pattern in the parameter, like this:<pre>set serveroutput on<br />exec findString('%ONEWAY%TABLE%')<br /><br /><span style="color: rgb(204, 204, 204);">Pattern Detected in "SYSTEM"."LOGMNRC_GTLO" column "LVL0NAME"<br />Pattern Detected in "SYSTEM"."LOGMNR_OBJ$" column "NAME"<br />Pattern Detected in "SYS"."OBJ$" column "NAME"<br />Pattern Detected in "SYS"."HISTGRM$" column "EPVALUE"<br />Pattern Detected in "SYS"."STREAMS$_RULES" column "OBJECT_NAME"<br />Pattern Detected in "SYS"."STREAMS$_RULES" column "RULE_CONDITION"<br />Pattern Detected in "SYS"."STREAMS$_RULES" column "RULE_NAME"<br />Pattern Detected in "SYS"."WRH$_SEG_STAT_OBJ" column "BASE_OBJECT_NAME"<br />Pattern Detected in "SYS"."WRH$_SEG_STAT_OBJ" column 
"OBJECT_NAME"</span></pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-4696063084594844722009-08-24T01:32:00.000-07:002009-08-24T06:09:20.254-07:00Propagation Error And Exception Queue ManagementIf you've tried to use the example from my previous blog post named "<a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html">Transfer Files From One Server To Another With Oracle Streams</a>", you may have faced some propagation issues. <a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile07">Step 7 of the post</a> helps to quickly work around any issue by stopping and restarting the propagation. However, this is just a workaround and, in a real-world scenario, you would want to manage the message in the exception queue.<br /><br />This post goes deeper into the right way to fix any propagation issue; to begin, it simulates a propagation error, just in case you did not hit any by yourself ;-). The next section shows how you could fix the situation and manage the exception message on your own.<span class="fullpost"><br /><h2>1. Simulate a Propagation Error</h2>Like with <code>DBMS_FILE_TRANSFER</code>, external files transported with Oracle Streams must be a multiple of 512 bytes in size. We'll use that limit to make a propagation process fail. To set up the example, follow "Step 1" to "Step 5" of the previous post "<a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html">Transfer Files From One Server To Another With Oracle Streams</a>". Once done, instead of executing "Step 6", execute the step below that will make the propagation fail. 
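The rule being exercised here is purely arithmetic: a file can only be shipped if its size is an exact multiple of the 512-byte logical block size. If you want to check a candidate file outside the database before enqueuing anything, the missing byte count is easy to compute — a minimal sketch in Python, where the <code>padding_needed</code> helper name is mine, not part of Streams or this demo:

```python
import os

BLOCK = 512  # logical block size required for files transported by Streams

def padding_needed(path, block=BLOCK):
    """Number of bytes to append so the file size becomes a
    multiple of the block size (0 when it is already aligned)."""
    size = os.path.getsize(path)
    return (block - size % block) % block
```

A 511-byte file, like the one created with <code>dd</code> below, needs exactly 1 more byte; that is the byte the padding fix of the next section appends.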
<br />On <code>BLACK</code>, create a file that is <strong>NOT</strong> a multiple of 512 bytes in size, instantiate a <code>BFILE</code> that points to it, and enqueue a message that points to the file in the Streams queue:<pre>connect strmadmin/strmadmin<br /><br />select * from GLOBAL_NAME;<br /><br />GLOBAL_NAME<br />----------<br />BLACK<br /><br />!dd if=/dev/zero of=/tmp/black/demo2.zero bs=<strong>511</strong> count=1<br /><br />declare<br />v_demotyp demotyp:=demotyp(2, bfilename('tmp_dir','demo2.zero'));<br />v_any anydata;<br />begin<br />v_any:=anydata.convertobject(v_demotyp);<br />dbms_streams_messaging.enqueue(<br /> 'strmadmin.streams_queue',<br /> v_any);<br />end;<br />/<br />commit;</pre>This time the propagation fails with a message like the one below:<pre>col error_message format a100<br />set lines 100<br />set long 10000<br />set longchunksize 10000<br /><br />select error_message<br />from dba_propagation<br />where propagation_name='BFILE_PROPAGATION';<br /><br />ERROR_MESSAGE<br />-------------------------------------------------------<br />ORA-19505: failed to identify file "/tmp/black/demo2.zero"<br />ORA-27046: file size is not a multiple of logical block size<br />Additional information: 1</pre>You'll find the same error message in the queue propagation schedule:<pre>select last_error_msg<br />from dba_queue_schedules<br />where schema='STRMADMIN'<br /> and qname='STREAMS_QUEUE'<br /> and message_delivery_mode='PERSISTENT';<br /><br />LAST_ERROR_MSG<br />-------------------------------------------------------<br />ORA-19505: failed to identify file "/tmp/black/demo2.zero"<br />ORA-27046: file size is not a multiple of logical block size<br />Additional information: 1 </pre><h2>2. Fix the Error</h2>Fixing the error depends on what it is. 
Let's say in this case, you'll fix the error by padding the file with a "0x00" byte; you can do it with <code>dd</code> and its <code>seek</code> parameter:<blockquote><b>Important note:</b><br />Don't use such a command on a real file without evaluating first what the impact would be on the file itself!</blockquote><pre>!dd if=/dev/zero of=/tmp/black/demo2.zero count=1 bs=1 seek=511 <br /><br />!ls -ltra /tmp/black/demo2.zero<br />-rw-r--r-- 1 oracle oinstall 512 2009-08-24 14:26 /tmp/black/demo2.zero</pre>That done, dequeue the message from the exception queue and re-enqueue it in the Streams queue; in that case, it's probably a good idea to have only one queue per queue table, so that you can tell the original queue of a message sitting in the exception queue. Here is how to do it; if you want more examples, refer to Metalink Note "233103.1 - Dequeuing Messages from an Exception Queue":<pre>connect strmadmin/strmadmin<br /><br />begin<br />dbms_aqadm.start_queue(<br /> queue_name => 'strmadmin.AQ$_STREAMS_QUEUE_TABLE_E',<br /> enqueue => false,<br /> dequeue => true);<br />end;<br />/<br /><br />SET SERVEROUTPUT ON<br /><br />DECLARE<br /> v_deq_options DBMS_AQ.dequeue_options_t;<br /> v_msg_properties DBMS_AQ.message_properties_t;<br /> o_msgid RAW(16);<br /> v_any anydata;<br />BEGIN<br /> v_deq_options.consumer_name:=null;<br /> v_deq_options.dequeue_mode := DBMS_AQ.remove;<br /> v_deq_options.navigation := DBMS_AQ.NEXT_TRANSACTION;<br /> DBMS_AQ.dequeue(<br /> queue_name => 'strmadmin.AQ$_STREAMS_QUEUE_TABLE_E',<br /> dequeue_options => v_deq_options,<br /> message_properties => v_msg_properties,<br /> payload => v_any,<br /> msgid => o_msgid);<br /> dbms_streams_messaging.enqueue('strmadmin.streams_queue',v_any);<br /> COMMIT;<br />END;<br />/<br /><br />begin<br />dbms_aqadm.stop_queue(<br /> queue_name => 'strmadmin.AQ$_STREAMS_QUEUE_TABLE_E' ,<br /> enqueue => true,<br /> dequeue => true);<br />end;<br />/</pre>Once done, you'll see the error go 
away from the propagation and the schedule. The file, now of a correct size, will be transferred from <code>BLACK</code> to <code>WHITE</code>:<pre>select error_message<br /> from dba_propagation<br />where propagation_name='BFILE_PROPAGATION';<br /><br />ERROR_MESSAGE<br />-------------------------------------------------------<br /><br />select last_error_msg<br /> from dba_queue_schedules<br />where schema='STRMADMIN'<br /> and qname='STREAMS_QUEUE'<br /> and message_delivery_mode='PERSISTENT';<br /><br />LAST_ERROR_MSG<br />-------------------------------------------------------</pre>Check the file has been received on the <code>WHITE</code> side:<pre>!ls -ltra /tmp/white/demo2.zero<br />-rw-r----- 1 oracle oinstall 512 2009-08-24 14:33 /tmp/white/demo2.zero</pre>This is it: a very simple way to fix propagation errors. Once again, clean up the environment before you leave it for your next demo; delete the <code>demo2.zero</code> file on both sides of your configuration and execute "<a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile08">Step 8 of the previous post</a>" to delete queues, tablespaces, administrators and more...</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com1tag:blogger.com,1999:blog-1041798052670608448.post-55447850449826473822009-08-23T10:55:00.000-07:002009-08-24T00:14:55.539-07:00Transfer Files From One Server To Another With Oracle StreamsYou can leverage Oracle Streams to transfer files stored on a file system from one server to another. Not only can those files be images or documents, assuming their size is a multiple of 512 bytes, but they can also be Oracle files, like Data Pump exports, backups, or data files. As a matter of fact, to me, sharing files across servers is probably the #1 reason why you would want to use Streams user messages.<br /><br />In this post, you'll find an easy-to-reproduce sample application. 
You'll create a User-Defined Type (UDT) with a BFILE (i.e. a file located outside the database on a file system or in ASM). Once done, you'll create an instance of that type, wrap it in an ANYDATA type, and share it between databases with Streams. You'll see that not only the message with its BFILE locator, but also the corresponding file, will be transferred from the source database to the destination database.<span class="fullpost"><br /><br />This post is made of the following sections:<ul><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile01">Step 1: Configure Databases and Streams Administrators</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile02">Step 2: Configure Directories and the User-Defined Type</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile03">Step 3: Create the Database Link and the Propagation Between the Queues</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile04">Step 4: Create A Streams Message Consumer and a Dequeue Program</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile05">Step 5: Test The Streams Configuration</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile06">Step 6: Create a File and Transfer It with Streams</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile07">Step 7: Troubleshooting</a></li><li><a href="http://wedostreams.blogspot.com/2009/08/transfer-files-from-one-server-to.html#TransferFile08">Step 8: Clean-up the environment</a></li></ul><h2><a name="TransferFile01">Step 1:</a> Configure Databases and Streams Administrators</h2>If you read this blog or the documentation, 
you've seen it dozens of times already! In order for Streams to work, you must set up a Streams Administrator, a queue and, though it's not mandatory, it's strongly recommended to set the <code>global_names</code> parameter of the two databases to <code>TRUE</code>. In this sample application, we'll assume you have 2 databases called <code>BLACK</code>, the source, and <code>WHITE</code>, the destination. You'll find below a table with the commands to run on the 2 databases to set up the Streams Administrator and the parameters:<br /><table width="100%"><tbody><tr><th style="border: 1px solid rgb(0, 0, 0);">On BLACK</th><th style="border: 1px solid rgb(0, 0, 0);">On WHITE</th></tr><tr><td style="border: 1px solid rgb(0, 0, 0);"><pre>select * from global_name;<br /><br />GLOBAL_NAME<br />-----------<br />BLACK<br /><br />alter system set global_names=true;</pre></td><td style="border: 1px solid rgb(0, 0, 0);"><pre>select * from global_name;<br /><br />GLOBAL_NAME<br />-----------<br />WHITE<br /><br />alter system set global_names=true;</pre></td></tr><tr><td style="border: 1px solid rgb(0, 0, 0);"><pre>connect / as sysdba<br /><br />CREATE TABLESPACE streams_tbs DATAFILE<br />'/u01/app/oracle/oradata/BLACK/streams_tbs.dbf'<br /> SIZE 25M AUTOEXTEND ON MAXSIZE 256M;<br /><br />CREATE USER strmadmin IDENTIFIED BY strmadmin<br /> DEFAULT TABLESPACE streams_tbs<br /> QUOTA UNLIMITED ON streams_tbs;<br /><br />grant dba to strmadmin;<br /><br />BEGIN<br />DBMS_STREAMS_ADM.SET_UP_QUEUE(<br />queue_table => 'strmadmin.streams_queue_table',<br />queue_name => 'strmadmin.streams_queue');<br />END;<br />/<br /><br />begin<br />dbms_streams_auth.grant_admin_privilege(<br /> grantee => 'strmadmin',<br /> grant_privileges => true);<br />end;<br />/<br /></pre></td><td style="border: 1px solid rgb(0, 0, 0);"><pre>connect / as sysdba<br /><br />CREATE TABLESPACE streams_tbs DATAFILE<br />'/u01/app/oracle/oradata/WHITE/streams_tbs.dbf'<br /> SIZE 25M AUTOEXTEND ON MAXSIZE 256M;<br 
/><br />CREATE USER strmadmin IDENTIFIED BY strmadmin<br /> DEFAULT TABLESPACE streams_tbs<br /> QUOTA UNLIMITED ON streams_tbs;<br /><br />grant dba to strmadmin;<br /><br />BEGIN<br />DBMS_STREAMS_ADM.SET_UP_QUEUE(<br />queue_table => 'strmadmin.streams_queue_table',<br />queue_name => 'strmadmin.streams_queue');<br />END;<br />/<br /><br />begin<br />dbms_streams_auth.grant_admin_privilege(<br /> grantee => 'strmadmin',<br /> grant_privileges => true);<br />end;<br />/</pre></td></tr></tbody></table><blockquote>Note:<br />Because we don't use any Streams capture process, those databases don't have to be in archivelog mode.</blockquote><h2><a name="TransferFile02">Step 2:</a> Configure Directories and the User-Defined Type</h2>Once the Streams Administrator is created, create a directory with the same name on the 2 databases, as well as a User-Defined Type. To allow the instances associated with the 2 databases to run on the same server, the directory path for <code>BLACK</code> differs from the directory path for <code>WHITE</code>:<table width="100%"><tbody><tr><th style="border: 1px solid rgb(0, 0, 0);">On BLACK</th><th style="border: 1px solid rgb(0, 0, 0);">On WHITE</th></tr><tr><td style="border: 1px solid rgb(0, 0, 0);"><pre>!mkdir -p /tmp/black<br /><br />create directory tmp_dir<br /> as '/tmp/black';<br /><br />grant read, write, execute<br /> on directory tmp_dir<br /> to strmadmin;</pre></td><td style="border: 1px solid rgb(0, 0, 0);"><pre>!mkdir -p /tmp/white<br /><br />create directory tmp_dir<br /> as '/tmp/white';<br /><br />grant read, write, execute<br /> on directory tmp_dir<br /> to strmadmin;</pre></td></tr><tr><td style="border: 1px solid rgb(0, 0, 0);"><pre>connect strmadmin/strmadmin<br /><br />create type demotyp as object<br /> (id number,<br /> myfile bfile);<br />/</pre></td><td style="border: 1px solid rgb(0, 0, 0);"><pre>connect strmadmin/strmadmin<br /><br />create type demotyp as object<br /> (id number,<br /> myfile bfile);<br 
/>/</pre></td></tr></tbody></table><h2><a name="TransferFile03">Step 3:</a> Create the Database Link and the Propagation Between the Queues</h2>Make sure you add an alias <code>WHITE</code>, used to connect to the <code>WHITE</code> database, to the Oracle*Net configuration of the <code>BLACK</code> database (e.g. in the <code>tnsnames.ora</code> file). Once done, you can create a database link from <code>BLACK</code> to <code>WHITE</code> and create a propagation process to send messages from the source to the destination database. Connect to <code>BLACK</code> and run the script below:<br /><pre>connect strmadmin/strmadmin<br /><br />select * from GLOBAL_NAME;<br /><br />GLOBAL_NAME<br />----------<br />BLACK<br /><br />CREATE DATABASE LINK WHITE<br /> connect to STRMADMIN<br /> identified by strmadmin<br /> using 'WHITE';<br /><br />BEGIN<br /> DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(<br /> propagation_name => 'bfile_propagation',<br /> source_queue => 'strmadmin.streams_queue',<br /> destination_queue => 'strmadmin.streams_queue',<br /> destination_dblink => 'white',<br /> rule_set_name => null,<br /> queue_to_queue => TRUE);<br />END;<br />/</pre><h2><a name="TransferFile04">Step 4:</a> Create A Streams Message Consumer And A Dequeue Program</h2>You can now configure the <code>WHITE</code> database so that a program of yours dequeues messages from its Streams queue. 
The script below creates a Streams DEQUEUE client and executes an anonymous PL/SQL block that receives and removes any message (if <code>ID>0</code>):<br /><pre>connect strmadmin/strmadmin<br /><br />begin<br /> DBMS_STREAMS_ADM.ADD_MESSAGE_RULE (<br /> message_type => 'strmadmin.demotyp',<br /> rule_condition => ':MSG.ID > 0',<br /> streams_type => 'DEQUEUE',<br /> streams_name => 'demostrm',<br /> queue_name => 'strmadmin.streams_queue');<br />end;<br />/<br /><br />select streams_name,<br /> queue_owner||'.'||queue_name queue<br /> from DBA_STREAMS_MESSAGE_CONSUMERS;<br /><br />set serveroutput on<br />declare<br /> v_demotyp demotyp;<br /> v_any anydata;<br /> v_out pls_integer;<br />begin<br /> dbms_streams_messaging.dequeue(<br /> queue_name => 'strmadmin.streams_queue',<br /> streams_name => 'demostrm',<br /> payload => v_any,<br /> dequeue_mode => 'REMOVE',<br /> navigation => 'NEXT TRANSACTION',<br /> wait => DBMS_STREAMS_MESSAGING.FOREVER);<br /> v_out := v_any.getobject(v_demotyp);<br /> dbms_output.put_line('Message Received: '||<br /> to_char(v_demotyp.id));<br /> commit;<br />end;<br />/</pre><h2><a name="TransferFile05">Step 5:</a> Test The Streams Configuration</h2>You can now test your settings. In order to do it, enqueue a message on the <code>BLACK</code> side. For this first test, don't "attach" any file to the message; leave <code>myfile</code> set to <code>NULL</code>:<pre>connect strmadmin/strmadmin<br /><br />declare<br /> v_demotyp demotyp:=demotyp(1, null);<br /> v_any anydata;<br />begin<br /> v_any:=anydata.convertobject(v_demotyp);<br /> dbms_streams_messaging.enqueue(<br /> 'strmadmin.streams_queue',<br /> v_any);<br />end;<br />/<br />commit;</pre>The program running on <code>WHITE</code> should display the following line and stop:<br /><pre>Message Received: 1</pre><h2><a name="TransferFile06">Step 6:</a> Create a File and Transfer It with Streams</h2>You'll perform the same test, but this time with a file attached. 
Restart the program on <code>WHITE</code>:<pre>select * from GLOBAL_NAME; <br /><br />GLOBAL_NAME<br />----------<br />WHITE <br /><br />set serveroutput on<br /><br />declare<br /> v_demotyp demotyp;<br /> v_any anydata;<br /> v_out pls_integer;<br />begin<br /> dbms_streams_messaging.dequeue(<br /> queue_name => 'strmadmin.streams_queue',<br /> streams_name => 'demostrm',<br /> payload => v_any,<br /> dequeue_mode => 'REMOVE',<br /> navigation => 'NEXT TRANSACTION',<br /> wait => DBMS_STREAMS_MESSAGING.FOREVER);<br /> v_out := v_any.getobject(v_demotyp);<br /> dbms_output.put_line('Message Received: '||<br /> to_char(v_demotyp.id));<br /> commit;<br />end;<br />/</pre>Create a file that is a multiple of 512 bytes in size on the <code>BLACK</code> side, allocate a <code>BFILE</code> that points to it and send it to <code>WHITE</code> with Streams:<pre>connect strmadmin/strmadmin<br /><br />select * from GLOBAL_NAME; <br /><br />GLOBAL_NAME<br />----------<br />BLACK<br /><br />!dd if=/dev/zero of=/tmp/black/demo.zero bs=512 count=1 <br /><br />declare<br /> v_demotyp demotyp:=demotyp(2, bfilename('tmp_dir','demo.zero')); <br /> v_any anydata;<br />begin <br /> v_any:=anydata.convertobject(v_demotyp); <br /> dbms_streams_messaging.enqueue(<br /> 'strmadmin.streams_queue',<br /> v_any);<br />end;<br />/<br />commit;</pre>The program you've restarted on <code>WHITE</code> should display something like:<pre>Message Received: 2</pre>And the file should now be present on the <code>WHITE</code> side too:<pre>!ls -l /tmp/white<br />total 4<br />-rw-r----- 1 oracle oinstall 512 2009-08-24 00:25 demo.zero</pre><h2><a name="TransferFile07">Step 7:</a> Troubleshooting</h2>Troubleshooting this sample is beyond the scope of the post. However, if the configuration is not working, it's very likely related to the propagation of the file (the file size, some privileges on the <code>/tmp/white</code> directory, etc.). 
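Most of these failure causes are visible from the operating system before a message is ever enqueued, so a small pre-flight check of the source file can save you a round of propagation errors. Here is a sketch in Python; <code>preflight</code> is a hypothetical helper name of mine, not part of the Streams configuration:

```python
import os

def preflight(path, block=512):
    """Return a list of problems that would prevent Streams from
    shipping the file; an empty list means it looks safe to enqueue."""
    problems = []
    if not os.path.isfile(path):
        return ["file does not exist: " + path]
    if not os.access(path, os.R_OK):
        problems.append("file is not readable by this user")
    size = os.path.getsize(path)
    if size % block:
        problems.append("size %d is not a multiple of %d bytes" % (size, block))
    return problems
```

Run it against the BFILE's target path as the OS user that owns the database processes; anything it reports maps directly to one of the causes above.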
You can query DBA_PROPAGATION and the queue tables to know more about the status of your message. If there is any error at the propagation level, fix it and restart the propagation:<pre>exec dbms_propagation_adm.stop_propagation('bfile_propagation',true);<br />exec dbms_propagation_adm.start_propagation('bfile_propagation');</pre><h2><a name="TransferFile08">Step 8:</a> Clean-up the environment</h2>As always, leave the database as it was when you first started. The scripts below remove all the components:<table width="100%"><tbody><tr><th style="border: 1px solid rgb(0, 0, 0);">On BLACK</th><th style="border: 1px solid rgb(0, 0, 0);">On WHITE</th></tr><tr><td style="border: 1px solid rgb(0, 0, 0);"><pre>connect / as sysdba<br /><br />!rm /tmp/black/demo.zero<br /><br />begin<br />dbms_propagation_adm.stop_propagation(<br /> 'bfile_propagation',true);<br />dbms_propagation_adm.drop_propagation(<br /> 'bfile_propagation',true);<br />end;<br />/<br /><br />drop directory tmp_dir;<br /><br />begin<br />DBMS_STREAMS_ADM.REMOVE_QUEUE(<br /> queue_name => 'strmadmin.streams_queue',<br /> cascade => true,<br /> drop_unused_queue_table=> true);<br />end;<br />/<br /><br />drop user strmadmin cascade;<br />drop tablespace streams_tbs<br /> including contents and datafiles;</pre></td><td style="border: 1px solid rgb(0, 0, 0);"><pre>connect / as sysdba<br /><br />!rm /tmp/white/demo.zero<br /><br />col streams_name format a15<br />col queue format a40<br />select streams_name,<br /> queue_owner||'.'||queue_name queue<br /> from DBA_STREAMS_MESSAGE_CONSUMERS;<br /><br />begin<br />for i in (select rule_owner, rule_name<br /> from DBA_STREAMS_RULES<br /> where STREAMS_TYPE='DEQUEUE'<br /> and STREAMS_NAME='DEMOSTRM')<br /> loop<br /> dbms_streams_adm.remove_rule(<br /> rule_name => i.rule_owner||'.'||i.rule_name,<br /> streams_type => 'DEQUEUE',<br /> streams_name => 'DEMOSTRM',<br /> drop_unused_rule => true);<br /> end loop;<br />end;<br />/<br /><br />col streams_name 
format a15<br />col queue format a40<br />select streams_name,<br /> queue_owner||'.'||queue_name queue<br />from DBA_STREAMS_MESSAGE_CONSUMERS;<br /><br />drop directory tmp_dir;<br />drop user strmadmin cascade;<br />drop tablespace streams_tbs<br /> including contents and datafiles;</pre></td></tr></tbody></table><br />In the next post, we'll go a bit deeper into troubleshooting; we'll generate a propagation error and see how to fix it. Stay tuned!</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-34394169373218284622009-04-25T03:21:00.000-07:002009-04-25T03:22:21.151-07:00User-Defined DDL LCRsIn "<a href="http://wedostreams.blogspot.com/2009/04/user-defined-dml-lcrs.html">User-Defined DML LCRs</a>", I've shown how to use <code>lcr$_row_record.construct</code> to build LCRs for <code>DELETE</code>, <code>UPDATE</code> and <code>INSERT</code>; in the same post, I've created an apply process to execute those LCRs. I wanted to add an example of a DDL LCR at the same time, but <a href="http://wedostreams.blogspot.com/2009/04/user-defined-dml-lcrs.html?showComment=1240645380000#c8906115355666788808">because of an error with my 11.1.0.7 <code>AL32UTF8</code> database</a>, I have not been able to.<br /><br />Not a big deal! I've created a <code>WE8ISO8859P15</code> database and I'll correct that misfire right away. You'll find below a sample configuration that shows how to use <code>lcr$_ddl_record.construct</code>.<span class="fullpost"><br /><h3>Streams Queue and Apply</h3>We'll use the same schema for the queue, for the apply process, and to apply the DDL. 
To speed up the setup, I've granted DBA to the schema owner; there are 3 points worth mentioning:<br /><ul><li>I've set the <code>apply_captured</code> parameter to <code>false</code> to make sure the apply process dequeues persistent messages</li><li>I did not add any rule set to the apply so that it dequeues every message in the queue</li><li>There is no need to define an instantiation SCN or to set a SCN in the LCR (in the case of user-defined LCRs)</li></ul> <pre>begin<br /> dbms_streams_adm.set_up_queue(<br /> queue_table => 'custom_queue_table',<br /> queue_name => 'custom_queue');<br />end;<br />/</pre><pre>declare<br /> v_name varchar2(256);<br />begin<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br />dbms_apply_adm.create_apply(<br /> queue_name => 'custom_queue',<br /> apply_name => 'custom_apply',<br /> apply_captured => false,<br /> source_database => v_name );<br />end;<br />/<br /><br />exec dbms_apply_adm.start_apply('CUSTOM_APPLY');<br /></pre><h3>Create and Enqueue A DDL LCR</h3>Once everything is ready, you can build the LCRs and enqueue them in the queue so that the <code>CREATE TABLE</code> can be consumed and executed by the apply process:<pre>declare<br /> v_name varchar2(256);<br /> v_any anydata;<br /> lcr sys.lcr$_ddl_record;<br /> rc pls_integer;<br /> enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;<br /> mprop DBMS_AQ.MESSAGE_PROPERTIES_T;<br /> enq_msgid RAW(16);<br />begin<br /> -- Get DB Name and SCN<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br /><br /> mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);<br /><br /> lcr:=sys.lcr$_ddl_record.construct(<br /> source_database_name => v_name,<br /> command_type => 'CREATE TABLE',<br /> object_owner => user,<br /> object_name => 'MYTABLE',<br /> object_type => 'TABLE',<br /> ddl_text =><br /> 'create table MYTABLE(id number primary key)',<br /> logon_user => user,<br /> current_schema => user,<br /> base_table_owner => 
user,<br /> base_table_name => 'MYTABLE',<br /> tag => null,<br /> transaction_id => null,<br /> scn => null);<br /><br /> DBMS_AQ.ENQUEUE(<br /> queue_name => 'custom_queue',<br /> enqueue_options => enqopt,<br /> message_properties => mprop,<br /> payload => anydata.ConvertObject(lcr),<br /> msgid => enq_msgid);<br />end;<br />/<br /><br />commit;<br /><br />desc mytable<br /><br />Name Null? Type<br />---- -------- ------<br />ID NOT NULL NUMBER<br /><br />declare<br /> v_name varchar2(256);<br /> v_any anydata;<br /> lcr sys.lcr$_ddl_record;<br /> rc pls_integer;<br /> enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;<br /> mprop DBMS_AQ.MESSAGE_PROPERTIES_T;<br /> enq_msgid RAW(16);<br />begin<br /> -- Get DB Name and SCN<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br /><br /> mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);<br /><br /> lcr:=sys.lcr$_ddl_record.construct(<br /> source_database_name => v_name,<br /> command_type => 'DROP TABLE',<br /> object_owner => user,<br /> object_name => 'MYTABLE',<br /> object_type => 'TABLE',<br /> ddl_text =><br /> 'drop table MYTABLE purge',<br /> logon_user => user,<br /> current_schema => user,<br /> base_table_owner => user,<br /> base_table_name => 'MYTABLE',<br /> tag => null,<br /> transaction_id => null,<br /> scn => null);<br /> DBMS_AQ.ENQUEUE(<br /> queue_name => 'custom_queue',<br /> enqueue_options => enqopt,<br /> message_properties => mprop,<br /> payload => anydata.ConvertObject(lcr),<br /> msgid => enq_msgid);<br />end;<br />/<br /><br />commit;<br /><br />desc mytable<br /><br />ERROR:<br />ORA-04043: object mytable does not exist<br /></pre><h3>Conclusion</h3>This is it. With a <code>WE8ISO8859P15</code> database, it works both with 10g and 11g and it should even work with Oracle Standard Edition... 
At least in theory!</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-83197887782489014352009-04-21T12:02:00.000-07:002009-04-21T12:05:21.470-07:00Oracle Streams Can Do Things, Mortals Can't! The Sequel.In my post "<a href="http://wedostreams.blogspot.com/2009/04/oracle-streams-can-do-things-mortals.html">Oracle Streams Can Do Things, Mortals Can't!</a>", I've shown one of the challenges of applying changes, row by row, like Streams does. It can be the case when one of the impacted columns holds a unique key, a primary key or a referential integrity constraint and you change several rows as part of the same update. I've also shown that Streams can actually handle those cases while you would have to <span style="font-style: italic;">DEFER</span> the validation of those constraints to commit time to perform the same operations manually.<br /><br />I've written another post that has just been published <a href="http://www.pythian.com/news/author/arkzoyd">with my other articles on The Pythian Group Blog</a>. It's entitled "<a href="http://www.pythian.com/news/2057/oracle-delete-and-re-insert-row-in-the-same-statement">Delete and Re-Insert a Row in the Same Statement</a>" and it's actually the sequel of my previous post. It shows how you can leverage Streams' ability to run several SQL statements as if they were running as part of the same statement. You could use it to move rows... even if your table doesn't have <code>ENABLE ROW MOVEMENT</code> and the constraints are not <code>DEFERRABLE</code>.Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-81366491205882385112009-04-21T09:58:00.000-07:002009-04-21T09:59:31.647-07:00User-Defined DML LCRsOne of the great features that comes with Oracle Streams is the ability you have to use the APIs to extend the framework and build almost anything you have in mind. 
Streams provides an easy way to share data across databases and with applications. In this post, I'll be providing an example of PL/SQL code that publishes DML LCRs in a queue; an apply process will then execute the changes described in those LCRs.<span class="fullpost"><br /><h3>Test Schema</h3>For the purpose of this demonstration, you can build one table <code>T1</code> in the schema of your choice. Below is the script that creates that table and displays its content:<br /><pre>create table T1<br /> (id number,<br /> text varchar2(10),<br /> constraint T1_PK primary key(id));<br /><br />insert into T1 values (1, 'Text 1');<br /><br />commit;<br /><br />col id format 99<br />col text format a6<br /><br />select id, text<br /> from T1;<br /><br />ID TEXT<br />-- ------<br />1 Text 1</pre><h3>Streams Queue and Apply</h3>I use the same schema for the table, the queue and the apply process. 
To speed up the setup, I've granted DBA to the schema owner. There are three points worth mentioning:<br /><ul><li>I've set the <code>apply_captured</code> parameter to <code>false</code> to make sure the apply process dequeues persistent messages</li><li>I did not add any rule set to the apply so that it dequeues every message in the queue</li><li>There is no need to define an instantiation SCN or to set an SCN in the LCR</li></ul> <pre>begin<br /> dbms_streams_adm.set_up_queue(<br /> queue_table => 'custom_queue_table',<br /> queue_name => 'custom_queue');<br />end;<br />/</pre><pre>declare<br /> v_name varchar2(256);<br />begin<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br />dbms_apply_adm.create_apply(<br /> queue_name => 'custom_queue',<br /> apply_name => 'custom_apply',<br /> apply_captured => false,<br /> source_database => v_name );<br />end;<br />/<br /><br />exec dbms_apply_adm.start_apply('CUSTOM_APPLY');<br /></pre><h3><span style="font-weight: bold; font-style: italic;">Create and Enqueue DML LCRs</span></h3>Once everything is ready, you can build the LCRs and enqueue them in the queue so that the <code>DELETE</code>, <code>UPDATE</code> or <code>INSERT</code> can be consumed and executed by the apply process. You'll find below three examples of such user-defined DML LCRs:<br /><ul><li>User-Defined DELETE LCR</li></ul><pre>declare<br /> v_name varchar2(256);<br /> lcr sys.lcr$_row_record;<br /> v_oldlist sys.lcr$_row_list;<br /> v_oldid sys.lcr$_row_unit;<br /> v_oldtxt sys.lcr$_row_unit;<br /> enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;<br /> mprop DBMS_AQ.MESSAGE_PROPERTIES_T;<br /> enq_msgid RAW(16);<br />begin<br /> -- Get DB Name<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br /><br /> mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);<br /><br /> 
v_oldid:=sys.lcr$_row_unit(<br /> 'ID',<br /> anydata.convertnumber(1),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_oldtxt:=sys.lcr$_row_unit(<br /> 'TEXT',<br /> anydata.convertvarchar2('Text 1'),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_oldlist:=sys.lcr$_row_list(v_oldid,v_oldtxt);<br /> lcr:=sys.lcr$_row_record.construct(<br /> source_database_name=>v_name,<br /> command_type => 'DELETE',<br /> object_owner=> user,<br /> object_name => 'T1',<br /> tag => null,<br /> transaction_id => null,<br /> scn => null,<br /> old_values => v_oldlist);<br /> DBMS_AQ.ENQUEUE(<br /> queue_name => 'custom_queue',<br /> enqueue_options => enqopt,<br /> message_properties => mprop,<br /> payload => anydata.ConvertObject(lcr),<br /> msgid => enq_msgid);<br /><br />end;<br />/<br />commit;<br /><br />select *<br /> from t1;<br /><br />no rows selected<br /></pre><ul><li>User Defined INSERT LCR</li></ul><span style="font-weight: bold; font-style: italic;"></span><pre>declare<br /> v_name varchar2(256);<br /> lcr sys.lcr$_row_record;<br /> v_newlist sys.lcr$_row_list;<br /> v_newid sys.lcr$_row_unit;<br /> v_newtxt sys.lcr$_row_unit;<br /> enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;<br /> mprop DBMS_AQ.MESSAGE_PROPERTIES_T;<br /> enq_msgid RAW(16);<br />begin<br /> -- Get DB Name<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br /><br /> mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);<br /><br /> v_newid:=sys.lcr$_row_unit(<br /> 'ID',<br /> anydata.convertnumber(1),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_newtxt:=sys.lcr$_row_unit(<br /> 'TEXT',<br /> anydata.convertvarchar2('Text 9'),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_newlist:=sys.lcr$_row_list(v_newid,v_newtxt);<br /> lcr:=sys.lcr$_row_record.construct(<br /> source_database_name=>v_name,<br /> command_type => 'INSERT',<br /> object_owner=> user,<br /> object_name => 'T1',<br /> tag => null,<br /> transaction_id => 
null,<br /> scn => null,<br /> new_values => v_newlist);<br /> DBMS_AQ.ENQUEUE(<br /> queue_name => 'custom_queue',<br /> enqueue_options => enqopt,<br /> message_properties => mprop,<br /> payload => anydata.ConvertObject(lcr),<br /> msgid => enq_msgid);<br /><br />end;<br />/<br />commit;<br /><br />select *<br /> from t1;<br /><br />ID TEXT<br />-- ------<br />1 Text 9</pre><span style="font-weight: bold; font-style: italic;"></span><ul><li>User Defined UPDATE LCR</li></ul><pre>declare<br /> v_name varchar2(256);<br /> lcr sys.lcr$_row_record;<br /> v_oldlist sys.lcr$_row_list;<br /> v_oldid sys.lcr$_row_unit;<br /> v_oldtxt sys.lcr$_row_unit;<br /> v_newlist sys.lcr$_row_list;<br /> v_newid sys.lcr$_row_unit;<br /> v_newtxt sys.lcr$_row_unit;<br /> enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;<br /> mprop DBMS_AQ.MESSAGE_PROPERTIES_T;<br /> enq_msgid RAW(16);<br />begin<br /> -- Get DB Name<br /> select value into v_name<br /> from v$parameter<br /> where name='db_unique_name';<br /><br /> mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);<br /><br /> v_oldid:=sys.lcr$_row_unit(<br /> 'ID',<br /> anydata.convertnumber(1),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_newid:=sys.lcr$_row_unit(<br /> 'ID',<br /> anydata.convertnumber(1),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_oldtxt:=sys.lcr$_row_unit(<br /> 'TEXT',<br /> anydata.convertvarchar2('Text 9'),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_newtxt:=sys.lcr$_row_unit(<br /> 'TEXT',<br /> anydata.convertvarchar2('Text 1'),<br /> DBMS_LCR.NOT_A_LOB,<br /> null,<br /> null);<br /> v_oldlist:=sys.lcr$_row_list(v_oldid,v_oldtxt);<br /> v_newlist:=sys.lcr$_row_list(v_newid,v_newtxt);<br /> lcr:=sys.lcr$_row_record.construct(<br /> source_database_name=>v_name,<br /> command_type => 'UPDATE',<br /> object_owner=> user,<br /> object_name => 'T1',<br /> tag => null,<br /> transaction_id => null,<br /> scn => null,<br /> old_values => v_oldlist,<br /> new_values => 
v_newlist);<br /> DBMS_AQ.ENQUEUE(<br /> queue_name => 'custom_queue',<br /> enqueue_options => enqopt,<br /> message_properties => mprop,<br /> payload => anydata.ConvertObject(lcr),<br /> msgid => enq_msgid);<br /><br />end;<br />/<br />commit;<br /><br />select *<br /> from t1;<br /><br />ID TEXT<br />-- ------<br />1 Text 1<br /></pre><h3><span style="font-weight: bold; font-style: italic;">Conclusion</span></h3>As you can see, creating and enqueuing DML LCRs is very simple.<br /><br />I'm hitting an issue with DDL LCRs on top of 11.1.0.7/Linux x86 and, for now, the only way I've managed to create one is by using <code>dbms_streams.convert_xml_to_lcr</code>. I'll write a new post as soon as my issue is fixed.</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com2tag:blogger.com,1999:blog-1041798052670608448.post-62562263229990760382009-04-18T03:44:00.000-07:002009-04-18T04:00:36.707-07:00Oracle Streams Can Do Things, Mortals Can't!In my last post, "<a href="http://wedostreams.blogspot.com/2009/04/do-your-tables-need-keys-be-replicated.html">Do Your Tables Need Keys Be Replicated With Streams?</a>", I've introduced the "Manual Streams Simulator". The goal was to show how Oracle Streams works by using the simplest model we can. What I'll be showing now is that, like every tool or model, the "Manual Streams Simulator" has its limits. Actually, I'll be pointing out one of the things Streams can do that you and your programs can NOT...<span class="fullpost"><br /><br />So, this post is about one of the many challenges Oracle Streams faces... because of its design. A very simple example will help you understand what I mean. 
Let's consider a table <code>T1</code> with a column <code>COL1</code> and a unique constraint on <code>COL1</code>:<br /><pre>create table T1 (COL1 number,<br /> constraint T1_UK unique(COL1));<br /><br />insert into T1(COL1) values(1);<br />insert into T1(COL1) values(2);<br />commit;</pre>To identify the rows in the database, you can query their <code>ROWID</code>:<br /><pre>select ROWID, COL1<br /> from T1;<br /><br />ROWID COL1<br />------------------ ----<br />AAATF5AABAAAVxpAAA 1<br />AAATF5AABAAAVxpAAB 2<br /></pre>With Oracle, a single update can turn the value 1 into 2 and the value 2 into 1; here is how:<br /><pre>update T1 set COL1=decode(COL1,1,2,1)<br />where COL1 in (1,2);<br />commit;<br /></pre><blockquote>Note:<br />That's not the case with every RDBMS; but if you work with Oracle, that's not something you need to worry about.</blockquote>If you don't trust me, check the <code>ROWID</code> of each row after the update:<br /><pre>select ROWID, COL1<br /> from T1;<br /><br />ROWID COL1<br />------------------ ----<br />AAATF5AABAAAVxpAAA 2<br />AAATF5AABAAAVxpAAB 1<br /></pre>Just think about it: the Capture process or the synchronous capture would turn that update into 2 LCRs that would be applied as 2 different statements by the apply server process. If you trace the apply server process, you'll see that each statement actually looks like this:<pre>update /*+ streams restrict_all_ref_cons */ "DESTINATION"."T1" p<br /> set "COL1"=decode(:1,'N',"COL1",:2)<br /> where (:3='N') and (:4="COL1")</pre>So? Do you see what I mean? If you want to execute 2 separate update statements to change 1 into 2 and 2 into 1, you will need to defer the unique constraint, right? A Streams apply executes the 2 updates without deferring anything! The reason I'm so sure is that it will work even if the constraint is not deferrable on the destination.<br /><br />How does it manage to do it? Obviously that's an internal trick! 
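If you want to see for yourself why a regular session needs the deferred-constraint workaround, you can replay the swap by hand. The sketch below reuses the <code>T1</code> table and the rowids returned by the queries above (yours will differ, and the ORA-00001 message will show the constraint prefixed with your schema name):<br /><br /><pre>-- With the non-deferrable T1_UK in place, the first of the<br />-- 2 row-by-row updates fails immediately:<br />update T1 set COL1=2 where rowid='AAATF5AABAAAVxpAAA';<br />-- ORA-00001: unique constraint (T1_UK) violated<br /><br />-- Recreate the constraint as deferrable, defer it, and the<br />-- 2-statement swap goes through; the check happens at commit:<br />alter table T1 drop constraint T1_UK;<br />alter table T1 add constraint T1_UK unique(COL1) deferrable;<br /><br />set constraint T1_UK deferred;<br /><br />update T1 set COL1=2 where rowid='AAATF5AABAAAVxpAAA';<br />update T1 set COL1=1 where rowid='AAATF5AABAAAVxpAAB';<br /><br />commit;<br /></pre>That is exactly the ceremony the apply server process does NOT need.<br /><br />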
What I know for sure is that the SCN of the capture is involved and I strongly suspect the <code>/*+ streams */</code> hint is part of the deal. But unless you get your hands on the code, I doubt you'll manage to guess more.<br /><br />The good news is that you can leverage that internal trick for your own purposes; in my next post, I'll show you how you could use Streams to run an <code>INSERT</code> and a <code>DELETE</code> as if they were executed at the exact same SCN. It will be posted with my other posts <a href="http://www.pythian.com/news/author/arkzoyd">in my section of The Pythian Group Blog</a>. Just be patient, I'll keep you updated.</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-88681026262497236802009-04-12T00:35:00.000-07:002009-04-12T00:41:31.121-07:00Do Your Tables Need Keys Be Replicated With Streams?There is a lot of misunderstanding regarding the "requirement" for unique or primary keys on tables you want to replicate with Oracle Streams. That's probably worth some explanation...<br /><br />Check the questions below: <ul><li>Do you need a primary or a unique key on a table to replicate it with Streams?</li><li>How would you replicate a table that contains duplicated rows?</li><li>What if the primary key of the destination table is different from the primary key of the source table?</li></ul>This post not only answers these questions, but also explains <span style="font-style: italic;">WHY</span> things are like they are.<span class="fullpost"> <h3>The Manual Streams Simulator</h3>I'm not 100% sure what is the best way to understand the challenges of a SQL-based replication. A way that has worked pretty well for me was to model it <sup>[<a href="http://wedostreams.blogspot.com/2009/04/do-your-tables-need-keys-be-replicated.html#keys-footer1">1</a>]</sup>. 
I've called that approach the <i>Manual Streams Simulator.</i> <br /><br />The <i>Manual Streams Simulator</i> is nothing more than a reproduction of a Streams configuration, in which <span style="font-style: italic;">YOU</span> imitate with SQL what Streams would capture, propagate or apply. Among the many advantages of this <i>Simulator</i> are its extreme simplicity and the fact that it is free of code and thus bug-free! For now, the way the replicated environment is instantiated is not of any real interest. For this reason we'll just build 2 tables and imitate a 1-way Streams table replication between them: <ul><li><code>T1</code> is the source table; it contains 3 columns and 3 rows and doesn't have any index or key</li><li><code>T2</code> is a copy of <code>T1</code> and will be used as the destination for the replication</li></ul> The script below creates the 2 tables described above:<pre>create table t1(<br /> col1 number,<br /> col2 number,<br /> col3 number);<br /><br />insert into t1 values (1,1,1);<br />insert into t1 values (1,2,2);<br />insert into t1 values (1,2,3);<br /><br />commit;<br /><br />col col1 format 9999<br />col col2 format 9999<br />col col3 format 9999<br />col rowid format a20<br /><br />select col1, col2, col3, rowid<br /> from t1;<br /><br />COL1 COL2 COL3 ROWID<br />---- ---- ---- ------------------<br /> 1 1 1 AAASzVAABAAAVxpAAA<br /> 1 2 2 AAASzVAABAAAVxpAAB<br /> 1 2 3 AAASzVAABAAAVxpAAC<br /> <br />create table t2 as select * from t1;<br /><br />select col1, col2, col3, rowid<br /> from t2;<br /><br />COL1 COL2 COL3 ROWID<br />---- ---- ---- ------------------<br /> 1 1 1 AAASzWAABAAAVxxAAA<br /> 1 2 2 AAASzWAABAAAVxxAAB<br /> 1 2 3 AAASzWAABAAAVxxAAC</pre> <h3>Not all DML statements are equal</h3> The first thing that comes to mind right after instantiating the tables is that not all DML statements are equal. The simplest DML statement you can get by far is <code>insert ... 
values ()</code>; if it is executed on the source table:<pre>insert into t1(col1, col2, col3)<br /> values (2,3,4);<br /><br />commit;</pre>all you have to do is execute the same statement on the destination; in this case, we just need to change the table name from <code>T1</code> to <code>T2</code>:<pre>insert into t2(col1, col2, col3)<br /> values (2,3,4);<br /><br />commit;</pre>With an update or a delete, it's a different story. Try to transform the SQL statements below so they can run on the destination:<pre>update T1 set col3=5 where rowid='AAASzVAABAAAVxpAAB';<br /><br />commit;<br /><br />delete from T1 where rowid='AAASzVAABAAAVxpAAC';<br /><br />commit;</pre>What you can guess with this simple example is probably the first challenge of a SQL-based replication. When a row is modified or deleted on the source, the process that applies the changes on the destination needs to identify the corresponding row. In the case of newly inserted rows, there is no such need.<br /><br />A consequence of the above is that, if you replicate a table that is an insert-only table, it will work in all cases, unless it contains unsupported data types... It won't matter if you have duplicated rows in it, if you have not enabled any supplemental logging or if you don't have any way to identify rows, whether it's a key or something else. It will always work! Obviously it is a different story with deletes and updates. <blockquote>Note:<br />In the case of a recovery or a physical standby, the changes, including the ones generated by deletes or updates, are re-applied to the exact same data, the exact same rows. For this reason those processes can use the rowid to regenerate the changes; they are not subject to the same constraints as a SQL-based replication. 
Database recovery and physical standby MRP can rely on the rowid to replicate the changes.</blockquote> <h3>Value-Based and Row-Level Capture</h3> Most, not to say all, applications have SQL statements like this one:<pre>insert into mytable<br /> values(1, sysdate);<br /><br />commit;</pre>If this statement is captured as it is executed, the resulting data would differ in many cases between the source and the destination. This is because there is no way to guarantee the same statement can be applied at the same time on both sides. To ensure data doesn't differ on both sides of the replication, the 2 Streams implicit captures, Synchronous and Asynchronous, don't rely on the formulas included in the original SQL: <span style="font-style: italic;">they capture the values of the changes!</span><br /><br />A direct consequence of this is that when a SQL statement modifies several rows, the Streams implicit captures track the values of the changes for each one of the impacted rows; in fact, they generate a Logical Change Record (LCR) for each one of the changed rows. This approach also guarantees that changes captured from a statement like the one below will be the same on the source and the destination:<pre>delete from mytable<br /> where col2<=sysdate-360;<br />commit;</pre> <blockquote>Note<br />Nothing prevents you from creating a trigger on the source database, capturing the original DML statements <a href="http://download.oracle.com/docs/cd/B28359_01/appdev.111/b28370/triggers.htm#i1006199">with the ora_sql_text event attribute</a>, managing it as a user message and applying it on the destination database. However, the likelihood it will result in different data on the destination database is very close to 100%. 
In addition, there is no easy or reliable way to detect which statements could lead to different data (and that's just for a 1-way replication!).</blockquote> <h3>How to identify changed rows?</h3> Let's get back to the <i>Manual Streams Simulator</i> and run a statement like the one below:<pre>update T1 set col3=5 where rowid='AAASzVAABAAAVxpAAB';<br /><br />commit;</pre>How can you write a SQL statement that would guarantee the exact same row will be changed on the destination table? <ul style="font-weight: bold;"><li>When the source and destination tables have the same primary key</li></ul> If the 2 tables have the same primary or unique key, it is easy to manage: capture the value of the key with the update and build the <code>WHERE</code> clause condition with it, instead of the <code>rowid</code>. Let's say the key is (<code>COL2</code>, <code>COL3</code>); what you would write to make sure the change will be the same on the source and destination is:<pre>update T2 set col3=5<br />where col2=2 and col3=2;<br /><br />commit;</pre>The 2 Streams implicit captures behave exactly like the <i>Manual Streams Simulator</i>: (1) if you use the synchronous capture, the OLD and NEW values for all the columns are captured anyway, so the Streams Apply can easily build the <code>WHERE</code> clause condition like you did; (2) if you use the asynchronous capture, all you need is to add supplemental logging for the primary key or the unique keys on the source table to get to the same result. <ul style="font-weight: bold;"><li>When the source and destination tables have a row identifier that is not enforced by a key or when the keys on the 2 tables don't match</li></ul>With the <i>Manual Streams Simulator</i>, you won't make any difference between this second case and the previous one. If you know there is a unique identifier, even if it's not enforced by a key or if the key is different on the destination, you won't pay attention to that. 
Let's say the row identifier is (<code>COL2</code>, <code>COL3</code>) too; what you would write to make sure the change will be the same on the source and destination is identical to what you've written previously:<pre>update T2 set col3=5<br />where col2=2 and col3=2;<br /><br />commit;</pre>Obviously, Streams cannot guess what a row identifier that is not enforced by a key is. You will have to help it. In the case of the synchronous capture, all the OLD and NEW values for all the columns are captured, so Streams will be able to build the SQL that corresponds to the row change without changing anything on the capture side; you just need to tell it what the identifier is on the apply side. In the case of the asynchronous capture, column values are not captured by default and you'll need to add supplemental logging for the columns that make up the unique identifier; here is how to do it:<pre>alter table T1<br /> add supplemental log group T1_UID(COL2, COL3) always;</pre>In both cases, you also need to tell the Apply process what the unique identifier is. There are 2 ways to do it:<br /><ul><li>you can use the <code>dbms_apply_adm.set_key_columns</code> procedure and define the identifier to use on the destination</li><li>you can create the primary key corresponding to the unique identifier of the source on the destination table. You can do it even if the identifier you use is not enforced by a key on the source. Obviously it assumes there are no duplicate values for the key on the destination. </li></ul><blockquote>Note:<br />If you don't create the key on the destination table and don't specify it with the <code>dbms_apply_adm.set_key_columns</code> procedure, the apply process assumes the key is made of all the columns of the destination table, except the ones of type LONG, LONG RAW and LOB. 
In the above example, if the update is captured by an asynchronous capture, it will fail with a message like the one below:<pre>ORA-26787: The row with key ("COL1","COL2","COL3")= (,2,2) does not exist in table T2<br />ORA-01403: no data found</pre>In the case of a synchronous capture, the same update will succeed because all the column values are captured and the LCR will actually contain <code>(1,2,2)</code></blockquote> <ul style="font-weight: bold;"><li>When the source table doesn't have any unique identifier</li></ul> In that case, Streams assumes all the columns, except the ones of type LONG, LONG RAW and LOB, make up a key. If you use asynchronous capture, make sure you capture the data of all those columns by running the statement below:<pre>alter table T1<br /> add supplemental log data (all) columns;</pre> <blockquote>Note<br />That approach may not work for you. If there are duplicate values for what Streams considers as an identifier, the replication will work until one of those duplicate values is modified. The Apply process will then generate an error that will look like the one below: <pre>ORA-01422: exact fetch returns more than requested number of rows</pre>And you'll have to manage that error manually. In most cases, using the <code>allow_duplicate_rows</code> parameter to work around this situation is a bad idea; however, if your table doesn't have any column of type LONG, LONG RAW or LOB, a DML handler that modifies only one of the several duplicated rows can help to work around that issue.</blockquote> <h3>Retrieving rows efficiently on the destination</h3> Let's now imagine <code>T1</code> is 1 million rows, 1GB in size, and you don't use any of those Exadata storage servers; what if you run <code>update T1 set col3=col3+1;</code> on the source database? 
I confess this is probably not a good idea, even without Streams, but who knows what developers can write?<br /><br />Capturing this change with one of the Streams implicit captures may indeed be a very bad idea. It will result in 1 million LCRs and 1 update per row on the destination. If the row identifier is made of one column and is not indexed on the destination, the apply server process will perform a full table scan for every one of the 1 million captured changes. You can easily imagine that will just make the replication impossible in this case.<br /><br />So, while it doesn't really matter whether the source table has a primary key or not, the way you index the destination table can be very important for Streams to be viable. Keep that in mind when you set up a Streams replication with source tables that don't have primary keys. <h3>Conclusion</h3>The existence of keys, unique or primary, on the capture side of a Streams configuration greatly simplifies the replication. On the other hand, missing a key on a table doesn't mean it cannot be replicated with Streams. In most cases, you can avoid creating surrogate keys with a sequence and a trigger on the source, something most applications won't support anyway: <ul><li>If a table is an insert-only table and doesn't have unsupported data types, it can always be replicated with the implicit captures,</li><li>If you can find a combination of columns to be used to identify rows, even if it's not enforced by a key, you can use it to set up the replication</li><li>If a table has duplicated rows but no LONG, LONG RAW or LOB columns, you can write a DML handler to manage the error associated with the change of one of these rows</li></ul> One other important aspect to consider when you build a Streams configuration is its purpose. 
For example, if you use Streams to migrate a database from AIX to Linux, or the other way around, managing 95% of the problem may be enough to meet your downtime requirements; you can then complete your Streams strategy with the method of your choice. And that's already another story...<br /><br /><br /><a id="keys-footer1" name="keys-footer1"></a><span style="font-size: smaller;">[1] While I tend to agree with <a href="http://prodlife.wordpress.com/2009/04/08/stand-back-im-going-to-try-science/#comments">Moshez thoughts about "science is inductive reasoning" VS "deductive reasoning and (I would add) absurd reasoning are mathematics"</a>, I would like to emphasize that the <i>Manual Streams Simulator</i> doesn't have anything to do with science; it's engineering, or I should actually say reverse engineering.<br /><br /><br /></span></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0tag:blogger.com,1999:blog-1041798052670608448.post-15115320747923243672009-03-29T17:01:00.001-07:002009-03-29T17:03:02.511-07:00How to Use A DDL Apply Handler to Manage "RENAME" CommandsIn my last post, <a style="font-style: italic;" href="http://wedostreams.blogspot.com/2009/03/this-is-why-streams-cannot-apply-rename.html">This Is Why Streams Cannot Apply "RENAME"!</a>, I've explained why the Streams Apply processes don't manage <code>RENAME</code> operations. Like I said, the easiest way to deal with this is to use the <code>ALTER ... RENAME TO ... </code>command instead. Unfortunately, it's not always easy to change the code of an application. That's especially the case when it's a packaged application or when the application is maintained by a third party.<br /><br />In this post, I'll explain how to use a DDL Apply Handler to work around that limit. What the handler does is simply change the <code>RENAME</code> into an <code>ALTER TABLE... RENAME TO ...</code> and set the instantiation SCN of the renamed table. 
If you want to test this example, you should implement a schema or global replication. There is a complete example in my previous post entitled <a style="font-style: italic;" href="http://wedostreams.blogspot.com/2009/03/renaming-schema-of-ddl-statement-in-lcr.html">Renaming The Schema Of a DDL Statement in a LCR</a> and it requires only one database. What follows is based on that example but you can easily adapt it to your needs...<span class="fullpost"><br /><br />This post is made of the following sections:<br /><ul><li><a href="http://wedostreams.blogspot.com/2009/03/how-to-use-ddl-apply-handler-to-manage.html#DDLHandler01">Setup a Schema Replication</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/03/how-to-use-ddl-apply-handler-to-manage.html#DDLHandler02">Create the DDL Apply Handler</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/03/how-to-use-ddl-apply-handler-to-manage.html#DDLHandler03">Add the DDL Handler to the Apply</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/03/how-to-use-ddl-apply-handler-to-manage.html#DDLHandler04">Execute a test</a><br /></li><li><a href="http://wedostreams.blogspot.com/2009/03/how-to-use-ddl-apply-handler-to-manage.html#DDLHandler05">Clean the environment </a></li></ul><blockquote>Note:<br />Before you start, there are a few things worth noting:<br /><ul><li>In order to parse the DDL text, I use a package of my own named <a href="http://arkzoyd.free.fr/wedostreams/arkzoyd_sqlcode.sql">arkzoyd_sqlcode</a>. You can download and install it in your Streams administrator's schema, e.g. <code>STRMADMIN</code>. I won't publish the source of it, at least for now, but if you are interested, leave a comment with your email; I won't publish the comment but I'll contact you.</li><li>It's more experimental than production-ready; all comments are welcome but I can't guarantee you'll make it work. 
So far, I've successfully set it up on a 11.1.0.7 Enterprise Edition database running on Linux x86 32bits and 64bits.</li><li>I would have expected that changing the DDL text in a custom rule-based transformation would have worked. But for some reason, despite the change of the text as well as the change of the operation type, the apply process doesn't instantiate the table after its renaming, like it does with an <code>ALTER TABLE... RENAME TO ...</code>. That's why I've used a DDL Handler instead of a custom rule-based transformation.</li><li>If the new DDL text, modified with <code>LCR$_DDL_RECORD.SET_DDL_TEXT</code>, is too short, junk characters are appended to it. That looks like a bug, or at least that's something else I don't understand. To work around that issue, I pad the end of the string with spaces. </li></ul></blockquote><h3><a name="DDLHandler01"></a>Setup a Schema Replication</h3>If you don't want to use 2 separate databases, you can follow my previous blog post where I explain how to set up a schema-level replication with only one database: <a style="font-style: italic;" href="http://wedostreams.blogspot.com/2009/03/renaming-schema-of-ddl-statement-in-lcr.html">Renaming The Schema Of a DDL Statement in a LCR</a>.<br /><h3><a name="DDLHandler02"></a>Create the DDL Apply Handler</h3>The DDL Apply Handler is a procedure that takes the DDL LCR as an input. You'll find below a template of a handler that manages the <code>RENAME</code>. What it does is (1) get the DDL text, (2) if the command type is <code>NULL</code> and it's related to a table, check whether that's a rename and build the corresponding <code>ALTER TABLE ... 
RENAME TO ...</code>; (3) if it is a rename, change the command type to <code>ALTER TABLE</code> and set the instantiation SCN for the new table; (4) execute the DDL LCR. Below is the script that creates it: <br /><pre>connect strmadmin/strmadmin<br /><br />@arkzoyd_sqlcode.sql<br /><br />create or replace procedure ddl_apply_handler(in_any anydata) <br />is<br /> lcr sys.lcr$_ddl_record;<br /> rc pls_integer;<br /> ddl_text clob;<br /> tlist arkzoyd_sqlcode.ddl_rename_output_t; <br />begin<br /> -- Access the LCR and create a clob for the ddl text<br /> rc := in_any.getobject(lcr);<br /> dbms_lob.createtemporary(ddl_text, true);<br /><br /> -- Get the DDL text<br /> lcr.get_ddl_text(ddl_text);<br /><br /> -- Transform the DDL<br /> if (lcr.get_command_type is null)<br /> and (lcr.get_object_type='TABLE') then<br /> begin<br /> tlist:=arkzoyd_sqlcode.get_renamed_tables(ddl_text);<br /> exception when others then<br /> -- That's not a rename command<br /> tlist.old_table_name:=null; <br /> end; <br /> if (tlist.old_table_name is not null) then<br /> lcr.set_ddl_text(rpad(to_clob('alter table '||<br /> tlist.old_table_name||<br /> ' rename to ' ||<br /> tlist.new_table_name)<br /> ,200));<br /> lcr.set_command_type('ALTER TABLE');<br /><br /> dbms_apply_adm.set_table_instantiation_scn(<br /> lcr.get_base_table_owner()||'.'||<br /> tlist.new_table_name ,<br /> lcr.get_source_database_name() ,<br /> lcr.get_scn());<br /> end if; <br /> end if;<br /><br /> lcr.execute();<br /> -- Free temporary LOB space<br /> dbms_lob.freetemporary(ddl_text);<br />end;<br />/<br /></pre><h3><a name="DDLHandler03"></a>Add the DDL Handler to the Apply</h3>Once the procedure is created, add it to the apply process:<br /><pre>begin<br /> dbms_apply_adm.stop_apply('streams_apply');<br /><br /> dbms_apply_adm.alter_apply(<br /> apply_name => 'streams_apply',<br /> ddl_handler => 'strmadmin.ddl_apply_handler');<br /><br /> dbms_apply_adm.start_apply('streams_apply');<br /><br
/>end;<br />/<br /><br />col apply_name format a13<br />col ddl_handler format a40<br />select apply_name,ddl_handler<br /> from dba_apply;<br /><br />APPLY_NAME DDL_HANDLER<br />------------- ----------------------------------------<br />STREAMS_APPLY "STRMADMIN"."DDL_APPLY_HANDLER"<br /></pre><h3><a name="DDLHandler04"></a>Execute a test</h3>Once the DDL Handler is configured, we can run a test to check that it's working: <br /><pre>connect source/source<br /><br />create table test (id number);<br /><br />insert into test values (1);<br /><br />commit;<br /><br />rename test to newtest;<br /><br />insert into newtest values (2);<br /><br />commit;<br /><br />connect destination/destination<br /><br />select * from destination.newtest;<br /><br />ID<br />--<br />1<br />2<br /><br />connect source/source<br /><br />drop table newtest purge;<br /><br />connect destination/destination<br /><br />select * from destination.newtest<br /> *<br />ERROR at line 1:<br />ORA-00942: table or view does not exist<br /></pre><h3><a name="DDLHandler05"></a>Clean the environment</h3>To leave the environment as it was before, run the script below and refer to the <a href="http://wedostreams.blogspot.com/2009/03/renaming-schema-of-ddl-statement-in-lcr.html#transformddl09">last section of </a><a style="font-style: italic;" href="http://wedostreams.blogspot.com/2009/03/renaming-schema-of-ddl-statement-in-lcr.html#transformddl09">Renaming The Schema Of a DDL Statement in a LCR</a> to remove the schema-level replication: <pre>begin<br /> dbms_apply_adm.stop_apply('streams_apply');<br /><br /> dbms_apply_adm.alter_apply(<br /> apply_name => 'streams_apply',<br /> remove_ddl_handler => true);<br /><br /> dbms_apply_adm.start_apply('streams_apply');<br /><br />end;<br />/<br /><br />col apply_name format a13<br />col ddl_handler format a40<br /><br />select
apply_name,ddl_handler<br /> from dba_apply;<br /><br />APPLY_NAME DDL_HANDLER<br />------------- ----------------------------------------<br />STREAMS_APPLY<br /><br />drop package body arkzoyd_sqlcode;<br />drop package arkzoyd_sqlcode;<br />drop procedure ddl_apply_handler;</pre></span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com1tag:blogger.com,1999:blog-1041798052670608448.post-1711090826079839442009-03-20T10:16:00.000-07:002009-03-20T10:28:52.934-07:00This Is Why Streams Cannot Apply "RENAME"!You've probably noticed it in the "<a href="http://download.oracle.com/docs/cd/B28359_01/server.111/b28321/ap_restrictions.htm#sthref1582">Types of DDL Changes Ignored by an Apply Process</a>" section of the documentation: a Streams apply process cannot apply a <code>RENAME</code> command, though the capture process can capture it and you can catch it in a DDL handler. But do you know why?<br /><br />The answer to that question actually lies in the two commands below:<br /><span class="fullpost"><pre>alter session set current_schema=scott;<br /><br />rename emp to emp2;<br />*<br />ERROR at line 1:<br />ORA-03001: unimplemented feature<br /></pre>For some obscure reason, <code>RENAME</code> cannot be used after <code>current_schema</code> has been changed. Unfortunately, the apply process changes the current schema to apply DDL so that it can be executed, even if the impacted schema is not named in the source command. So <code>RENAME</code> does not work. Obviously, you can write a DDL handler and parse the command to manage that manually. But if you think that's easy, ask yourself why the Streams development team did not do that themselves to provide the feature out of the box... Then run:<pre>create table "alter table rename"(col1 number);<br /><br />/* alter table rename */ rename "alter table rename" to "rename";</pre>A few mysteries remain, though: if <code>ALTER TABLE ...
RENAME TO ...</code> doesn't suffer from those limitations, why do DBAs and developers execute <code>RENAME</code> to rename a table? Will that change in 11g Release 2?</span>Gregoryhttp://www.blogger.com/profile/03131420011493218178noreply@blogger.com0
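Both posts hinge on recognizing a RENAME old TO new statement inside raw DDL text, and the quoted-identifier example above shows why naive string splitting is not enough. Since the source of arkzoyd_sqlcode.get_renamed_tables is not published, here is a rough, hypothetical sketch of the kind of parsing such a routine needs, written in Python rather than PL/SQL purely for illustration; it is not the author's actual code:

```python
import re

# A RENAME statement: optional leading /* ... */ comments, then
# RENAME <old> TO <new>, where each identifier is either double-quoted
# (and may then contain spaces and keywords, as in the tricky example
# above) or a plain unquoted name.
_RENAME_RE = re.compile(
    r'^\s*(?:/\*.*?\*/\s*)*'           # skip leading block comments
    r'rename\s+'
    r'("(?:[^"]|"")*"|\w+)\s+'         # old table name
    r'to\s+'
    r'("(?:[^"]|"")*"|\w+)\s*;?\s*$',  # new table name, optional ';'
    re.IGNORECASE | re.DOTALL,
)

def get_renamed_tables(ddl_text):
    """Return (old_name, new_name) if ddl_text is a RENAME, else None.

    Quoted identifiers are returned with their quotes intact so the
    caller can rebuild an ALTER TABLE ... RENAME TO ... statement.
    """
    match = _RENAME_RE.match(ddl_text)
    return None if match is None else (match.group(1), match.group(2))
```

For example, the commented statement from the second post, `/* alter table rename */ rename "alter table rename" to "rename";`, yields `('"alter table rename"', '"rename"')`, while any non-RENAME DDL yields `None`; note that the PL/SQL handler above signals the non-rename case by raising an exception instead of returning a null result.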