@@ -32,11 +32,13 @@ use tokio::sync::oneshot;
3232#[ cfg( doc) ]  
3333use  crate :: ConvexClient ; 
3434use  crate :: { 
35+  convex_logs, 
3536 sync:: { 
3637 ReconnectProtocolReason , 
3738 ServerMessage , 
3839 } , 
3940 value:: Value , 
41+  ConvexError , 
4042} ; 
4143
4244mod  request_manager; 
@@ -71,7 +73,7 @@ struct Query {
7173} 
7274
7375/// An identifier for a single subscriber to a query. 
74- #[ derive( Copy ,  Clone ,  Debug ,  Default ,  Eq ,  PartialEq ,  PartialOrd ,  Ord ) ]  
76+ #[ derive( Copy ,  Clone ,  Debug ,  Default ,  Eq ,  PartialEq ,  PartialOrd ,  Ord ,   Hash ) ]  
7577#[ cfg_attr( test,  derive( proptest_derive:: Arbitrary ) ) ]  
7678pub  struct  SubscriberId ( QueryId ,  usize ) ; 
7779
@@ -165,7 +167,7 @@ impl LocalSyncState {
165167 None  => panic ! ( "INTERNAL BUG: Unknown query id {query_id}" ) , 
166168 Some ( t)  => t, 
167169 } ; 
168-  let  mut   local_query = match  self . query_set . get_mut ( & query_token)  { 
170+  let  local_query = match  self . query_set . get_mut ( & query_token)  { 
169171 None  => panic ! ( "INTERNAL BUG: No query found for query token {query_token:?}" , ) , 
170172 Some ( q)  => q, 
171173 } ; 
@@ -273,7 +275,8 @@ impl RemoteQuerySet {
273275 start_version, 
274276 end_version, 
275277 modifications, 
276-  }  = transition else  { 
278+  }  = transition
279+  else  { 
277280 panic ! ( "not transition" ) ; 
278281 } ; 
279282 if  start_version != self . version  { 
@@ -290,22 +293,33 @@ impl RemoteQuerySet {
290293 StateModification :: QueryUpdated  { 
291294 query_id, 
292295 value, 
293-  log_lines :  _ , 
296+  log_lines, 
294297 journal :  _, 
295298 }  => { 
299+  for  log_line in  log_lines. 0  { 
300+  convex_logs ! ( "{}" ,  log_line) ; 
301+  } 
296302 self . remote_query_set 
297303 . insert ( query_id,  FunctionResult :: Value ( value) ) ; 
298304 } , 
299305 StateModification :: QueryFailed  { 
300306 query_id, 
301307 error_message, 
302-  log_lines :  _ , 
308+  log_lines, 
303309 journal :  _, 
304-  // TODO @srb: Implement ConvexError in Rust client queries 
305-  error_data :  _, 
310+  error_data, 
306311 }  => { 
307-  self . remote_query_set 
308-  . insert ( query_id,  FunctionResult :: ErrorMessage ( error_message) ) ; 
312+  for  log_line in  log_lines. 0  { 
313+  convex_logs ! ( "{}" ,  log_line) ; 
314+  } 
315+  let  function_result = match  error_data { 
316+  Some ( v)  => FunctionResult :: ConvexError ( ConvexError  { 
317+  message :  error_message, 
318+  data :  v, 
319+  } ) , 
320+  None  => FunctionResult :: ErrorMessage ( error_message) , 
321+  } ; 
322+  self . remote_query_set . insert ( query_id,  function_result) ; 
309323 } , 
310324 StateModification :: QueryRemoved  {  query_id }  => { 
311325 self . remote_query_set . remove ( & query_id) ; 
@@ -368,7 +382,7 @@ impl OptimisticQueryResults {
368382/// The main methods, [`subscribe`](Self::subscribe()), 
369383/// [`unsubscribe`](Self::unsubscribe()), and 
370384/// [`mutation`](Self::mutation()) directly correspond to its 
371- /// equivalent for the external [ConvexClient](crate::ConvexClient) . 
385+ /// equivalent for the external [ConvexClient]. 
372386/// 
373387/// The only different method is [`get_query`](Self::get_query()), which 
374388/// returns the current value for a query given its query id. This method can be 
@@ -583,22 +597,16 @@ impl BaseConvexClient {
583597 } 
584598 return  Ok ( Some ( self . state . latest_results . clone ( ) ) ) ; 
585599 } , 
586-  ServerMessage :: QueriesFailed  {  failures }  => { 
587-  // Note that we never expect to receive this as it is not sent by the server. 
588-  for  failure in  failures { 
589-  tracing:: error![ failure. message] ; 
590-  } 
591-  tracing:: error!( 
592-  "Received unexpected QueriesFailed from server. Restarting protocol." 
593-  ) ; 
594-  return  Err ( "QueriesFailed, see tracing::error for more details." . to_string ( ) ) ; 
595-  } , 
596600 ServerMessage :: MutationResponse  { 
597601 request_id, 
598602 result, 
599603 ts, 
600-  log_lines :  _ , 
604+  log_lines, 
601605 }  => { 
606+  for  log_line in  log_lines. 0  { 
607+  convex_logs ! ( "{}" ,  log_line) ; 
608+  } 
609+ 
602610 if  let  Some ( ts)  = ts { 
603611 self . observe_timestamp ( ts) ; 
604612 } 
@@ -629,8 +637,11 @@ impl BaseConvexClient {
629637 ServerMessage :: ActionResponse  { 
630638 request_id, 
631639 result, 
632-  log_lines :  _ , 
640+  log_lines, 
633641 }  => { 
642+  for  log_line in  log_lines. 0  { 
643+  convex_logs ! ( "{}" ,  log_line) ; 
644+  } 
634645 let  request_id = RequestId :: new ( request_id) ; 
635646 self . request_manager . update_request ( 
636647 & request_id, 
@@ -672,9 +683,7 @@ impl BaseConvexClient {
672683 let  remote_query_results = & self . remote_query_set . remote_query_set ; 
673684 let  mut  query_id_to_value = BTreeMap :: new ( ) ; 
674685 for  ( query_id,  result)  in  remote_query_results. iter ( )  { 
675-  let  Some ( _udf_path)  = self 
676-  . state 
677-  . query_path ( * query_id)  else  { 
686+  let  Some ( _udf_path)  = self . state . query_path ( * query_id)  else  { 
678687 // It's possible that we've already unsubscribed to this query but 
679688 // the server hasn't learned about that yet. If so, ignore this one. 
680689 continue ; 
@@ -701,3 +710,17 @@ impl BaseConvexClient {
701710 self . optimistic_query_results . query_result ( query_id) 
702711 } 
703712} 
713+ 
714+ /// Macro used for piping UDF logs to a custom formatter that exposes 
715+ /// just the log content, without any additional Rust metadata. 
716+ #[ macro_export]  
717+ macro_rules!  convex_logs { 
718+  ( target:  $target: expr,  $( $arg: tt) +)  => { 
719+  tracing:: event!( target:  "convex_logs" ,  tracing:: Level :: DEBUG ,  $( $arg) +) ; 
720+  // Additional custom behavior can be added here 
721+  } ; 
722+  ( $( $arg: tt) +)  => { 
723+  tracing:: event!( target:  "convex_logs" ,  tracing:: Level :: DEBUG ,  $( $arg) +) ; 
724+  // Additional custom behavior can be added here 
725+  } ; 
726+ } 
0 commit comments