
    Ki                          S SK r S SKrS SKrS SKJr  S SKrS SKJrJrJ	r	   S SK
Jr  SrSr " S S\5      rg! \ a	    S SKJr   Nf = f)	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   t    \ rS rSrSr\R                  " S5      r        SS jrS r	S r
S rS	 rS
 rSrg)Consumer   z.Consumes the messages from the client's queue.posthogNc                     [         R                  " U 5        SU l        X0l        X`l        X l        X@l        XPl        Xl        Xpl	        SU l
        Xl        Xl        Xl        g)zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r   s              R/var/www/html/dynamic-report/venv/lib/python3.13/site-packages/posthog/consumer.pyr   Consumer.__init__   sT     	 ,	 
	
 $8!    c                     U R                   R                  S5        U R                  (       a#  U R                  5         U R                  (       a  M#  U R                   R                  S5        g)zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   s    r   runConsumer.run=   sA    /0llKKM lll 	)*r   c                     SU l         g)zPause the consumer.FN)r   r"   s    r   pauseConsumer.pauseE   s	    r   c                    SnU R                  5       n[        U5      S:X  a  g U R                  U5        SnU H  nU R                  R                  5         M     U$ ! [         aJ  nU R                  R                  SU5        SnU R                  (       a  U R                  X25         SnANtSnAff = f! U H  nU R                  R                  5         M     Us  s $ = f)z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %sN)	nextlenrequest	Exceptionr   errorr   r   	task_done)r   successbatcheitems        r   r!   Consumer.uploadI   s    		u:?	LLG 

$$& N  	(HHNN0!4G}}a'		( 

$$& Ns*   A 
B0&A B+&B3 +B00B3 3(Cc                    U R                   n/ n[        R                  " 5       nSn[        U5      U R                  :  Ga  [        R                  " 5       U-
  nXPR
                  :  a   U$  UR                  SU R
                  U-
  S9n[        [        R                  " U[        S9R                  5       5      nU[        :  a'  U R                  R                  S[        U5      5        M  UR                  U5        XG-  nU[         :  a  U R                  R#                  SU5         U$  [        U5      U R                  :  a  GM  U$ ! [$         a     U$ f = f)z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   time	monotonicr*   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr   r-   strappendBATCH_SIZE_LIMITr    r   )r   r   items
start_time
total_sizeelapsedr2   	item_sizes           r   r)   Consumer.next^   s0   

^^%

%j4==(nn&3G---" !yytT5H5H75RyS

45G H O O QR	|+HHNNCSY T"'
!11HHNN#DjQ  2 %j4==((   s   +A<E );E 
EEc                    ^ ^ S n[         R                  " [         R                  [        T R                  S-   US9UU 4S j5       nU" 5         g)z=Attempt to upload the batch and retry before raising an errorc                     [        U [        5      (       aF  U R                  S:X  a  gSU R                  s=:*  =(       a    S:  Os  =(       a    U R                  S:g  $ g)NzN/AFi  i  i  )
isinstancer   status)excs    r   fatal_exception)Consumer.request.<locals>.fatal_exception   sJ    #x(( ::& szz//C/FSZZ35FF r      )	max_triesgiveupc            	         > [        TR                  TR                  TR                  TR                  T TR
                  S9  g )N)r   r   r0   r   )r   r   r   r   r   r   )r0   r   s   r   send_request&Consumer.request.<locals>.send_request   s5     		YY%)%>%>r   N)backoffon_exceptionexpor,   r   )r   r0   rL   rR   s   ``  r   r+   Consumer.request|   sD    
	 
		LL)t||a/?

	

	 	r   )r   r   r   r   r   r   r   r   r   r   r   r   )d   NNg      ?F
      F)__name__
__module____qualname____firstlineno____doc__logging	getLoggerr   r   r#   r&   r!   r)   r+   __static_attributes__ r   r   r	   r	      sN    8


I
&C "9B+*<r   r	   )r:   r`   r7   	threadingr   rT   posthog.requestr   r   r   r   r   ImportErrorQueuer=   r@   r	   rc   r   r   <module>rh      sS         D D
  # Av A  s   8 AA