TEXT
procedure xpp_internal_process_request(
                                      context raw,
                                      reginfo sys.aq$_reg_info,
                                      descr sys.aq$_descriptor,
                                      payload raw,
                                      payloadl number)
is
/**
 * AQ PL/SQL notification callback that processes one request from the
 * internal queue of XPP.
 *
 * The message identified by descr.msg_id is dequeued (REMOVE mode, no wait)
 * from descr.queue_name and the administrative operation named in
 * message.operation is executed:
 *
 *   define_queue_table   / destroy_queue_table
 *   define_queue         / destroy_queue
 *   add_subscriber       / del_subscriber
 *   add_notify_procedure / del_notify_procedure
 *
 * The outcome is reported through xpp_internal.send_message under the name
 * 'm_<seq_xpp_internal_msg>_result': 'OK' on success, a text starting with
 * 'ERR: ' on failure. Errors are logged and reported, never re-raised.
 *
 * The parameter list is the fixed signature Oracle requires for a PL/SQL
 * notification callback; only descr is read by this procedure.
 *
 * @param context                The user context supplied at registration. (Defined by Oracle, unused)
 * @param reginfo                The registration information. (Defined by Oracle, unused)
 * @param descr                  The descriptor of the notification: queue name, consumer name, message id. (Defined by Oracle)
 * @param payload                The raw payload of the notification. (Defined by Oracle, unused)
 * @param payloadl               The length of the payload. (Defined by Oracle, unused)
 *
 */
 dequeue_options dbms_aq.dequeue_options_t;
 message_properties dbms_aq.message_properties_t;
 message_handle RAW(16);
 message xpp_type_request;

 -- ORA-25228: timeout or end-of-fetch during message dequeue
 e_no_messages             exception;
 pragma exception_init (e_no_messages,             -25228);


 --
 -- sized for the longest value SYS_CONTEXT returns by default
 l_curr_schema    varchar2(256);
 l_curr_UID       varchar2(256);
 l_procedure_name varchar2(100) := 'xpp_internal_process_request';
 l_error_text     xpp_log_trace.msg%type;

 -- schema-qualified name of the queue the request refers to
 l_queue_full_name varchar2(4000);
 -- name under which the result is sent back; stays NULL until a message has been dequeued
 l_reply_to        varchar2(200);
 -- iteration state for the comma separated list in message.user_to_grant
 l_grantee         varchar2(4000);
 l_grantee_pos     pls_integer;

