hw-kafka-client-5.3.0: Kafka bindings for Haskell
Safe HaskellNone
LanguageHaskell2010

Kafka.Producer

Description

Module to produce messages to Kafka topics.

Here's an example of code to produce messages to a topic:

import Control.Exception (bracket)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
runProducerExample =
    bracket mkProducer clProducer runHandler >>= print
    where
      mkProducer = newProducer producerProps
      clProducer (Left _)     = pure ()
      clProducer (Right prod) = closeProducer prod
      runHandler (Left err)   = pure $ Left err
      runHandler (Right prod) = sendMessages prod

-- Example sending 2 messages and printing the response from Kafka
sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer") )
  forM_ err1 print

  err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
  forM_ err2 print

  pure $ Right ()

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
                  { prTopic = targetTopic
                  , prPartition = UnassignedPartition
                  , prKey = k
                  , prValue = v
                  }
Synopsis

Documentation

data KafkaProducer Source #

The main type for Kafka message production, used e.g. to send messages.

Its constructor is intentionally not exposed, instead, one should used newProducer to acquire such a value.

data ProducerRecord Source #

Represents messages to be enqueued onto a Kafka broker (i.e. used for a producer)

Instances

Instances details
Generic ProducerRecord Source # 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducerRecord 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducerRecord = D1 ('MetaData "ProducerRecord" "Kafka.Producer.Types" "hw-kafka-client-5.3.0-8OZSN6XHxJUEBGq88IRkoi" 'False) (C1 ('MetaCons "ProducerRecord" 'PrefixI 'True) ((S1 ('MetaSel ('Just "prTopic") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 TopicName) :*: S1 ('MetaSel ('Just "prPartition") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 ProducePartition)) :*: (S1 ('MetaSel ('Just "prKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe ByteString)) :*: (S1 ('MetaSel ('Just "prValue") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe ByteString)) :*: S1 ('MetaSel ('Just "prHeaders") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Headers)))))
Show ProducerRecord Source # 
Instance details

Defined in Kafka.Producer.Types

Eq ProducerRecord Source # 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducerRecord Source # 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducerRecord = D1 ('MetaData "ProducerRecord" "Kafka.Producer.Types" "hw-kafka-client-5.3.0-8OZSN6XHxJUEBGq88IRkoi" 'False) (C1 ('MetaCons "ProducerRecord" 'PrefixI 'True) ((S1 ('MetaSel ('Just "prTopic") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 TopicName) :*: S1 ('MetaSel ('Just "prPartition") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 ProducePartition)) :*: (S1 ('MetaSel ('Just "prKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe ByteString)) :*: (S1 ('MetaSel ('Just "prValue") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe ByteString)) :*: S1 ('MetaSel ('Just "prHeaders") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Headers)))))

data ProducePartition Source #

 

Constructors

SpecifiedPartition !Int

The partition number of the topic

UnassignedPartition

Let the Kafka broker decide the partition

Instances

Instances details
Generic ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducePartition 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition = D1 ('MetaData "ProducePartition" "Kafka.Producer.Types" "hw-kafka-client-5.3.0-8OZSN6XHxJUEBGq88IRkoi" 'False) (C1 ('MetaCons "SpecifiedPartition" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 Int)) :+: C1 ('MetaCons "UnassignedPartition" 'PrefixI 'False) (U1 :: Type -> Type))
Show ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Eq ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

Ord ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition Source # 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition = D1 ('MetaData "ProducePartition" "Kafka.Producer.Types" "hw-kafka-client-5.3.0-8OZSN6XHxJUEBGq88IRkoi" 'False) (C1 ('MetaCons "SpecifiedPartition" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 Int)) :+: C1 ('MetaCons "UnassignedPartition" 'PrefixI 'False) (U1 :: Type -> Type))

data DeliveryReport Source #

The result of sending a message to the broker, useful for callbacks

Constructors

DeliverySuccess ProducerRecord Offset

The message was successfully sent at this offset

DeliveryFailure ProducerRecord KafkaError

The message could not be sent

NoMessageError KafkaError

An error occurred, but librdkafka did not attach any sent message

Instances

Instances details
Generic DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

Show DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

Eq DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

type Rep DeliveryReport Source # 
Instance details

Defined in Kafka.Producer.Types

newtype ImmediateError Source #

Data type representing an error that is caused by pre-flight conditions not being met

Instances

Instances details
Show ImmediateError Source # 
Instance details

Defined in Kafka.Producer.Types

Eq ImmediateError Source # 
Instance details

Defined in Kafka.Producer.Types

runProducer :: ProducerProperties -> (KafkaProducer -> IO (Either KafkaError a)) -> IO (Either KafkaError a) Source #

