procedure xpp_internal_process_request(
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number)
is
/**
* The procedure is responsible process internal queue of XPP.
*
* @param context The context. (Defined by Oracle)
* @param reginfo The reginfo. (Defined by Oracle)
* @param descr The description of processed queue. (Defined by Oracle)
* @param payload The message for processing. (Defined by Oracle)
* @param payloadl The payloadl should be identification of the message. (Defined by Oracle)
*
*/
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message xpp_type_request;
e_no_messages exception;
pragma exception_init (e_no_messages, -25228);
--
l_curr_schema varchar2(100);
l_curr_UID varchar2(100);
l_procedure_name varchar2(100) := 'xpp_internal_process_request';
l_error_text xpp_log_trace.msg%type;
begin
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Start.');
dequeue_options.dequeue_mode := dbms_aq.REMOVE;
dequeue_options.msgid := descr.msg_id;
dequeue_options.consumer_name := descr.consumer_name;
dequeue_options.wait := dbms_aq.NO_WAIT;
dbms_aq.DEQUEUE(
queue_name => descr.queue_name
,dequeue_options => dequeue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
xpp_log.add_log(xpp_log.c_warning, l_procedure_name, 'dbms_aq.dequeue operation: '||to_char(message.operation)||'.');
begin
select
sys_context('userenv','current_schema'),
sys_context('userenv','SESSION_USER')
into
l_curr_schema,
l_curr_UID
from dual;
exception
when others then
null;
end;
xpp_log.add_log(
xpp_log.c_info
,l_procedure_name
,'operation: '||to_char(message.operation)||', '
||' xpp_schema: '||to_char(message.xpp_schema)||', '
||' queue_name: '||to_char(message.queue_name)||', '
||' queue_table_name: '||to_char(message.queue_table_name)||', '
||' queue_type_name: '||to_char(message.queue_type_name)||', '
||' storage_clause: '||to_char(message.storage_clause)||', '
||' queue_comments: '||to_char(message.queue_comments)||', '
||' user_to_grant: '||to_char(message.user_to_grant)||', '
||' name_of_procedure: '||to_char(message.name_of_procedure)||', '
||' recipient_name: '||to_char(message.recipient_name)||', '
||' seq_xpp_internal_msg: '||to_char(message.seq_xpp_internal_msg)||', '
||' username: '||to_char(l_curr_UID)||', '
||' schema: '||to_char(l_curr_schema)||', '
||' message_handle: '||to_char(message_handle)||'. '
);
if message.operation = 'define_queue_table' then
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => message.queue_table_name
-- , sort_list => 'PRIORITY,ENQ_TIME'
, queue_payload_type => message.queue_type_name
, storage_clause => message.storage_clause
, multiple_consumers => true
-- FALSE means queues created in the table can only have one consumer for each message. This is the default.
-- TRUE means queues created in the table can have multiple consumers for each message.
);
xpp_log.add_log( xpp_log.c_info, l_procedure_name,'Sucessfull procesing. queue table: '||to_char(message.queue_table_name)||' created.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'destroy_queue_table' then
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => message.queue_table_name
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue table: '||to_char(message.queue_table_name)||' dropped.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'define_queue' then
DBMS_AQADM.CREATE_QUEUE (
queue_name => message.xpp_schema||'.'||message.queue_name
--, xpp_schema => message.xpp_schema
, queue_table => message.queue_table_name
, queue_type => DBMS_AQADM.NORMAL_QUEUE
, max_retries => 0
, retry_delay => 0
, retention_time => 1209600
, dependency_tracking => false
, comment => message.queue_comments
, auto_commit => false
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(message.queue_name)||' created.');
dbms_lock.sleep(3);
DBMS_AQADM.START_QUEUE(
queue_name => message.xpp_schema||'.'||message.queue_name
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(message.queue_name)||' started.');
for one_user in (
SELECT REGEXP_SUBSTR (val, '[0-9a-zA-Z]+', 1, lvl) u
FROM(
SELECT val, LEVEL lvl
FROM (
select trim(message.user_to_grant) as val
from dual
)
CONNECT BY LEVEL <=
LENGTH (val)
- LENGTH (REPLACE (val, ','))
+ NVL(LENGTH(TRIM(REPLACE (substr(val, length(val), 1), ','))),0)
)
) loop
DBMS_AQADM.GRANT_QUEUE_PRIVILEGE (
privilege => 'ALL'
, queue_name => message.queue_name
, grantee => one_user.u
, grant_option => false
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(one_user.u)||' granted.');
end loop;
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'destroy_queue' then
DBMS_AQADM.STOP_QUEUE(
queue_name => message.xpp_schema||'.'||message.queue_name
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing. queue: '||to_char(message.queue_name)||' stopped.');
DBMS_AQADM.DROP_QUEUE(
queue_name => message.xpp_schema||'.'||message.queue_name
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull procesing.queue: '||to_char(message.queue_name)||' dropped.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'add_subscriber' then
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => message.xpp_schema||'.'||message.queue_name
,subscriber => sys.aq$_agent( message.recipient_name, null, null )
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull adding of subscription '||to_char(message.recipient_name)||' queue: '||to_char(message.xpp_schema||'.'||message.queue_name)||' added.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'del_subscriber' then
DBMS_AQADM.REMOVE_SUBSCRIBER(
queue_name => message.xpp_schema||'.'||message.queue_name
,subscriber => sys.aq$_agent( message.recipient_name, null, null )
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull remove of subscription '||to_char(message.recipient_name)||' queue: '||to_char(message.xpp_schema||'.'||message.queue_name)||' removed.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'add_notify_procedure' then
dbms_aq.register (
sys.aq$_reg_info_list(
sys.aq$_reg_info(
message.xpp_schema||'.'||message.queue_name||':'||message.name_of_procedure
,dbms_aq.namespace_aq
,'plsql://'||message.xpp_schema||'.'||message.name_of_procedure
,hextoraw('ff')
)
)
,1
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull add of notification procedure '||to_char(message.name_of_procedure)||' queue: '||to_char(message.queue_name)||' added.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
elsif message.operation = 'del_notify_procedure' then
xpp_log.add_log(xpp_log.c_info, l_procedure_name, '1 ['||message.xpp_schema||'.'||message.queue_name||':'||message.name_of_procedure||'] 2: ['||'plsql://'||message.xpp_schema||'.'||message.name_of_procedure||'].');
dbms_aq.unregister (
sys.aq$_reg_info_list(
sys.aq$_reg_info(
message.xpp_schema||'.'||message.queue_name||':'||message.name_of_procedure
,dbms_aq.namespace_aq
,'plsql://'||message.xpp_schema||'.'||message.name_of_procedure
,hextoraw('ff')
)
)
,1
);
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'Sucessfull remove of notification procedure '||to_char(message.name_of_procedure)||' queue: '||to_char(message.queue_name)||' removed.');
-- send message OK
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'OK');
else
xpp_log.add_log(xpp_log.c_error, l_procedure_name, 'Unknown operation: '||to_char(message.operation)||'.');
-- send message Error
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', 'ERR: Unknown operation: '||to_char(message.operation)||'.');
end if;
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'End.');
exception
when e_no_messages then
xpp_log.add_log(xpp_log.c_info, l_procedure_name, 'End no message to process.');
when others then
l_error_text := substr('ERR: ORA-'||to_char(SQLCODE)||' >'||SQLERRM||'< '||dbms_utility.format_error_backtrace, 1, 3900);
xpp_log.add_log(xpp_log.c_error, l_procedure_name, l_error_text);
-- send message Error
xpp_internal.send_message('m_'||to_char(message.seq_xpp_internal_msg)||'_result', l_error_text);
--commit;
end xpp_internal_process_request; |