MongoDB Atlas v3.30.0 published on Friday, Mar 21, 2025 by Pulumi
mongodbatlas.getStreamProcessor
Explore with Pulumi AI
# Data Source: mongodbatlas.StreamProcessor
mongodbatlas.StreamProcessor describes a stream processor.
Example Usage
S
Coming soon!
Coming soon!
Coming soon!
Coming soon!
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.mongodbatlas.StreamInstance;
import com.pulumi.mongodbatlas.StreamInstanceArgs;
import com.pulumi.mongodbatlas.inputs.StreamInstanceDataProcessRegionArgs;
import com.pulumi.mongodbatlas.StreamConnection;
import com.pulumi.mongodbatlas.StreamConnectionArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionDbRoleToExecuteArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionAuthenticationArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionSecurityArgs;
import com.pulumi.mongodbatlas.StreamProcessor;
import com.pulumi.mongodbatlas.StreamProcessorArgs;
import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsArgs;
import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsDlqArgs;
import com.pulumi.mongodbatlas.MongodbatlasFunctions;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorsArgs;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }
    public static void stack(Context ctx) {
        var example = new StreamInstance("example", StreamInstanceArgs.builder()
            .projectId(projectId)
            .instanceName("InstanceName")
            .dataProcessRegion(StreamInstanceDataProcessRegionArgs.builder()
                .region("VIRGINIA_USA")
                .cloud_provider("AWS")
                .build())
            .build());
        var example_sample = new StreamConnection("example-sample", StreamConnectionArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .connectionName("sample_stream_solar")
            .type("Sample")
            .build());
        var example_cluster = new StreamConnection("example-cluster", StreamConnectionArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .connectionName("ClusterConnection")
            .type("Cluster")
            .clusterName(clusterName)
            .dbRoleToExecute(StreamConnectionDbRoleToExecuteArgs.builder()
                .role("atlasAdmin")
                .type("BUILT_IN")
                .build())
            .build());
        var example_kafka = new StreamConnection("example-kafka", StreamConnectionArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .connectionName("KafkaPlaintextConnection")
            .type("Kafka")
            .authentication(StreamConnectionAuthenticationArgs.builder()
                .mechanism("PLAIN")
                .username(kafkaUsername)
                .password(kafkaPassword)
                .build())
            .bootstrapServers("localhost:9092,localhost:9092")
            .config(Map.of("auto.offset.reset", "earliest"))
            .security(StreamConnectionSecurityArgs.builder()
                .protocol("PLAINTEXT")
                .build())
            .build());
        var stream_processor_sample_example = new StreamProcessor("stream-processor-sample-example", StreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName("sampleProcessorName")
            .pipeline(serializeJson(
                jsonArray(
                    jsonObject(
                        jsonProperty("$source", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-sample().connectionName())
                        ))
                    ), 
                    jsonObject(
                        jsonProperty("$emit", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
                            jsonProperty("db", "sample"),
                            jsonProperty("coll", "solar"),
                            jsonProperty("timeseries", jsonObject(
                                jsonProperty("timeField", "_ts")
                            ))
                        ))
                    )
                )))
            .state("STARTED")
            .build());
        var stream_processor_cluster_to_kafka_example = new StreamProcessor("stream-processor-cluster-to-kafka-example", StreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName("clusterProcessorName")
            .pipeline(serializeJson(
                jsonArray(
                    jsonObject(
                        jsonProperty("$source", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName())
                        ))
                    ), 
                    jsonObject(
                        jsonProperty("$emit", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
                            jsonProperty("topic", "topic_from_cluster")
                        ))
                    )
                )))
            .state("CREATED")
            .build());
        var stream_processor_kafka_to_cluster_example = new StreamProcessor("stream-processor-kafka-to-cluster-example", StreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName("kafkaProcessorName")
            .pipeline(serializeJson(
                jsonArray(
                    jsonObject(
                        jsonProperty("$source", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
                            jsonProperty("topic", "topic_source")
                        ))
                    ), 
                    jsonObject(
                        jsonProperty("$emit", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
                            jsonProperty("db", "kafka"),
                            jsonProperty("coll", "topic_source"),
                            jsonProperty("timeseries", jsonObject(
                                jsonProperty("timeField", "ts")
                            ))
                        ))
                    )
                )))
            .state("CREATED")
            .options(StreamProcessorOptionsArgs.builder()
                .dlq(StreamProcessorOptionsDlqArgs.builder()
                    .coll("exampleColumn")
                    .connectionName(mongodbatlasStreamConnection.example-cluster().connectionName())
                    .db("exampleDb")
                    .build())
                .build())
            .build());
        final var example-stream-processors = MongodbatlasFunctions.getStreamProcessors(GetStreamProcessorsArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .build());
        final var example-stream-processor = MongodbatlasFunctions.getStreamProcessor(GetStreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName(stream_processor_sample_example.processorName())
            .build());
        ctx.export("streamProcessorsState", example_stream_processor.applyValue(example_stream_processor -> example_stream_processor.state()));
        ctx.export("streamProcessorsResults", example_stream_processors.applyValue(example_stream_processors -> example_stream_processors.results()));
    }
}
# Pulumi YAML version of the example: a stream instance, three stream connections,
# three stream processors, and two data-source lookups whose results are exported.