Deprecated: Use newProducer/closeProducer instead

Runs Kafka Producer. The callback provided is expected to call produceMessage to send messages to Kafka.

newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer) Source #

Creates a new kafka producer A newly created producer must be closed with closeProducer function.

produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError) Source #

Sends a single message. Since librdkafka is backed by a queue, this function can return before messages are sent. See flushProducer to wait for queue to empty.

produceMessage' :: MonadIO m => KafkaProducer -> ProducerRecord -> (DeliveryReport -> IO ()) -> m (Either ImmediateError ()) Source #

Sends a single message with a registered callback.

The callback can be a long running process, as it is forked by the thread that handles the delivery reports.

flushProducer :: MonadIO m => KafkaProducer -> m () Source #

Drains the outbound queue for a producer. This function is also called automatically when the producer is closed with closeProducer to ensure that all queued messages make it to Kafka.

closeProducer :: MonadIO m => KafkaProducer -> m () Source #

Closes the producer. Will wait until the outbound queue is drained before returning the control.

data RdKafkaRespErrT Source #

Constructors

RdKafkaRespErrBegin 
RdKafkaRespErrBadMsg 
RdKafkaRespErrBadCompression 
RdKafkaRespErrDestroy 
RdKafkaRespErrFail 
RdKafkaRespErrTransport 
RdKafkaRespErrCritSysResource 
RdKafkaRespErrResolve 
RdKafkaRespErrMsgTimedOut 
RdKafkaRespErrPartitionEof 
RdKafkaRespErrUnknownPartition 
RdKafkaRespErrFs 
RdKafkaRespErrUnknownTopic 
RdKafkaRespErrAllBrokersDown 
RdKafkaRespErrInvalidArg 
RdKafkaRespErrTimedOut 
RdKafkaRespErrQueueFull 
RdKafkaRespErrIsrInsuff 
RdKafkaRespErrNodeUpdate 
RdKafkaRespErrSsl 
RdKafkaRespErrWaitCoord 
RdKafkaRespErrUnknownGroup 
RdKafkaRespErrInProgress 
RdKafkaRespErrPrevInProgress 
RdKafkaRespErrExistingSubscription 
RdKafkaRespErrAssignPartitions 
RdKafkaRespErrRevokePartitions 
RdKafkaRespErrConflict 
RdKafkaRespErrState 
RdKafkaRespErrUnknownProtocol 
RdKafkaRespErrNotImplemented 
RdKafkaRespErrAuthentication 
RdKafkaRespErrNoOffset 
RdKafkaRespErrOutdated 
RdKafkaRespErrTimedOutQueue 
RdKafkaRespErrUnsupportedFeature 
RdKafkaRespErrWaitCache 
RdKafkaRespErrIntr 
RdKafkaRespErrKeySerialization 
RdKafkaRespErrValueSerialization 
RdKafkaRespErrKeyDeserialization 
RdKafkaRespErrValueDeserialization 
RdKafkaRespErrPartial 
RdKafkaRespErrReadOnly 
RdKafkaRespErrNoent 
RdKafkaRespErrUnderflow 
RdKafkaRespErrInvalidType 
RdKafkaRespErrRetry 
RdKafkaRespErrPurgeQueue 
RdKafkaRespErrPurgeInflight 
RdKafkaRespErrFatal 
RdKafkaRespErrInconsistent 
RdKafkaRespErrGaplessGuarantee 
RdKafkaRespErrMaxPollExceeded 
RdKafkaRespErrUnknownBroker 
RdKafkaRespErrNotConfigured 
RdKafkaRespErrFenced 
RdKafkaRespErrApplication 
RdKafkaRespErrAssignmentLost 
RdKafkaRespErrNoop 
RdKafkaRespErrAutoOffsetReset 
RdKafkaRespErrEnd 
RdKafkaRespErrUnknown 
RdKafkaRespErrNoError 
RdKafkaRespErrOffsetOutOfRange 
RdKafkaRespErrInvalidMsg 
RdKafkaRespErrUnknownTopicOrPart 
RdKafkaRespErrInvalidMsgSize 
RdKafkaRespErrLeaderNotAvailable 
RdKafkaRespErrNotLeaderForPartition 
RdKafkaRespErrRequestTimedOut 
RdKafkaRespErrBrokerNotAvailable 
RdKafkaRespErrReplicaNotAvailable 
RdKafkaRespErrMsgSizeTooLarge 
RdKafkaRespErrStaleCtrlEpoch 
RdKafkaRespErrOffsetMetadataTooLarge 
RdKafkaRespErrNetworkException 
RdKafkaRespErrCoordinatorLoadInProgress 
RdKafkaRespErrCoordinatorNotAvailable 
RdKafkaRespErrNotCoordinator 
RdKafkaRespErrTopicException 
RdKafkaRespErrRecordListTooLarge 
RdKafkaRespErrNotEnoughReplicas 
RdKafkaRespErrNotEnoughReplicasAfterAppend 
RdKafkaRespErrInvalidRequiredAcks 
RdKafkaRespErrIllegalGeneration 
RdKafkaRespErrInconsistentGroupProtocol 
RdKafkaRespErrInvalidGroupId 
RdKafkaRespErrUnknownMemberId 
RdKafkaRespErrInvalidSessionTimeout 
RdKafkaRespErrRebalanceInProgress 
RdKafkaRespErrInvalidCommitOffsetSize 
RdKafkaRespErrTopicAuthorizationFailed 
RdKafkaRespErrGroupAuthorizationFailed 
RdKafkaRespErrClusterAuthorizationFailed 
RdKafkaRespErrInvalidTimestamp 
RdKafkaRespErrUnsupportedSaslMechanism 
RdKafkaRespErrIllegalSaslState 
RdKafkaRespErrUnsupportedVersion 
RdKafkaRespErrTopicAlreadyExists 
RdKafkaRespErrInvalidPartitions 
RdKafkaRespErrInvalidReplicationFactor 
RdKafkaRespErrInvalidReplicaAssignment 
RdKafkaRespErrInvalidConfig 
RdKafkaRespErrNotController 
RdKafkaRespErrInvalidRequest 
RdKafkaRespErrUnsupportedForMessageFormat 
RdKafkaRespErrPolicyViolation 
RdKafkaRespErrOutOfOrderSequenceNumber 
RdKafkaRespErrDuplicateSequenceNumber 
RdKafkaRespErrInvalidProducerEpoch 
RdKafkaRespErrInvalidTxnState 
RdKafkaRespErrInvalidProducerIdMapping 
RdKafkaRespErrInvalidTransactionTimeout 
RdKafkaRespErrConcurrentTransactions 
RdKafkaRespErrTransactionCoordinatorFenced 
RdKafkaRespErrTransactionalIdAuthorizationFailed 
RdKafkaRespErrSecurityDisabled 
RdKafkaRespErrOperationNotAttempted 
RdKafkaRespErrKafkaStorageError 
RdKafkaRespErrLogDirNotFound 
RdKafkaRespErrSaslAuthenticationFailed 
RdKafkaRespErrUnknownProducerId 
RdKafkaRespErrReassignmentInProgress 
RdKafkaRespErrDelegationTokenAuthDisabled 
RdKafkaRespErrDelegationTokenNotFound 
RdKafkaRespErrDelegationTokenOwnerMismatch 
RdKafkaRespErrDelegationTokenRequestNotAllowed 
RdKafkaRespErrDelegationTokenAuthorizationFailed 
RdKafkaRespErrDelegationTokenExpired 
RdKafkaRespErrInvalidPrincipalType 
RdKafkaRespErrNonEmptyGroup 
RdKafkaRespErrGroupIdNotFound 
RdKafkaRespErrFetchSessionIdNotFound 
RdKafkaRespErrInvalidFetchSessionEpoch 
RdKafkaRespErrListenerNotFound 
RdKafkaRespErrTopicDeletionDisabled 
RdKafkaRespErrFencedLeaderEpoch 
RdKafkaRespErrUnknownLeaderEpoch 
RdKafkaRespErrUnsupportedCompressionType 
RdKafkaRespErrStaleBrokerEpoch 
RdKafkaRespErrOffsetNotAvailable 
RdKafkaRespErrMemberIdRequired 
RdKafkaRespErrPreferredLeaderNotAvailable 
RdKafkaRespErrGroupMaxSizeReached 
RdKafkaRespErrFencedInstanceId 
RdKafkaRespErrEligibleLeadersNotAvailable 
RdKafkaRespErrElectionNotNeeded 
RdKafkaRespErrNoReassignmentInProgress 
RdKafkaRespErrGroupSubscribedToTopic 
RdKafkaRespErrInvalidRecord 
RdKafkaRespErrUnstableOffsetCommit 
RdKafkaRespErrThrottlingQuotaExceeded 
RdKafkaRespErrProducerFenced 
RdKafkaRespErrResourceNotFound 
RdKafkaRespErrDuplicateResource 
RdKafkaRespErrUnacceptableCredential 
RdKafkaRespErrInconsistentVoterSet 
RdKafkaRespErrInvalidUpdateVersion 
RdKafkaRespErrFeatureUpdateFailed 
RdKafkaRespErrPrincipalDeserializationFailure 
RdKafkaRespErrEndAll