begin

       xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Start.');


       -- fetch exactly the message this notification was raised for
       dequeue_options.dequeue_mode  := dbms_aq.REMOVE;
       dequeue_options.msgid         := descr.msg_id;
       dequeue_options.consumer_name := descr.consumer_name;
       dequeue_options.wait          := dbms_aq.NO_WAIT;
       dbms_aq.DEQUEUE(
           queue_name => descr.queue_name
          ,dequeue_options => dequeue_options
          ,message_properties => message_properties
          ,payload => message
          ,msgid => message_handle
       );

       -- set first, so that every later failure can be reported to the requester
       l_reply_to        := 'm_'||to_char(message.seq_xpp_internal_msg)||'_result';
       l_queue_full_name := message.xpp_schema||'.'||message.queue_name;

       -- NOTE(review): logged at warning level although it is not an error - confirm this is intended
       xpp_log.add_log(xpp_log.c_warning, l_procedure_name, 'dbms_aq.dequeue operation: '||to_char(message.operation)||'.');


       -- session information is used for logging only, so a failure here must not stop processing
       begin
          l_curr_schema := sys_context('userenv','current_schema');
          l_curr_UID    := sys_context('userenv','SESSION_USER');
       exception
          when others then
             xpp_log.add_log(xpp_log.c_warning, l_procedure_name, 'Unable to read session context: '||SQLERRM);
       end;

       xpp_log.add_log(
         xpp_log.c_info
        ,l_procedure_name
        ,'operation: '||to_char(message.operation)||', '
       ||' xpp_schema: '||to_char(message.xpp_schema)||', '
       ||' queue_name: '||to_char(message.queue_name)||', '
       ||' queue_table_name: '||to_char(message.queue_table_name)||', '
       ||' queue_type_name: '||to_char(message.queue_type_name)||', '
       ||' storage_clause: '||to_char(message.storage_clause)||', '
       ||' queue_comments: '||to_char(message.queue_comments)||', '
       ||' user_to_grant: '||to_char(message.user_to_grant)||', '
       ||' name_of_procedure: '||to_char(message.name_of_procedure)||', '
       ||' recipient_name: '||to_char(message.recipient_name)||', '
       ||' seq_xpp_internal_msg: '||to_char(message.seq_xpp_internal_msg)||', '
       ||' username: '||to_char(l_curr_UID)||', '
       ||' schema: '||to_char(l_curr_schema)||', '
       ||' message_handle: '||to_char(message_handle)||'. '
       );



       if message.operation = 'define_queue_table' then
            DBMS_AQADM.CREATE_QUEUE_TABLE (
                   queue_table        => message.queue_table_name
                 , queue_payload_type => message.queue_type_name
                 , storage_clause     => message.storage_clause
                 , multiple_consumers => true
                 -- FALSE means queues created in the table can only have one consumer for each message. This is the default.
                 -- TRUE means queues created in the table can have multiple consumers for each message.
            );
            xpp_log.add_log( xpp_log.c_info, l_procedure_name,'Sucessfull procesing. queue table: '||to_char(message.queue_table_name)||' created.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');

       elsif message.operation = 'destroy_queue_table' then
            DBMS_AQADM.DROP_QUEUE_TABLE(
                   queue_table => message.queue_table_name
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue table: '||to_char(message.queue_table_name)||' dropped.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');


       elsif message.operation = 'define_queue' then
            DBMS_AQADM.CREATE_QUEUE (
                queue_name          => l_queue_full_name
              , queue_table         => message.queue_table_name
              , queue_type          => DBMS_AQADM.NORMAL_QUEUE
              , max_retries         => 0
              , retry_delay         => 0
              , retention_time      => 1209600   -- seconds = 14 days
              , dependency_tracking => false
              , comment             => message.queue_comments
              , auto_commit         => false
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(message.queue_name)||' created.');

            -- NOTE(review): fixed 3 second pause between create and start - reason not visible here, confirm it is still needed
            dbms_lock.sleep(3);

            DBMS_AQADM.START_QUEUE(
                queue_name   => l_queue_full_name
            );

            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(message.queue_name)||' started.');


            -- Grant the queue to every user of the comma separated list.
            -- A token is a run of characters valid in an unquoted Oracle
            -- identifier, so names such as APP_USER stay in one piece.
            -- An empty list or empty elements produce no grant at all.
            l_grantee_pos := 1;
            loop
                l_grantee := regexp_substr(message.user_to_grant, '[0-9a-zA-Z_$#]+', 1, l_grantee_pos);
                exit when l_grantee is null;

                DBMS_AQADM.GRANT_QUEUE_PRIVILEGE (
                    privilege    => 'ALL'
                  , queue_name   =>  l_queue_full_name   -- same qualified name as used for create/start
                  , grantee      =>  l_grantee
                  , grant_option =>  false
                );

                xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(l_grantee)||' granted.');

                l_grantee_pos := l_grantee_pos + 1;
            end loop;
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');


       elsif message.operation = 'destroy_queue' then

            -- a queue has to be stopped before it can be dropped
            DBMS_AQADM.STOP_QUEUE(
                queue_name   => l_queue_full_name
            );

            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(message.queue_name)||' stopped.');

            DBMS_AQADM.DROP_QUEUE(
               queue_name => l_queue_full_name
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing.queue: '||to_char(message.queue_name)||' dropped.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');

       elsif message.operation = 'add_subscriber' then

            DBMS_AQADM.ADD_SUBSCRIBER(
               queue_name => l_queue_full_name
              ,subscriber => sys.aq$_agent( message.recipient_name, null, null )
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull adding of subscription '||to_char(message.recipient_name)||' queue: '||to_char(l_queue_full_name)||' added.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');

       elsif message.operation = 'del_subscriber' then

            DBMS_AQADM.REMOVE_SUBSCRIBER(
               queue_name => l_queue_full_name
              ,subscriber => sys.aq$_agent( message.recipient_name, null, null )
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull remove of subscription '||to_char(message.recipient_name)||' queue: '||to_char(l_queue_full_name)||' removed.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');

       elsif message.operation = 'add_notify_procedure' then

            -- subscription name is <schema>.<queue>:<consumer>, the callback is addressed as plsql://<schema>.<procedure>
            dbms_aq.register (
               sys.aq$_reg_info_list(
                  sys.aq$_reg_info(
                     l_queue_full_name||':'||message.name_of_procedure
                    ,dbms_aq.namespace_aq
                    ,'plsql://'||message.xpp_schema||'.'||message.name_of_procedure
                    ,hextoraw('ff')
                  )
               )
               ,1   -- number of entries in the registration list
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull add of notification procedure '||to_char(message.name_of_procedure)||' queue: '||to_char(message.queue_name)||' added.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');


       elsif message.operation = 'del_notify_procedure' then

            xpp_log.add_log(xpp_log.c_info, l_procedure_name, '1 ['||l_queue_full_name||':'||message.name_of_procedure||'] 2: ['||'plsql://'||message.xpp_schema||'.'||message.name_of_procedure||'].');

            -- must describe the registration exactly as it was given to dbms_aq.register
            dbms_aq.unregister (
               sys.aq$_reg_info_list(
                  sys.aq$_reg_info(
                     l_queue_full_name||':'||message.name_of_procedure
                    ,dbms_aq.namespace_aq
                    ,'plsql://'||message.xpp_schema||'.'||message.name_of_procedure
                    ,hextoraw('ff')
                  )
               )
               ,1   -- number of entries in the registration list
            );
            xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull remove of notification procedure '||to_char(message.name_of_procedure)||' queue: '||to_char(message.queue_name)||' removed.');
            -- send message OK
            xpp_internal.send_message(l_reply_to, 'OK');


       else
            xpp_log.add_log(xpp_log.c_error, l_procedure_name, 'Unknown operation: '||to_char(message.operation)||'.');
            -- send message Error
            xpp_internal.send_message(l_reply_to, 'ERR: Unknown operation: '||to_char(message.operation)||'.');
       end if;



   xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'End.');

exception
     when e_no_messages then
       xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'End no message to process.');
     when others then
        -- SQLCODE is negative for Oracle errors; abs() avoids rendering 'ORA--nnnnn'
        l_error_text := substr('ERR: ORA-'||to_char(abs(SQLCODE))||' >'||SQLERRM||'< '||dbms_utility.format_error_backtrace, 1, 3900);
        xpp_log.add_log(xpp_log.c_error, l_procedure_name, l_error_text);

        -- send message Error
        -- Only possible when a message was dequeued; otherwise there is no
        -- sequence number and therefore no requester to answer.
        if l_reply_to is not null then
           xpp_internal.send_message(l_reply_to, l_error_text);
        end if;

   -- NOTE(review): no commit is issued here (an explicit commit was left disabled in the original) - confirm who ends the transaction
end xpp_internal_process_request;