# Values the program interpolates below; supply them through stack configuration.
config:
  projectId:
    type: string
  clusterName:
    type: string
  kafkaUsername:
    type: string
  kafkaPassword:
    type: string
    secret: true
resources:
  example:
    type: mongodbatlas:StreamInstance
    properties:
      projectId: ${projectId}
      instanceName: InstanceName
      dataProcessRegion:
        region: VIRGINIA_USA
        cloudProvider: AWS
  example-sample:
    type: mongodbatlas:StreamConnection
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      connectionName: sample_stream_solar
      type: Sample
  example-cluster:
    type: mongodbatlas:StreamConnection
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      connectionName: ClusterConnection
      type: Cluster
      clusterName: ${clusterName}
      dbRoleToExecute:
        role: atlasAdmin
        type: BUILT_IN
  example-kafka:
    type: mongodbatlas:StreamConnection
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      connectionName: KafkaPlaintextConnection
      type: Kafka
      authentication:
        mechanism: PLAIN
        username: ${kafkaUsername}
        password: ${kafkaPassword}
      bootstrapServers: localhost:9092,localhost:9092
      config:
        auto.offset.reset: earliest
      security:
        protocol: PLAINTEXT
  # Sample connection -> cluster connection.
  stream-processor-sample-example:
    type: mongodbatlas:StreamProcessor
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      processorName: sampleProcessorName
      pipeline:
        fn::toJSON:
          - $source:
              connectionName: ${["example-sample"].connectionName}
          - $emit:
              connectionName: ${["example-cluster"].connectionName}
              db: sample
              coll: solar
              timeseries:
                timeField: _ts
      state: STARTED
  # Cluster connection -> Kafka topic.
  stream-processor-cluster-to-kafka-example:
    type: mongodbatlas:StreamProcessor
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      processorName: clusterProcessorName
      pipeline:
        fn::toJSON:
          - $source:
              connectionName: ${["example-cluster"].connectionName}
          - $emit:
              connectionName: ${["example-kafka"].connectionName}
              topic: topic_from_cluster
      state: CREATED
  # Kafka topic -> cluster connection, with a dead letter queue on the cluster connection.
  stream-processor-kafka-to-cluster-example:
    type: mongodbatlas:StreamProcessor
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      processorName: kafkaProcessorName
      pipeline:
        fn::toJSON:
          - $source:
              connectionName: ${["example-kafka"].connectionName}
              topic: topic_source
          - $emit:
              connectionName: ${["example-cluster"].connectionName}
              db: kafka
              coll: topic_source
              timeseries:
                timeField: ts
      state: CREATED
      options:
        dlq:
          coll: exampleColumn
          connectionName: ${["example-cluster"].connectionName}
          db: exampleDb
variables:
  example-stream-processors:
    fn::invoke:
      function: mongodbatlas:getStreamProcessors
      arguments:
        projectId: ${projectId}
        instanceName: ${example.instanceName}
  example-stream-processor:
    fn::invoke:
      function: mongodbatlas:getStreamProcessor
      arguments:
        projectId: ${projectId}
        instanceName: ${example.instanceName}
        processorName: ${["stream-processor-sample-example"].processorName}
outputs:
  # example making use of data sources
  streamProcessorsState: ${["example-stream-processor"].state}
  streamProcessorsResults: ${["example-stream-processors"].results}
Using getStreamProcessor
Two invocation forms are available. The direct form accepts plain arguments and either blocks until the result value is available, or returns a Promise-wrapped result. The output form accepts Input-wrapped arguments and returns an Output-wrapped result.
function getStreamProcessor(args: GetStreamProcessorArgs, opts?: InvokeOptions): Promise<GetStreamProcessorResult>
function getStreamProcessorOutput(args: GetStreamProcessorOutputArgs, opts?: InvokeOptions): Output<GetStreamProcessorResult>
def get_stream_processor(instance_name: Optional[str] = None,
                         processor_name: Optional[str] = None,
                         project_id: Optional[str] = None,
                         opts: Optional[InvokeOptions] = None) -> GetStreamProcessorResult
def get_stream_processor_output(instance_name: Optional[pulumi.Input[str]] = None,
                         processor_name: Optional[pulumi.Input[str]] = None,
                         project_id: Optional[pulumi.Input[str]] = None,
                         opts: Optional[InvokeOptions] = None) -> Output[GetStreamProcessorResult]
func LookupStreamProcessor(ctx *Context, args *LookupStreamProcessorArgs, opts ...InvokeOption) (*LookupStreamProcessorResult, error)
func LookupStreamProcessorOutput(ctx *Context, args *LookupStreamProcessorOutputArgs, opts ...InvokeOption) LookupStreamProcessorResultOutput
> Note: This function is named LookupStreamProcessor in the Go SDK.
public static class GetStreamProcessor 
{
    public static Task<GetStreamProcessorResult> InvokeAsync(GetStreamProcessorArgs args, InvokeOptions? opts = null)
    public static Output<GetStreamProcessorResult> Invoke(GetStreamProcessorInvokeArgs args, InvokeOptions? opts = null)
}
public static CompletableFuture<GetStreamProcessorResult> getStreamProcessorPlain(GetStreamProcessorPlainArgs args, InvokeOptions options)
public static Output<GetStreamProcessorResult> getStreamProcessor(GetStreamProcessorArgs args, InvokeOptions options)
fn::invoke:
  function: mongodbatlas:index/getStreamProcessor:getStreamProcessor
  arguments:
    # arguments dictionary
The following arguments are supported:
- InstanceName string
- Human-readable label that identifies the stream instance.
- ProcessorName string
- Human-readable label that identifies the stream processor.
- ProjectId string
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- InstanceName string
- Human-readable label that identifies the stream instance.
- ProcessorName string
- Human-readable label that identifies the stream processor.
- ProjectId string
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- instanceName String
- Human-readable label that identifies the stream instance.
- processorName String
- Human-readable label that identifies the stream processor.
- projectId String
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- instanceName string
- Human-readable label that identifies the stream instance.
- processorName string
- Human-readable label that identifies the stream processor.
- projectId string
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- instance_name str
- Human-readable label that identifies the stream instance.
- processor_name str
- Human-readable label that identifies the stream processor.
- project_id str
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- instanceName String
- Human-readable label that identifies the stream instance.
- processorName String
- Human-readable label that identifies the stream processor.
- projectId String
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
getStreamProcessor Result
The following output properties are available:
- Id string
- InstanceName string
- Human-readable label that identifies the stream instance.
- Options
GetStreamProcessorOptions
- Pipeline string
- ProcessorName string
- Human-readable label that identifies the stream processor.
- ProjectId string
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- State string
- Stats string
- Id string
- InstanceName string
- Human-readable label that identifies the stream instance.
- Options
GetStreamProcessorOptions
- Pipeline string
- ProcessorName string
- Human-readable label that identifies the stream processor.
- ProjectId string
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- State string
- Stats string
- id String
- instanceName String
- Human-readable label that identifies the stream instance.
- options
GetStreamProcessorOptions
- pipeline String
- processorName String
- Human-readable label that identifies the stream processor.
- projectId String
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- state String
- stats String
- id string
- instanceName string
- Human-readable label that identifies the stream instance.
- options
GetStreamProcessorOptions
- pipeline string
- processorName string
- Human-readable label that identifies the stream processor.
- projectId string
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- state string
- stats string
- id str
- instance_name str
- Human-readable label that identifies the stream instance.
- options
GetStreamProcessorOptions
- pipeline str
- processor_name str
- Human-readable label that identifies the stream processor.
- project_id str
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- state str
- stats str
- id String
- instanceName String
- Human-readable label that identifies the stream instance.
- options Property Map
- pipeline String
- processorName String
- Human-readable label that identifies the stream processor.
- projectId String
- Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- state String
- stats String
Supporting Types
GetStreamProcessorOptions   
- Dlq
GetStreamProcessorOptionsDlq
- Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
- Dlq
GetStreamProcessorOptionsDlq
- Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
- dlq
GetStreamProcessorOptionsDlq
- Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
- dlq
GetStreamProcessorOptionsDlq
- Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
- dlq
GetStreamProcessorOptionsDlq
- Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
- dlq Property Map
- Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
GetStreamProcessorOptionsDlq    
- Coll string
- Name of the collection to use for the DLQ.
- ConnectionName string
- Name of the connection to write DLQ messages to. Must be an Atlas connection.
- Db string
- Name of the database to use for the DLQ.
- Coll string
- Name of the collection to use for the DLQ.
- ConnectionName string
- Name of the connection to write DLQ messages to. Must be an Atlas connection.
- Db string
- Name of the database to use for the DLQ.
- coll String
- Name of the collection to use for the DLQ.
- connectionName String
- Name of the connection to write DLQ messages to. Must be an Atlas connection.
- db String
- Name of the database to use for the DLQ.
- coll string
- Name of the collection to use for the DLQ.
- connectionName string
- Name of the connection to write DLQ messages to. Must be an Atlas connection.
- db string
- Name of the database to use for the DLQ.
- coll str
- Name of the collection to use for the DLQ.
- connection_name str
- Name of the connection to write DLQ messages to. Must be an Atlas connection.
- db str
- Name of the database to use for the DLQ.
- coll String
- Name of the collection to use for the DLQ.
- connectionName String
- Name of the connection to write DLQ messages to. Must be an Atlas connection.
- db String
- Name of the database to use for the DLQ.
Package Details
- Repository
- MongoDB Atlas pulumi/pulumi-mongodbatlas
- License
- Apache-2.0
- Notes
- This Pulumi package is based on the mongodbatlas Terraform Provider.